2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Add SkipRetry error to be used as a return value from Handler

This commit is contained in:
Ken Hibino 2021-01-11 07:21:50 -08:00
parent a150d18ed7
commit 7235041128
4 changed files with 48 additions and 4 deletions

View File

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- `SkipRetry` error is added to be used as a return value from `Handler`.
- `Servers` method is added to `Inspector` - `Servers` method is added to `Inspector`
- `CancelActiveTask` method is added to `Inspector`. - `CancelActiveTask` method is added to `Inspector`.
- `ListSchedulerEnqueueEvents` method is added to `Inspector`. - `ListSchedulerEnqueueEvents` method is added to `Inspector`.

View File

@ -6,6 +6,7 @@ package asynq
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"runtime" "runtime"
@ -261,11 +262,15 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
} }
} }
// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
// the task should not be retried and should be archived instead.
var SkipRetry = errors.New("skip retry for the task")
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) { func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
if p.errHandler != nil { if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
} }
if msg.Retried >= msg.Retry { if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.kill(ctx, msg, err) p.kill(ctx, msg, err)
} else { } else {

View File

@ -307,9 +307,11 @@ func TestProcessorRetry(t *testing.T) {
m4 := h.NewTaskMessage("sync", nil) m4 := h.NewTaskMessage("sync", nil)
errMsg := "something went wrong" errMsg := "something went wrong"
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
desc string // test description
pending []*base.TaskMessage // initial default queue state pending []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run incoming []*base.TaskMessage // tasks to be enqueued during run
delay time.Duration // retry delay duration delay time.Duration // retry delay duration
@ -320,6 +322,7 @@ func TestProcessorRetry(t *testing.T) {
wantErrCount int // number of times error handler should be called wantErrCount int // number of times error handler should be called
}{ }{
{ {
desc: "Should automatically retry errored tasks",
pending: []*base.TaskMessage{m1, m2}, pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4}, incoming: []*base.TaskMessage{m3, m4},
delay: time.Minute, delay: time.Minute,
@ -335,6 +338,38 @@ func TestProcessorRetry(t *testing.T) {
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)}, wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
wantErrCount: 4, wantErrCount: 4,
}, },
{
desc: "Should skip retry errored tasks",
pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return SkipRetry // return SkipRetry without wrapping
}),
wait: 2 * time.Second,
wantRetry: []base.Z{},
wantDead: []*base.TaskMessage{
h.TaskMessageWithError(*m1, SkipRetry.Error()),
h.TaskMessageWithError(*m2, SkipRetry.Error()),
},
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
},
{
desc: "Should skip retry errored tasks (with error wrapping)",
pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return wrappedSkipRetry
}),
wait: 2 * time.Second,
wantRetry: []base.Z{},
wantDead: []*base.TaskMessage{
h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()),
h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()),
},
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
},
} }
for _, tc := range tests { for _, tc := range tests {
@ -389,16 +424,16 @@ func TestProcessorRetry(t *testing.T) {
cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff) t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff)
} }
gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName) gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName)
if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff) t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.DeadKey(base.DefaultQueueName), diff)
} }
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) t.Errorf("%s: %q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), tc.desc, l)
} }
if n != tc.wantErrCount { if n != tc.wantErrCount {

View File

@ -392,6 +392,9 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
// //
// If ProcessTask return a non-nil error or panics, the task // If ProcessTask return a non-nil error or panics, the task
// will be retried after delay. // will be retried after delay.
// One exception to this rule is when ProcessTask returns SkipRetry error.
// If the returned error is SkipRetry or the error wraps SkipRetry, retry is
// skipped and task will be archived instead.
type Handler interface { type Handler interface {
ProcessTask(context.Context, *Task) error ProcessTask(context.Context, *Task) error
} }