2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Allow user to specify retry delay duration

This commit is contained in:
Ken Hibino 2019-12-29 17:43:19 -08:00
parent 9af14d9a6d
commit 22b21df884
3 changed files with 64 additions and 32 deletions

View File

@ -3,6 +3,8 @@ package asynq
import (
"fmt"
"log"
"math"
"math/rand"
"os"
"os/signal"
"sync"
@ -43,7 +45,21 @@ type Config struct {
// TODO(hibiken): Add ShutdownTimeout
// ShutdownTimeout time.Duration
// TODO(hibiken): Add RetryDelayFunc
// Function to calculate retry delay for a failed task.
//
// By default, it uses exponential backoff algorithm to calculate the delay.
//
// n is the number of times the task has been retried.
// e is the error returned by the task handler.
// t is the task in question. t is read-only, the function should not mutate t.
RetryDelayFunc func(n int, e error, t *Task) time.Duration
}
// Formula taken from https://github.com/mperham/sidekiq.
func defaultDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1))
return time.Duration(s) * time.Second
}
// NewBackground returns a new Background instance given a redis client
@ -53,9 +69,13 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
if n < 1 {
n = 1
}
delayFunc := cfg.RetryDelayFunc
if delayFunc == nil {
delayFunc = defaultDelayFunc
}
rdb := rdb.NewRDB(r)
scheduler := newScheduler(rdb, 5*time.Second)
processor := newProcessor(rdb, n, nil)
processor := newProcessor(rdb, n, delayFunc)
return &Background{
rdb: rdb,
scheduler: scheduler,
@ -70,6 +90,9 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
//
// Note: The argument task is ready only, ProcessTask should
// not mutate the task.
type Handler interface {
ProcessTask(*Task) error
}

View File

@ -3,8 +3,6 @@ package asynq
import (
"fmt"
"log"
"math"
"math/rand"
"sync"
"time"
@ -17,6 +15,8 @@ type processor struct {
handler Handler
retryDelayFunc retryDelayFunc
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
// in case of a program shutdown or additon of a new queue.
@ -38,15 +38,18 @@ type processor struct {
quit chan struct{}
}
func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor {
return &processor{
rdb: r,
handler: handler,
retryDelayFunc: fn,
dequeueTimeout: 2 * time.Second,
sema: make(chan struct{}, numWorkers),
sema: make(chan struct{}, n),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
handler: HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }),
}
}
@ -136,9 +139,9 @@ func (p *processor) exec() {
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
if msg.Retried >= msg.Retry {
p.kill(msg, resErr.Error())
p.kill(msg, resErr)
} else {
p.retry(msg, resErr.Error())
p.retry(msg, resErr)
}
return
}
@ -174,17 +177,18 @@ func (p *processor) markAsDone(msg *base.TaskMessage) {
}
}
func (p *processor) retry(msg *base.TaskMessage, errMsg string) {
retryAt := time.Now().Add(delaySeconds(msg.Retried))
err := p.rdb.Retry(msg, retryAt, errMsg)
func (p *processor) retry(msg *base.TaskMessage, e error) {
d := p.retryDelayFunc(msg.Retried, e, &Task{Type: msg.Type, Payload: msg.Payload})
retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil {
log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
}
}
func (p *processor) kill(msg *base.TaskMessage, errMsg string) {
func (p *processor) kill(msg *base.TaskMessage, e error) {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
err := p.rdb.Kill(msg, errMsg)
err := p.rdb.Kill(msg, e.Error())
if err != nil {
log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
}
@ -201,11 +205,3 @@ func perform(h Handler, task *Task) (err error) {
}()
return h.ProcessTask(task)
}
// delaySeconds returns a number seconds to delay before retrying.
// Formula taken from https://github.com/mperham/sidekiq.
func delaySeconds(count int) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
return time.Duration(s) * time.Second
}

View File

@ -59,7 +59,8 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task)
return nil
}
p := newProcessor(rdbClient, 10, HandlerFunc(handler))
p := newProcessor(rdbClient, 10, defaultDelayFunc)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
@ -107,19 +108,27 @@ func TestProcessorRetry(t *testing.T) {
r4.ErrorMsg = errMsg
r4.Retried = m4.Retried + 1
now := time.Now()
tests := []struct {
enqueued []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run
delay time.Duration // retry delay duration
wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []*base.TaskMessage // tasks in retry queue at the end
wantRetry []h.ZSetEntry // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end
}{
{
enqueued: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4},
wait: time.Second,
wantRetry: []*base.TaskMessage{&r2, &r3, &r4},
wantDead: []*base.TaskMessage{&r1},
enqueued: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4},
delay: time.Minute,
wait: time.Second,
wantRetry: []h.ZSetEntry{
{Msg: &r2, Score: now.Add(time.Minute).Unix()},
{Msg: &r3, Score: now.Add(time.Minute).Unix()},
{Msg: &r4, Score: now.Add(time.Minute).Unix()},
},
wantDead: []*base.TaskMessage{&r1},
},
}
@ -128,10 +137,14 @@ func TestProcessorRetry(t *testing.T) {
h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue.
// instantiate a new processor
delayFunc := func(n int, e error, t *Task) time.Duration {
return tc.delay
}
handler := func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(rdbClient, 10, HandlerFunc(handler))
p := newProcessor(rdbClient, 10, delayFunc)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
@ -145,8 +158,8 @@ func TestProcessorRetry(t *testing.T) {
time.Sleep(tc.wait)
p.terminate()
gotRetry := h.GetRetryMessages(t, r)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" {
gotRetry := h.GetRetryEntries(t, r)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff)
}