mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-22 22:06:12 +08:00 
			
		
		
		
	Add SkipRetry error to be used as a return value from Handler
This commit is contained in:
		| @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | ||||
|  | ||||
| ### Added | ||||
|  | ||||
| - `SkipRetry` error is added to be used as a return value from `Handler`. | ||||
| - `Servers` method is added to `Inspector` | ||||
| - `CancelActiveTask` method is added to `Inspector`. | ||||
| - `ListSchedulerEnqueueEvents` method is added to `Inspector`. | ||||
|   | ||||
| @@ -6,6 +6,7 @@ package asynq | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"math/rand" | ||||
| 	"runtime" | ||||
| @@ -261,11 +262,15 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SkipRetry is used as a return value from Handler.ProcessTask to indicate that | ||||
| // the task should not be retried and should be archived instead. | ||||
| var SkipRetry = errors.New("skip retry for the task") | ||||
|  | ||||
| func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) { | ||||
| 	if p.errHandler != nil { | ||||
| 		p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) | ||||
| 	} | ||||
| 	if msg.Retried >= msg.Retry { | ||||
| 	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) { | ||||
| 		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) | ||||
| 		p.kill(ctx, msg, err) | ||||
| 	} else { | ||||
|   | ||||
| @@ -307,9 +307,11 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 	m4 := h.NewTaskMessage("sync", nil) | ||||
|  | ||||
| 	errMsg := "something went wrong" | ||||
| 	wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry) | ||||
| 	now := time.Now() | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		desc         string              // test description | ||||
| 		pending      []*base.TaskMessage // initial default queue state | ||||
| 		incoming     []*base.TaskMessage // tasks to be enqueued during run | ||||
| 		delay        time.Duration       // retry delay duration | ||||
| @@ -320,6 +322,7 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 		wantErrCount int                 // number of times error handler should be called | ||||
| 	}{ | ||||
| 		{ | ||||
| 			desc:     "Should automatically retry errored tasks", | ||||
| 			pending:  []*base.TaskMessage{m1, m2}, | ||||
| 			incoming: []*base.TaskMessage{m3, m4}, | ||||
| 			delay:    time.Minute, | ||||
| @@ -335,6 +338,38 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 			wantDead:     []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)}, | ||||
| 			wantErrCount: 4, | ||||
| 		}, | ||||
| 		{ | ||||
| 			desc:     "Should skip retry errored tasks", | ||||
| 			pending:  []*base.TaskMessage{m1, m2}, | ||||
| 			incoming: []*base.TaskMessage{}, | ||||
| 			delay:    time.Minute, | ||||
| 			handler: HandlerFunc(func(ctx context.Context, task *Task) error { | ||||
| 				return SkipRetry // return SkipRetry without wrapping | ||||
| 			}), | ||||
| 			wait:      2 * time.Second, | ||||
| 			wantRetry: []base.Z{}, | ||||
| 			wantDead: []*base.TaskMessage{ | ||||
| 				h.TaskMessageWithError(*m1, SkipRetry.Error()), | ||||
| 				h.TaskMessageWithError(*m2, SkipRetry.Error()), | ||||
| 			}, | ||||
| 			wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error | ||||
| 		}, | ||||
| 		{ | ||||
| 			desc:     "Should skip retry errored tasks (with error wrapping)", | ||||
| 			pending:  []*base.TaskMessage{m1, m2}, | ||||
| 			incoming: []*base.TaskMessage{}, | ||||
| 			delay:    time.Minute, | ||||
| 			handler: HandlerFunc(func(ctx context.Context, task *Task) error { | ||||
| 				return wrappedSkipRetry | ||||
| 			}), | ||||
| 			wait:      2 * time.Second, | ||||
| 			wantRetry: []base.Z{}, | ||||
| 			wantDead: []*base.TaskMessage{ | ||||
| 				h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()), | ||||
| 				h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()), | ||||
| 			}, | ||||
| 			wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| @@ -389,16 +424,16 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 		cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score | ||||
| 		gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) | ||||
| 		if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { | ||||
| 			t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff) | ||||
| 			t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff) | ||||
| 		} | ||||
|  | ||||
| 		gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName) | ||||
| 		if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { | ||||
| 			t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff) | ||||
| 			t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.DeadKey(base.DefaultQueueName), diff) | ||||
| 		} | ||||
|  | ||||
| 		if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { | ||||
| 			t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) | ||||
| 			t.Errorf("%s: %q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), tc.desc, l) | ||||
| 		} | ||||
|  | ||||
| 		if n != tc.wantErrCount { | ||||
|   | ||||
| @@ -392,6 +392,9 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { | ||||
| // | ||||
| // If ProcessTask return a non-nil error or panics, the task | ||||
| // will be retried after delay. | ||||
| // One exception to this rule is when ProcessTask returns SkipRetry error. | ||||
| // If the returned error is SkipRetry or the error wraps SkipRetry, retry is | ||||
| // skipped and task will be archived instead. | ||||
| type Handler interface { | ||||
| 	ProcessTask(context.Context, *Task) error | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user