mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
feat: revoke the task to modify task parameters and enqueue new task with the same task id (#882)
This commit is contained in:
parent
ff7ef48463
commit
ae478d5b22
19
processor.go
19
processor.go
@ -323,20 +323,23 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
|
|||||||
// the task should not be retried and should be archived instead.
|
// the task should not be retried and should be archived instead.
|
||||||
var SkipRetry = errors.New("skip retry for the task")
|
var SkipRetry = errors.New("skip retry for the task")
|
||||||
|
|
||||||
|
// RevokeTask is used as a return value from Handler.ProcessTask to indicate that
|
||||||
|
// the task should not be retried or archived.
|
||||||
|
var RevokeTask = errors.New("revoke task")
|
||||||
|
|
||||||
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
|
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
|
||||||
if p.errHandler != nil {
|
if p.errHandler != nil {
|
||||||
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
|
||||||
}
|
}
|
||||||
if !p.isFailureFunc(err) {
|
switch {
|
||||||
// retry the task without marking it as failed
|
case errors.Is(err, RevokeTask):
|
||||||
p.retry(l, msg, err, false /*isFailure*/)
|
p.logger.Warnf("revoke task id=%s", msg.ID)
|
||||||
return
|
p.markAsDone(l, msg)
|
||||||
}
|
case msg.Retried >= msg.Retry || errors.Is(err, SkipRetry):
|
||||||
if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
|
|
||||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||||
p.archive(l, msg, err)
|
p.archive(l, msg, err)
|
||||||
} else {
|
default:
|
||||||
p.retry(l, msg, err, true /*isFailure*/)
|
p.retry(l, msg, err, p.isFailureFunc(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,6 +295,7 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
|
|
||||||
errMsg := "something went wrong"
|
errMsg := "something went wrong"
|
||||||
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
|
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
|
||||||
|
wrappedRevokeTask := fmt.Errorf("%s:%w", errMsg, RevokeTask)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
desc string // test description
|
desc string // test description
|
||||||
@ -346,6 +347,32 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
wantArchived: []*base.TaskMessage{m1, m2},
|
wantArchived: []*base.TaskMessage{m1, m2},
|
||||||
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
|
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "Should revoke task",
|
||||||
|
pending: []*base.TaskMessage{m1, m2},
|
||||||
|
delay: time.Minute,
|
||||||
|
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||||
|
return RevokeTask // return RevokeTask without wrapping
|
||||||
|
}),
|
||||||
|
wait: 2 * time.Second,
|
||||||
|
wantErrMsg: RevokeTask.Error(),
|
||||||
|
wantRetry: []*base.TaskMessage{},
|
||||||
|
wantArchived: []*base.TaskMessage{},
|
||||||
|
wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Should revoke task (with error wrapping)",
|
||||||
|
pending: []*base.TaskMessage{m1, m2},
|
||||||
|
delay: time.Minute,
|
||||||
|
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||||
|
return wrappedRevokeTask
|
||||||
|
}),
|
||||||
|
wait: 2 * time.Second,
|
||||||
|
wantErrMsg: wrappedRevokeTask.Error(),
|
||||||
|
wantRetry: []*base.TaskMessage{},
|
||||||
|
wantArchived: []*base.TaskMessage{},
|
||||||
|
wantErrCount: 2, // ErrorHandler should still be called with RevokeTask error
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
@ -633,6 +633,10 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
|
|||||||
// One exception to this rule is when ProcessTask returns a SkipRetry error.
|
// One exception to this rule is when ProcessTask returns a SkipRetry error.
|
||||||
// If the returned error is SkipRetry or an error wraps SkipRetry, retry is
|
// If the returned error is SkipRetry or an error wraps SkipRetry, retry is
|
||||||
// skipped and the task will be immediately archived instead.
|
// skipped and the task will be immediately archived instead.
|
||||||
|
//
|
||||||
|
// One exception to this rule is when ProcessTask returns a RevokeTask error.
|
||||||
|
// If the returned error is RevokeTask or an error wraps RevokeTask, the task
|
||||||
|
// will not be retried or archived.
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
ProcessTask(context.Context, *Task) error
|
ProcessTask(context.Context, *Task) error
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user