From 22b21df884789d288ade2216126a14ab4364b7cc Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 29 Dec 2019 17:43:19 -0800 Subject: [PATCH] Allow user to specify retry delay duration --- background.go | 27 +++++++++++++++++++++++++-- processor.go | 36 ++++++++++++++++-------------------- processor_test.go | 33 +++++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/background.go b/background.go index bfa4caa..fb860ce 100644 --- a/background.go +++ b/background.go @@ -3,6 +3,8 @@ package asynq import ( "fmt" "log" + "math" + "math/rand" "os" "os/signal" "sync" @@ -43,7 +45,21 @@ type Config struct { // TODO(hibiken): Add ShutdownTimeout // ShutdownTimeout time.Duration - // TODO(hibiken): Add RetryDelayFunc + // Function to calculate retry delay for a failed task. + // + // By default, it uses exponential backoff algorithm to calculate the delay. + // + // n is the number of times the task has been retried. + // e is the error returned by the task handler. + // t is the task in question. t is read-only, the function should not mutate t. + RetryDelayFunc func(n int, e error, t *Task) time.Duration +} + +// Formula taken from https://github.com/mperham/sidekiq. 
+func defaultDelayFunc(n int, e error, t *Task) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1)) + return time.Duration(s) * time.Second } // NewBackground returns a new Background instance given a redis client @@ -53,9 +69,13 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { if n < 1 { n = 1 } + delayFunc := cfg.RetryDelayFunc + if delayFunc == nil { + delayFunc = defaultDelayFunc + } rdb := rdb.NewRDB(r) scheduler := newScheduler(rdb, 5*time.Second) - processor := newProcessor(rdb, n, nil) + processor := newProcessor(rdb, n, delayFunc) return &Background{ rdb: rdb, scheduler: scheduler, @@ -70,6 +90,9 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { // // If ProcessTask return a non-nil error or panics, the task // will be retried after delay. +// +// Note: The argument task is read-only, ProcessTask should +// not mutate the task. type Handler interface { ProcessTask(*Task) error } diff --git a/processor.go b/processor.go index f2076b0..1aafc93 100644 --- a/processor.go +++ b/processor.go @@ -3,8 +3,6 @@ package asynq import ( "fmt" "log" - "math" - "math/rand" "sync" "time" @@ -17,6 +15,8 @@ type processor struct { handler Handler + retryDelayFunc retryDelayFunc + // timeout for blocking dequeue operation. // dequeue needs to timeout to avoid blocking forever // in case of a program shutdown or additon of a new queue. 
@@ -38,15 +38,18 @@ type processor struct { quit chan struct{} } -func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { +type retryDelayFunc func(n int, err error, task *Task) time.Duration + +func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor { return &processor{ rdb: r, - handler: handler, + retryDelayFunc: fn, dequeueTimeout: 2 * time.Second, - sema: make(chan struct{}, numWorkers), + sema: make(chan struct{}, n), done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), + handler: HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }), } } @@ -136,9 +139,9 @@ func (p *processor) exec() { // 3) Kill -> Removes the message from InProgress & Adds the message to Dead if resErr != nil { if msg.Retried >= msg.Retry { - p.kill(msg, resErr.Error()) + p.kill(msg, resErr) } else { - p.retry(msg, resErr.Error()) + p.retry(msg, resErr) } return } @@ -174,17 +177,18 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { } } -func (p *processor) retry(msg *base.TaskMessage, errMsg string) { - retryAt := time.Now().Add(delaySeconds(msg.Retried)) - err := p.rdb.Retry(msg, retryAt, errMsg) +func (p *processor) retry(msg *base.TaskMessage, e error) { + d := p.retryDelayFunc(msg.Retried, e, &Task{Type: msg.Type, Payload: msg.Payload}) + retryAt := time.Now().Add(d) + err := p.rdb.Retry(msg, retryAt, e.Error()) if err != nil { log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err) } } -func (p *processor) kill(msg *base.TaskMessage, errMsg string) { +func (p *processor) kill(msg *base.TaskMessage, e error) { log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) - err := p.rdb.Kill(msg, errMsg) + err := p.rdb.Kill(msg, e.Error()) if err != nil { log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err) } @@ -201,11 +205,3 @@ func perform(h Handler, task *Task) (err error) { }() return h.ProcessTask(task) } - -// delaySeconds 
returns a number seconds to delay before retrying. -// Formula taken from https://github.com/mperham/sidekiq. -func delaySeconds(count int) time.Duration { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) - return time.Duration(s) * time.Second -} diff --git a/processor_test.go b/processor_test.go index 03c9234..b23daae 100644 --- a/processor_test.go +++ b/processor_test.go @@ -59,7 +59,8 @@ func TestProcessorSuccess(t *testing.T) { processed = append(processed, task) return nil } - p := newProcessor(rdbClient, 10, HandlerFunc(handler)) + p := newProcessor(rdbClient, 10, defaultDelayFunc) + p.handler = HandlerFunc(handler) p.dequeueTimeout = time.Second // short time out for test purpose p.start() @@ -107,19 +108,27 @@ func TestProcessorRetry(t *testing.T) { r4.ErrorMsg = errMsg r4.Retried = m4.Retried + 1 + now := time.Now() + tests := []struct { enqueued []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run + delay time.Duration // retry delay duration wait time.Duration // wait duration between starting and stopping processor for this test case - wantRetry []*base.TaskMessage // tasks in retry queue at the end + wantRetry []h.ZSetEntry // tasks in retry queue at the end wantDead []*base.TaskMessage // tasks in dead queue at the end }{ { - enqueued: []*base.TaskMessage{m1, m2}, - incoming: []*base.TaskMessage{m3, m4}, - wait: time.Second, - wantRetry: []*base.TaskMessage{&r2, &r3, &r4}, - wantDead: []*base.TaskMessage{&r1}, + enqueued: []*base.TaskMessage{m1, m2}, + incoming: []*base.TaskMessage{m3, m4}, + delay: time.Minute, + wait: time.Second, + wantRetry: []h.ZSetEntry{ + {Msg: &r2, Score: now.Add(time.Minute).Unix()}, + {Msg: &r3, Score: now.Add(time.Minute).Unix()}, + {Msg: &r4, Score: now.Add(time.Minute).Unix()}, + }, + wantDead: []*base.TaskMessage{&r1}, }, } @@ -128,10 +137,14 @@ func TestProcessorRetry(t 
*testing.T) { h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. // instantiate a new processor + delayFunc := func(n int, e error, t *Task) time.Duration { + return tc.delay + } handler := func(task *Task) error { return fmt.Errorf(errMsg) } - p := newProcessor(rdbClient, 10, HandlerFunc(handler)) + p := newProcessor(rdbClient, 10, delayFunc) + p.handler = HandlerFunc(handler) p.dequeueTimeout = time.Second // short time out for test purpose p.start() @@ -145,8 +158,8 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotRetry := h.GetRetryMessages(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r) + if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) }