2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Add IsFailure to Config

With this IsFailure config, users can provide a predicate function to 
determine whether the error returned from Handler counts as a failure.
This commit is contained in:
Ken Hibino
2021-09-01 06:00:54 -07:00
committed by GitHub
parent 3ae0e7f528
commit f0db219f6a
11 changed files with 246 additions and 35 deletions

View File

@@ -33,6 +33,7 @@ type processor struct {
orderedQueues []string
retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool
errHandler ErrorHandler
@@ -70,6 +71,7 @@ type processorParams struct {
logger *log.Logger
broker base.Broker
retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool
syncCh chan<- *syncRequest
cancelations *base.Cancelations
concurrency int
@@ -94,6 +96,7 @@ func newProcessor(params processorParams) *processor {
queueConfig: queues,
orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc,
isFailureFunc: params.isFailureFunc,
syncRequestCh: params.syncCh,
cancelations: params.cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
@@ -197,7 +200,7 @@ func (p *processor) exec() {
select {
case <-ctx.Done():
// already canceled (e.g. deadline exceeded).
p.retryOrKill(ctx, msg, ctx.Err())
p.retryOrArchive(ctx, msg, ctx.Err())
return
default:
}
@@ -214,7 +217,7 @@ func (p *processor) exec() {
p.requeue(msg)
return
case <-ctx.Done():
p.retryOrKill(ctx, msg, ctx.Err())
p.retryOrArchive(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
// Note: One of three things should happen.
@@ -222,7 +225,7 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from Active & Adds the message to Retry
// 3) Archive -> Removes the message from Active & Adds the message to archive
if resErr != nil {
p.retryOrKill(ctx, msg, resErr)
p.retryOrArchive(ctx, msg, resErr)
return
}
p.markAsDone(ctx, msg)
@@ -263,22 +266,27 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
// the task should not be retried and should be archived instead.
var SkipRetry = errors.New("skip retry for the task")
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
func (p *processor) retryOrArchive(ctx context.Context, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
}
if !p.isFailureFunc(err) {
// retry the task without marking it as failed
p.retry(ctx, msg, err, false /*isFailure*/)
return
}
if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.archive(ctx, msg, err)
} else {
p.retry(ctx, msg, err)
p.retry(ctx, msg, err, true /*isFailure*/)
}
}
func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error, isFailure bool) {
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d)
err := p.broker.Retry(msg, retryAt, e.Error())
err := p.broker.Retry(msg, retryAt, e.Error(), isFailure)
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
deadline, ok := ctx.Deadline()
@@ -288,7 +296,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Retry(msg, retryAt, e.Error())
return p.broker.Retry(msg, retryAt, e.Error(), isFailure)
},
errMsg: errMsg,
deadline: deadline,