diff --git a/CHANGELOG.md b/CHANGELOG.md index b45e93a..aec915e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `SkipRetry` error is added to be used as a return value from `Handler`. - `Servers` method is added to `Inspector` - `CancelActiveTask` method is added to `Inspector`. - `ListSchedulerEnqueueEvents` method is added to `Inspector`. diff --git a/processor.go b/processor.go index 477b012..8c57466 100644 --- a/processor.go +++ b/processor.go @@ -6,6 +6,7 @@ package asynq import ( "context" + "errors" "fmt" "math/rand" "runtime" @@ -261,11 +262,15 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { } } +// SkipRetry is used as a return value from Handler.ProcessTask to indicate that +// the task should not be retried and should be archived instead. +var SkipRetry = errors.New("skip retry for the task") + func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) { if p.errHandler != nil { p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) } - if msg.Retried >= msg.Retry { + if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) { p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.kill(ctx, msg, err) } else { diff --git a/processor_test.go b/processor_test.go index 24b403f..f496971 100644 --- a/processor_test.go +++ b/processor_test.go @@ -307,9 +307,11 @@ func TestProcessorRetry(t *testing.T) { m4 := h.NewTaskMessage("sync", nil) errMsg := "something went wrong" + wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry) now := time.Now() tests := []struct { + desc string // test description pending []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run delay time.Duration // retry delay duration @@ -320,6 +322,7 @@ func TestProcessorRetry(t *testing.T) { wantErrCount int // number of times error handler should be called }{ { + desc: "Should automatically retry errored tasks", pending: []*base.TaskMessage{m1, m2}, incoming: []*base.TaskMessage{m3, m4}, delay: time.Minute, @@ -335,6 +338,38 @@ func TestProcessorRetry(t *testing.T) { wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)}, wantErrCount: 4, }, + { + desc: "Should skip retry errored tasks", + pending: []*base.TaskMessage{m1, m2}, + incoming: []*base.TaskMessage{}, + delay: time.Minute, + handler: HandlerFunc(func(ctx context.Context, task *Task) error { + return SkipRetry // return SkipRetry without wrapping + }), + wait: 2 * time.Second, + wantRetry: []base.Z{}, + wantDead: []*base.TaskMessage{ + h.TaskMessageWithError(*m1, SkipRetry.Error()), + h.TaskMessageWithError(*m2, SkipRetry.Error()), + }, + wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error + }, + { + desc: "Should skip retry errored tasks (with error wrapping)", + pending: []*base.TaskMessage{m1, m2}, + incoming: []*base.TaskMessage{}, + delay: time.Minute, + handler: HandlerFunc(func(ctx context.Context, task *Task) error { + return wrappedSkipRetry + }), + wait: 2 * time.Second, + wantRetry: []base.Z{}, + wantDead: []*base.TaskMessage{ + h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()), + h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()), + }, + wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error + }, } for _, tc := range tests { @@ -389,16 +424,16 @@ func TestProcessorRetry(t *testing.T) { cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff) + t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff) } gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName) if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff) + t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.DeadKey(base.DefaultQueueName), diff) } if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) + t.Errorf("%s: %q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), tc.desc, l) } if n != tc.wantErrCount { diff --git a/server.go b/server.go index 418c54a..261031b 100644 --- a/server.go +++ b/server.go @@ -392,6 +392,9 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { // // If ProcessTask return a non-nil error or panics, the task // will be retried after delay. +// One exception to this rule is when ProcessTask returns SkipRetry error. +// If the returned error is SkipRetry or the error wraps SkipRetry, retry is +// skipped and task will be archived instead. type Handler interface { ProcessTask(context.Context, *Task) error }