diff --git a/README.md b/README.md index 5ad6620..6bdf822 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ A system can consist of multiple worker servers and brokers, giving way to high - Scheduling of tasks - Durability since tasks are written to Redis - [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks +- Automatic recovery of tasks in the event of a worker crash - [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues) - [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues) - Low latency to add a task since writes are fast in Redis diff --git a/processor.go b/processor.go index 6a97178..1614d41 100644 --- a/processor.go +++ b/processor.go @@ -191,9 +191,19 @@ func (p *processor) exec() { p.cancelations.Delete(msg.ID.String()) }() + // check context before starting a worker goroutine. + select { + case <-ctx.Done(): + // already canceled (e.g. deadline exceeded). + p.retryOrKill(ctx, msg, ctx.Err()) + return + default: + } + resCh := make(chan error, 1) - task := NewTask(msg.Type, msg.Payload) - go func() { resCh <- perform(ctx, task, p.handler) }() + go func() { + resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler) + }() select { case <-p.abort: @@ -202,10 +212,6 @@ func (p *processor) exec() { p.requeue(msg) return case <-ctx.Done(): - p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above - if p.errHandler != nil { - p.errHandler.HandleError(ctx, task, ctx.Err()) - } p.retryOrKill(ctx, msg, ctx.Err()) return case resErr := <-resCh: @@ -214,9 +220,6 @@ func (p *processor) exec() { // 2) Retry -> Removes the message from InProgress & Adds the message to Retry // 3) Kill -> Removes the message from InProgress & Adds the message to Dead if resErr != nil { - if p.errHandler != nil { - p.errHandler.HandleError(ctx, task, resErr) - } p.retryOrKill(ctx, msg, resErr) return } @@ -255,7 +258,11 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { } func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) { + if p.errHandler != nil { + p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) + } if msg.Retried >= msg.Retry { + p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.kill(ctx, msg, err) } else { p.retry(ctx, msg, err) @@ -284,7 +291,6 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { } func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) { - p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) err := p.broker.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)