2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00
asynq/processor.go

424 lines
11 KiB
Go
Raw Normal View History

2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
2019-11-19 22:48:54 +08:00
package asynq
import (
"context"
2019-11-19 22:48:54 +08:00
"fmt"
"math/rand"
2020-01-12 23:46:51 +08:00
"sort"
"sync"
2019-11-19 22:48:54 +08:00
"time"
2019-12-04 13:01:26 +08:00
2019-12-22 23:15:45 +08:00
"github.com/hibiken/asynq/internal/base"
2020-05-06 13:10:11 +08:00
"github.com/hibiken/asynq/internal/log"
2019-12-04 13:01:26 +08:00
"github.com/hibiken/asynq/internal/rdb"
2020-01-22 22:28:47 +08:00
"golang.org/x/time/rate"
2019-11-19 22:48:54 +08:00
)
2019-11-21 12:08:03 +08:00
type processor struct {
2020-05-06 13:10:11 +08:00
logger *log.Logger
2020-04-18 22:55:10 +08:00
broker base.Broker
2019-11-19 22:48:54 +08:00
handler Handler
2019-11-19 22:48:54 +08:00
queueConfig map[string]int
2020-01-06 23:15:59 +08:00
2020-01-12 23:46:51 +08:00
// orderedQueues is set only in strict-priority mode.
orderedQueues []string
retryDelayFunc retryDelayFunc
errHandler ErrorHandler
shutdownTimeout time.Duration
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest
2020-01-22 22:28:47 +08:00
// rate limiter to prevent spamming logs with a bunch of errors.
errLogLimiter *rate.Limiter
2019-11-19 22:48:54 +08:00
// sema is a counting semaphore to ensure the number of active workers
2019-11-30 04:48:54 +08:00
// does not exceed the limit.
2019-11-19 22:48:54 +08:00
sema chan struct{}
2019-11-21 12:08:03 +08:00
// channel to communicate back to the long running "processor" goroutine.
2019-12-18 12:34:56 +08:00
// once is used to send value to the channel only once.
2019-11-19 22:48:54 +08:00
done chan struct{}
2019-12-18 12:34:56 +08:00
once sync.Once
2020-06-19 21:21:25 +08:00
// quit channel is closed when the shutdown of the "processor" goroutine starts.
quit chan struct{}
2020-06-19 21:21:25 +08:00
// abort channel communicates to the in-flight worker goroutines to stop.
abort chan struct{}
// cancelations is a set of cancel functions for all in-progress tasks.
cancelations *base.Cancelations
2020-05-19 11:47:35 +08:00
starting chan<- *base.TaskMessage
finished chan<- *base.TaskMessage
2019-11-19 22:48:54 +08:00
}
// retryDelayFunc computes how long to wait before a failed task is retried.
// The processor calls it with the message's Retried value, the error that
// caused the failure, and the task itself; the result is added to the
// current time to obtain the retry time.
type retryDelayFunc func(n int, err error, task *Task) time.Duration
type processorParams struct {
2020-05-06 13:10:11 +08:00
logger *log.Logger
2020-04-18 22:55:10 +08:00
broker base.Broker
retryDelayFunc retryDelayFunc
syncCh chan<- *syncRequest
cancelations *base.Cancelations
2020-05-19 11:47:35 +08:00
concurrency int
queues map[string]int
strictPriority bool
errHandler ErrorHandler
shutdownTimeout time.Duration
2020-05-19 11:47:35 +08:00
starting chan<- *base.TaskMessage
finished chan<- *base.TaskMessage
}
2020-01-12 23:46:51 +08:00
// newProcessor constructs a new processor.
func newProcessor(params processorParams) *processor {
2020-05-19 11:47:35 +08:00
queues := normalizeQueues(params.queues)
2020-01-12 23:46:51 +08:00
orderedQueues := []string(nil)
2020-05-19 11:47:35 +08:00
if params.strictPriority {
orderedQueues = sortByPriority(queues)
2020-01-12 23:46:51 +08:00
}
2019-11-21 12:08:03 +08:00
return &processor{
logger: params.logger,
2020-04-17 21:56:44 +08:00
broker: params.broker,
2020-05-19 11:47:35 +08:00
queueConfig: queues,
2020-01-12 23:46:51 +08:00
orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc,
syncRequestCh: params.syncCh,
cancelations: params.cancelations,
2020-01-22 22:28:47 +08:00
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
2020-05-19 11:47:35 +08:00
sema: make(chan struct{}, params.concurrency),
2019-11-30 04:48:54 +08:00
done: make(chan struct{}),
quit: make(chan struct{}),
2020-06-19 21:21:25 +08:00
abort: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
2020-05-19 11:47:35 +08:00
starting: params.starting,
finished: params.finished,
2019-11-19 22:48:54 +08:00
}
}
// Note: stops only the "processor" goroutine, does not stop workers.
// It's safe to call this method multiple times.
func (p *processor) stop() {
2019-12-18 12:34:56 +08:00
p.once.Do(func() {
p.logger.Debug("Processor shutting down...")
2019-12-18 12:34:56 +08:00
// Unblock if processor is waiting for sema token.
2020-06-19 21:21:25 +08:00
close(p.quit)
2019-12-18 12:34:56 +08:00
// Signal the processor goroutine to stop processing tasks
// from the queue.
p.done <- struct{}{}
})
}
2019-11-30 04:48:54 +08:00
// NOTE: once terminated, processor cannot be re-started.
2019-11-21 12:08:03 +08:00
func (p *processor) terminate() {
p.stop()
2020-06-19 21:21:25 +08:00
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
2020-06-19 21:21:25 +08:00
p.logger.Info("Waiting for all workers to finish...")
2019-11-28 11:36:56 +08:00
// block until all workers have released the token
2019-11-21 12:08:03 +08:00
for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{}
}
2020-03-09 22:11:16 +08:00
p.logger.Info("All workers have finished")
2019-11-19 22:48:54 +08:00
}
2020-02-16 15:14:30 +08:00
func (p *processor) start(wg *sync.WaitGroup) {
wg.Add(1)
2019-11-19 22:48:54 +08:00
go func() {
2020-02-16 15:14:30 +08:00
defer wg.Done()
2019-11-19 22:48:54 +08:00
for {
select {
2019-11-21 12:08:03 +08:00
case <-p.done:
p.logger.Debug("Processor done")
return
2019-11-19 22:48:54 +08:00
default:
2019-11-21 12:08:03 +08:00
p.exec()
2019-11-19 22:48:54 +08:00
}
}
}()
}
2019-11-22 13:45:27 +08:00
// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
2019-11-21 12:08:03 +08:00
func (p *processor) exec() {
select {
2020-06-19 21:21:25 +08:00
case <-p.quit:
return
case p.sema <- struct{}{}: // acquire token
qnames := p.queues()
msg, deadline, err := p.broker.Dequeue(qnames...)
switch {
case err == rdb.ErrNoProcessableTask:
p.logger.Debug("All queues are empty")
// Queues are empty, this is a normal behavior.
// Sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: We are not using blocking pop operation and polling queues instead.
// This adds significant load to redis.
time.Sleep(time.Second)
<-p.sema // release token
return
case err != nil:
if p.errLogLimiter.Allow() {
p.logger.Errorf("Dequeue error: %v", err)
}
<-p.sema // release token
return
}
2020-05-19 11:47:35 +08:00
p.starting <- msg
go func() {
2020-02-02 14:22:48 +08:00
defer func() {
2020-05-19 11:47:35 +08:00
p.finished <- msg
2020-05-13 12:30:51 +08:00
<-p.sema // release token
2020-02-02 14:22:48 +08:00
}()
ctx, cancel := createContext(msg, deadline)
p.cancelations.Add(msg.ID.String(), cancel)
defer func() {
cancel()
p.cancelations.Delete(msg.ID.String())
}()
// check context before starting a worker goroutine.
select {
case <-ctx.Done():
// already canceled (e.g. deadline exceeded).
p.retryOrKill(ctx, msg, ctx.Err())
return
default:
}
resCh := make(chan error, 1)
go func() {
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
}()
select {
2020-06-19 21:21:25 +08:00
case <-p.abort:
2020-06-13 21:09:54 +08:00
// time is up, push the message back to queue and quit this worker goroutine.
2020-05-06 13:10:11 +08:00
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
2020-06-13 21:09:54 +08:00
p.requeue(msg)
return
case <-ctx.Done():
p.retryOrKill(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
// Note: One of three things should happen.
// 1) Done -> Removes the message from InProgress
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
p.retryOrKill(ctx, msg, resErr)
return
}
p.markAsDone(ctx, msg)
2019-11-22 13:45:27 +08:00
}
}()
}
2019-11-19 22:48:54 +08:00
}
2019-12-22 23:15:45 +08:00
func (p *processor) requeue(msg *base.TaskMessage) {
2020-04-17 21:56:44 +08:00
err := p.broker.Requeue(msg)
if err != nil {
2020-05-06 13:10:11 +08:00
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
2020-06-13 21:09:54 +08:00
} else {
p.logger.Infof("Pushed task id=%s back to queue", msg.ID)
}
}
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
2020-04-17 21:56:44 +08:00
err := p.broker.Done(msg)
if err != nil {
2020-08-18 21:01:38 +08:00
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressKey(msg.Queue), err)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
2020-05-06 13:10:11 +08:00
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
2020-04-17 21:56:44 +08:00
return p.broker.Done(msg)
},
errMsg: errMsg,
deadline: deadline,
}
}
}
// retryOrKill reports err to the error handler (if one is set) and then
// either schedules msg for another attempt or, when its retry budget is used
// up, kills it.
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
	if p.errHandler != nil {
		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
	}
	if msg.Retried < msg.Retry {
		p.retry(ctx, msg, err)
		return
	}
	p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
	p.kill(ctx, msg, err)
}
func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d)
2020-04-17 21:56:44 +08:00
err := p.broker.Retry(msg, retryAt, e.Error())
if err != nil {
2020-08-18 21:01:38 +08:00
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.RetryKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
2020-05-06 13:10:11 +08:00
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
2020-04-17 21:56:44 +08:00
return p.broker.Retry(msg, retryAt, e.Error())
},
errMsg: errMsg,
deadline: deadline,
}
}
}
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
2020-04-17 21:56:44 +08:00
err := p.broker.Kill(msg, e.Error())
if err != nil {
2020-08-18 21:01:38 +08:00
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.DeadKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
2020-05-06 13:10:11 +08:00
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
2020-04-17 21:56:44 +08:00
return p.broker.Kill(msg, e.Error())
},
errMsg: errMsg,
deadline: deadline,
}
}
}
2019-11-28 06:03:04 +08:00
2020-01-12 23:46:51 +08:00
// queues returns a list of queues to query.
// Order of the queue names is based on the priority of each queue.
// Queue names is sorted by their priority level if strict-priority is true.
// If strict-priority is false, then the order of queue names are roughly based on
// the priority level but randomized in order to avoid starving low priority queues.
func (p *processor) queues() []string {
// skip the overhead of generating a list of queue names
// if we are processing one queue.
if len(p.queueConfig) == 1 {
for qname := range p.queueConfig {
return []string{qname}
}
}
2020-01-12 23:46:51 +08:00
if p.orderedQueues != nil {
return p.orderedQueues
}
var names []string
for qname, priority := range p.queueConfig {
2020-05-22 16:09:04 +08:00
for i := 0; i < priority; i++ {
names = append(names, qname)
}
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
return uniq(names, len(p.queueConfig))
}
2019-11-28 06:03:04 +08:00
// perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error.
func perform(ctx context.Context, task *Task, h Handler) (err error) {
2019-11-28 06:03:04 +08:00
defer func() {
if x := recover(); x != nil {
err = fmt.Errorf("panic: %v", x)
}
}()
return h.ProcessTask(ctx, task)
2019-11-28 06:03:04 +08:00
}
// uniq removes duplicates from names, keeping first occurrences in input
// order, and stops as soon as l distinct names have been collected.
// The result is nil when names is empty.
func uniq(names []string, l int) []string {
	var out []string
	visited := make(map[string]bool)
	for _, name := range names {
		if visited[name] {
			continue
		}
		visited[name] = true
		out = append(out, name)
		if len(out) == l {
			break
		}
	}
	return out
}
2020-01-12 23:46:51 +08:00
2020-01-13 22:50:03 +08:00
// sortByPriority returns a list of queue names sorted by
2020-01-12 23:46:51 +08:00
// their priority level in descending order.
func sortByPriority(qcfg map[string]int) []string {
2020-01-12 23:46:51 +08:00
var queues []*queue
for qname, n := range qcfg {
queues = append(queues, &queue{qname, n})
}
sort.Sort(sort.Reverse(byPriority(queues)))
var res []string
for _, q := range queues {
res = append(res, q.name)
}
return res
}
// queue pairs a queue name with its priority level.
type queue struct {
	name     string // queue name
	priority int    // priority level; a larger value sorts first in sortByPriority
}
// byPriority implements sort.Interface, ordering queues by ascending
// priority level.
type byPriority []*queue

// Len reports the number of queues.
func (x byPriority) Len() int {
	return len(x)
}

// Less reports whether queue i has a lower priority than queue j.
func (x byPriority) Less(i, j int) bool {
	return x[i].priority < x[j].priority
}

// Swap exchanges the queues at positions i and j.
func (x byPriority) Swap(i, j int) {
	x[i], x[j] = x[j], x[i]
}
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
// normalizeQueues divides priority numbers by their greatest common divisor.
func normalizeQueues(queues map[string]int) map[string]int {
var xs []int
2020-05-19 11:47:35 +08:00
for _, x := range queues {
2020-02-02 14:22:48 +08:00
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]int)
2020-05-19 11:47:35 +08:00
for q, x := range queues {
2020-02-02 14:22:48 +08:00
res[q] = x / d
}
return res
}
// gcd returns the greatest common divisor of xs, computed with Euclid's
// algorithm. It returns 0 when called with no arguments and is intended for
// non-negative inputs.
func gcd(xs ...int) int {
	if len(xs) == 0 {
		// No operands: return 0 instead of indexing into an empty slice.
		return 0
	}
	euclid := func(x, y int) int {
		for y > 0 {
			x, y = y, x%y
		}
		return x
	}
	res := xs[0]
	for _, x := range xs[1:] {
		res = euclid(x, res)
		if res == 1 {
			// 1 divides everything; the result cannot get any smaller.
			return 1
		}
	}
	return res
}