diff --git a/processor.go b/processor.go index fa810d6..c3a853a 100644 --- a/processor.go +++ b/processor.go @@ -336,15 +336,18 @@ func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) } switch { - case errors.Is(err, RevokeTask): - p.logger.Warnf("revoke task id=%s", msg.ID) - p.markAsDone(l, msg) - case msg.Retried >= msg.Retry || errors.Is(err, SkipRetry): - p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) - p.archive(l, msg, err) - default: - p.retry(l, msg, err, p.isFailureFunc(err)) - } + case errors.Is(err, RevokeTask): + p.logger.Warnf("Revoke task id=%s", msg.ID) + p.markAsDone(l, msg) + case ctx.Err() == context.Canceled || errors.Is(err, SkipRetry): + p.logger.Warnf("Task canceled, skipping retry for id=%s", msg.ID) + p.archive(l, msg, err) + case msg.Retried >= msg.Retry: + p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) + p.archive(l, msg, err) + default: + p.retry(l, msg, err, p.isFailureFunc(err)) + } } func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailure bool) {