From ae478d5b2243d5892f98d4104e4f256e3fb59cb2 Mon Sep 17 00:00:00 2001 From: kanzihuang Date: Sat, 19 Oct 2024 15:06:12 +0800 Subject: [PATCH] feat: revoke the task to modify task parameters and enqueue new task with the same task id (#882) --- processor.go | 19 +++++++++++-------- processor_test.go | 27 +++++++++++++++++++++++++++ server.go | 4 ++++ 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/processor.go b/processor.go index 4c6471a..93a0b4d 100644 --- a/processor.go +++ b/processor.go @@ -323,20 +323,23 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) { // the task should not be retried and should be archived instead. var SkipRetry = errors.New("skip retry for the task") +// RevokeTask is used as a return value from Handler.ProcessTask to indicate that +// the task should not be retried or archived. +var RevokeTask = errors.New("revoke task") + func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { if p.errHandler != nil { p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) } - if !p.isFailureFunc(err) { - // retry the task without marking it as failed - p.retry(l, msg, err, false /*isFailure*/) - return - } - if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) { + switch { + case errors.Is(err, RevokeTask): + p.logger.Warnf("revoke task id=%s", msg.ID) + p.markAsDone(l, msg) + case msg.Retried >= msg.Retry || errors.Is(err, SkipRetry): p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.archive(l, msg, err) - } else { - p.retry(l, msg, err, true /*isFailure*/) + default: + p.retry(l, msg, err, p.isFailureFunc(err)) } } diff --git a/processor_test.go b/processor_test.go index 9be4729..669ce43 100644 --- a/processor_test.go +++ b/processor_test.go @@ -295,6 +295,7 @@ func TestProcessorRetry(t *testing.T) { errMsg := "something went wrong" wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry) + wrappedRevokeTask := fmt.Errorf("%s:%w", errMsg, RevokeTask) tests := []struct { desc string // test description @@ -346,6 +347,32 @@ func TestProcessorRetry(t *testing.T) { wantArchived: []*base.TaskMessage{m1, m2}, wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error }, + { + desc: "Should revoke task", + pending: []*base.TaskMessage{m1, m2}, + delay: time.Minute, + handler: HandlerFunc(func(ctx context.Context, task *Task) error { + return RevokeTask // return RevokeTask without wrapping + }), + wait: 2 * time.Second, + wantErrMsg: RevokeTask.Error(), + wantRetry: []*base.TaskMessage{}, + wantArchived: []*base.TaskMessage{}, + wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error + }, + { + desc: "Should revoke task (with error wrapping)", + pending: []*base.TaskMessage{m1, m2}, + delay: time.Minute, + handler: HandlerFunc(func(ctx context.Context, task *Task) error { + return wrappedRevokeTask + }), + wait: 2 * time.Second, + wantErrMsg: wrappedRevokeTask.Error(), + wantRetry: []*base.TaskMessage{}, + wantArchived: []*base.TaskMessage{}, + wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error + }, } for _, tc := range tests { diff --git a/server.go b/server.go index e405399..277638e 100644 --- a/server.go +++ b/server.go @@ -633,6 +633,10 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { // One exception to this rule is when ProcessTask returns a SkipRetry error. // If the returned error is SkipRetry or an error wraps SkipRetry, retry is // skipped and the task will be immediately archived instead. +// +// One exception to this rule is when ProcessTask returns a RevokeTask error. +// If the returned error is RevokeTask or an error wraps RevokeTask, the task +// will not be retried or archived. type Handler interface { ProcessTask(context.Context, *Task) error }