// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	"context"
	"fmt"
	"math/rand"
	"runtime"
	"runtime/debug"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/hibiken/asynq/internal/base"
	asynqcontext "github.com/hibiken/asynq/internal/context"
	"github.com/hibiken/asynq/internal/errors"
	"github.com/hibiken/asynq/internal/log"
	"golang.org/x/time/rate"
)

type processor struct {
	logger *log.Logger
	broker base.Broker

	handler   Handler
	baseCtxFn func() context.Context

	queueConfig map[string]int

	// orderedQueues is set only in strict-priority mode.
	orderedQueues []string

	retryDelayFunc RetryDelayFunc
	isFailureFunc  func(error) bool

	errHandler ErrorHandler

	shutdownTimeout time.Duration

	// channel via which to send sync requests to syncer.
	syncRequestCh chan<- *syncRequest

	// rate limiter to prevent spamming logs with repeated errors.
	errLogLimiter *rate.Limiter

	// sema is a counting semaphore to ensure the number of active workers
	// does not exceed the limit.
	sema chan struct{}

	// channel to communicate back to the long-running "processor" goroutine.
	// once is used to send a value to the channel only once.
	done chan struct{}
	once sync.Once

	// quit channel is closed when the shutdown of the "processor" goroutine starts.
	quit chan struct{}

	// abort channel communicates to the in-flight worker goroutines to stop.
	abort chan struct{}

	// cancelations is a set of cancel functions for all active tasks.
	cancelations *base.Cancelations

	starting chan<- *workerInfo
	finished chan<- *base.TaskMessage
}

type processorParams struct {
	logger          *log.Logger
	broker          base.Broker
	baseCtxFn       func() context.Context
	retryDelayFunc  RetryDelayFunc
	isFailureFunc   func(error) bool
	syncCh          chan<- *syncRequest
	cancelations    *base.Cancelations
	concurrency     int
	queues          map[string]int
	strictPriority  bool
	errHandler      ErrorHandler
	shutdownTimeout time.Duration
	starting        chan<- *workerInfo
	finished        chan<- *base.TaskMessage
}

// newProcessor constructs a new processor.
func newProcessor(params processorParams) *processor {
	queues := normalizeQueues(params.queues)
	orderedQueues := []string(nil)
	if params.strictPriority {
		orderedQueues = sortByPriority(queues)
	}
	return &processor{
		logger:          params.logger,
		broker:          params.broker,
		baseCtxFn:       params.baseCtxFn,
		queueConfig:     queues,
		orderedQueues:   orderedQueues,
		retryDelayFunc:  params.retryDelayFunc,
		isFailureFunc:   params.isFailureFunc,
		syncRequestCh:   params.syncCh,
		cancelations:    params.cancelations,
		errLogLimiter:   rate.NewLimiter(rate.Every(3*time.Second), 1),
		sema:            make(chan struct{}, params.concurrency),
		done:            make(chan struct{}),
		quit:            make(chan struct{}),
		abort:           make(chan struct{}),
		errHandler:      params.errHandler,
		handler:         HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
		shutdownTimeout: params.shutdownTimeout,
		starting:        params.starting,
		finished:        params.finished,
	}
}

// Note: stops only the "processor" goroutine; it does not stop workers.
// It's safe to call this method multiple times.
func (p *processor) stop() {
	p.once.Do(func() {
		p.logger.Debug("Processor shutting down...")
		// Unblock if the processor is waiting for a sema token.
		close(p.quit)
		// Signal the processor goroutine to stop processing tasks
		// from the queue.
		p.done <- struct{}{}
	})
}

// NOTE: once shut down, the processor cannot be restarted.
func (p *processor) shutdown() {
	p.stop()

	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })

	p.logger.Info("Waiting for all workers to finish...")
	// Block until all workers have released their tokens.
	for i := 0; i < cap(p.sema); i++ {
		p.sema <- struct{}{}
	}
	p.logger.Info("All workers have finished")
}

func (p *processor) start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-p.done:
				p.logger.Debug("Processor done")
				return
			default:
				p.exec()
			}
		}
	}()
}

// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
func (p *processor) exec() {
	select {
	case <-p.quit:
		return
	case p.sema <- struct{}{}: // acquire token
		qnames := p.queues()
		msg, deadline, err := p.broker.Dequeue(qnames...)
		switch {
		case errors.Is(err, errors.ErrNoProcessableTask):
			p.logger.Debug("All queues are empty")
			// Queues are empty; this is normal behavior.
			// Sleep to avoid slamming redis and to let the scheduler move tasks into the queues.
			// Note: We poll the queues rather than using a blocking pop operation;
			// without this sleep, polling would add significant load to redis.
			time.Sleep(time.Second)
			<-p.sema // release token
			return
		case err != nil:
			if p.errLogLimiter.Allow() {
				p.logger.Errorf("Dequeue error: %v", err)
			}
			<-p.sema // release token
			return
		}

		p.starting <- &workerInfo{msg, time.Now(), deadline}
		go func() {
			defer func() {
				p.finished <- msg
				<-p.sema // release token
			}()

			ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
			p.cancelations.Add(msg.ID, cancel)
			defer func() {
				cancel()
				p.cancelations.Delete(msg.ID)
			}()

			// Check context before starting a worker goroutine.
			select {
			case <-ctx.Done():
				// already canceled (e.g. deadline exceeded).
				p.handleFailedMessage(ctx, msg, ctx.Err())
				return
			default:
			}

			resCh := make(chan error, 1)
			go func() {
				task := newTask(
					msg.Type,
					msg.Payload,
					&ResultWriter{
						id:     msg.ID,
						qname:  msg.Queue,
						broker: p.broker,
						ctx:    ctx,
					},
				)
				resCh <- p.perform(ctx, task)
			}()

			select {
			case <-p.abort:
				// Time is up; push the message back to the queue and quit this worker goroutine.
				p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
				p.requeue(msg)
				return
			case <-ctx.Done():
				p.handleFailedMessage(ctx, msg, ctx.Err())
				return
			case resErr := <-resCh:
				if resErr != nil {
					p.handleFailedMessage(ctx, msg, resErr)
					return
				}
				p.handleSucceededMessage(ctx, msg)
			}
		}()
	}
}

func (p *processor) requeue(msg *base.TaskMessage) {
	err := p.broker.Requeue(msg)
	if err != nil {
		p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
	} else {
		p.logger.Infof("Pushed task id=%s back to queue", msg.ID)
	}
}

func (p *processor) handleSucceededMessage(ctx context.Context, msg *base.TaskMessage) {
	if msg.Retention > 0 {
		p.markAsComplete(ctx, msg)
	} else {
		p.markAsDone(ctx, msg)
	}
}

func (p *processor) markAsComplete(ctx context.Context, msg *base.TaskMessage) {
	err := p.broker.MarkAsComplete(msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
			msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err)
		deadline, ok := ctx.Deadline()
		if !ok {
			panic("asynq: internal error: missing deadline in context")
		}
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.MarkAsComplete(msg)
			},
			errMsg:   errMsg,
			deadline: deadline,
		}
	}
}

func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
	err := p.broker.Done(msg)
	if err != nil {
		errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
		deadline, ok := ctx.Deadline()
		if !ok {
			panic("asynq: internal error: missing deadline in context")
		}
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Done(msg)
			},
			errMsg:   errMsg,
			deadline: deadline,
		}
	}
}

// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
// the task should not be retried and should be archived instead.
var SkipRetry = errors.New("skip retry for the task")
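
// A hypothetical handler sketch (not part of this file) showing the intended
// use of SkipRetry: wrapping it with %w lets the errors.Is check in
// handleFailedMessage match it, so the task is archived instead of retried.
// The handler name and payload check below are illustrative assumptions.
//
//	func handleImport(ctx context.Context, t *Task) error {
//		if len(t.Payload()) == 0 {
//			// An empty payload will never succeed; do not retry.
//			return fmt.Errorf("empty payload: %w", SkipRetry)
//		}
//		// ... process the task ...
//		return nil
//	}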

func (p *processor) handleFailedMessage(ctx context.Context, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if !p.isFailureFunc(err) {
		// Retry the task without marking it as failed.
		p.retry(ctx, msg, err, false /*isFailure*/)
		return
	}
	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
		p.archive(ctx, msg, err)
	} else {
		p.retry(ctx, msg, err, true /*isFailure*/)
	}
}

func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error, isFailure bool) {
	d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
	retryAt := time.Now().Add(d)
	err := p.broker.Retry(msg, retryAt, e.Error(), isFailure)
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
		deadline, ok := ctx.Deadline()
		if !ok {
			panic("asynq: internal error: missing deadline in context")
		}
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Retry(msg, retryAt, e.Error(), isFailure)
			},
			errMsg:   errMsg,
			deadline: deadline,
		}
	}
}

func (p *processor) archive(ctx context.Context, msg *base.TaskMessage, e error) {
	err := p.broker.Archive(msg, e.Error())
	if err != nil {
		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
		deadline, ok := ctx.Deadline()
		if !ok {
			panic("asynq: internal error: missing deadline in context")
		}
		p.logger.Warnf("%s; Will retry syncing", errMsg)
		p.syncRequestCh <- &syncRequest{
			fn: func() error {
				return p.broker.Archive(msg, e.Error())
			},
			errMsg:   errMsg,
			deadline: deadline,
		}
	}
}

// queues returns a list of queues to query.
// Order of the queue names is based on the priority of each queue.
// Queue names are sorted by their priority level if strict-priority is true.
// If strict-priority is false, then the order of queue names is roughly based on
// the priority level, but randomized in order to avoid starving low-priority queues.
func (p *processor) queues() []string {
	// Skip the overhead of generating a list of queue names
	// if we are processing only one queue.
	if len(p.queueConfig) == 1 {
		for qname := range p.queueConfig {
			return []string{qname}
		}
	}
	if p.orderedQueues != nil {
		return p.orderedQueues
	}
	var names []string
	for qname, priority := range p.queueConfig {
		for i := 0; i < priority; i++ {
			names = append(names, qname)
		}
	}
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
	return uniq(names, len(p.queueConfig))
}
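
// A worked example of the randomized ordering above, with a hypothetical
// config {"critical": 3, "default": 2, "low": 1}: names expands to
// ["critical", "critical", "critical", "default", "default", "low"], a shuffle
// might yield ["default", "critical", "low", "critical", "critical", "default"],
// and uniq reduces that to ["default", "critical", "low"]. Higher-priority
// queues are more likely to appear first, but every queue is eventually queried.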

// perform calls the handler with the given task.
// If the call returns without panicking, it simply returns the value;
// otherwise, it recovers from the panic and returns an error.
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
	defer func() {
		if x := recover(); x != nil {
			p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
			_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
			if ok && strings.Contains(file, "runtime/") {
				// The panic came from the runtime, most likely due to incorrect
				// map/slice usage. The parent frame should have the real trigger.
				_, file, line, ok = runtime.Caller(2)
			}

			// Include the file and line number info in the error, if runtime.Caller returned ok.
			if ok {
				err = fmt.Errorf("panic [%s:%d]: %v", file, line, x)
			} else {
				err = fmt.Errorf("panic: %v", x)
			}
		}
	}()
	return p.handler.ProcessTask(ctx, task)
}
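
// A sketch of the recovery behavior above, assuming a handler that panics,
// e.g. by writing to a nil map:
//
//	var m map[string]int
//	m["k"] = 1 // panics: assignment to entry in nil map
//
// Such a handler does not crash the worker; perform recovers and returns an
// error along the lines of "panic [/path/to/handler.go:42]: assignment to
// entry in nil map", which then flows through handleFailedMessage like any
// other handler error.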

// uniq dedupes elements and returns a slice of unique names of length at most l.
// Order of the output slice is based on the input list.
func uniq(names []string, l int) []string {
	var res []string
	seen := make(map[string]struct{})
	for _, s := range names {
		if _, ok := seen[s]; !ok {
			seen[s] = struct{}{}
			res = append(res, s)
		}
		if len(res) == l {
			break
		}
	}
	return res
}
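
// For example, uniq([]string{"b", "a", "b", "c"}, 2) returns ["b", "a"]:
// duplicates are dropped and the scan stops once l unique names are collected.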

// sortByPriority returns a list of queue names sorted by
// their priority level in descending order.
func sortByPriority(qcfg map[string]int) []string {
	var queues []*queue
	for qname, n := range qcfg {
		queues = append(queues, &queue{qname, n})
	}
	sort.Sort(sort.Reverse(byPriority(queues)))
	var res []string
	for _, q := range queues {
		res = append(res, q.name)
	}
	return res
}
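
// For example, given {"critical": 3, "default": 2, "low": 1}, sortByPriority
// returns ["critical", "default", "low"]; in strict-priority mode the
// processor always dequeues in this fixed order.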

type queue struct {
	name     string
	priority int
}

type byPriority []*queue

func (x byPriority) Len() int           { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }

// normalizeQueues divides priority numbers by their greatest common divisor.
func normalizeQueues(queues map[string]int) map[string]int {
	var xs []int
	for _, x := range queues {
		xs = append(xs, x)
	}
	d := gcd(xs...)
	res := make(map[string]int)
	for q, x := range queues {
		res[q] = x / d
	}
	return res
}
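
// For example, normalizeQueues(map[string]int{"critical": 6, "default": 4, "low": 2})
// returns {"critical": 3, "default": 2, "low": 1}: the gcd is 2, so the relative
// weights are preserved while the expanded name list built in queues() stays small.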

func gcd(xs ...int) int {
	fn := func(x, y int) int {
		for y > 0 {
			x, y = y, x%y
		}
		return x
	}
	res := xs[0]
	for i := 0; i < len(xs); i++ {
		res = fn(xs[i], res)
		if res == 1 {
			return 1
		}
	}
	return res
}
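
// For example, gcd(6, 4, 2) == 2; the loop returns early with 1 as soon as
// the running divisor is coprime with any input, since 1 cannot shrink further.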