
Do not start a worker goroutine for a task that has already exceeded its deadline

Ken Hibino 2020-07-05 16:55:33 -07:00
parent 007fac8055
commit 7382e2aeb8
2 changed files with 17 additions and 10 deletions
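
The gist of the change: before spawning the worker goroutine, the processor now does a non-blocking check on the task's context, so a task whose deadline has already passed goes straight to the retry-or-kill path instead of being handed to the handler. Below is a minimal, self-contained sketch of that pattern using only the standard library; `process` and its arguments are illustrative stand-ins, not asynq APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// process sketches the shape of the change: bail out before launching a
// worker goroutine when the task's context is already done (e.g. its
// deadline has passed), instead of starting work that is doomed to be
// canceled immediately.
func process(ctx context.Context, handler func(context.Context) error) error {
	// Non-blocking check: the default case makes this select return
	// immediately when the context is still live.
	select {
	case <-ctx.Done():
		// Already canceled or past its deadline; skip the handler entirely
		// and let the caller decide whether to retry or kill the task.
		return fmt.Errorf("not starting handler: %w", ctx.Err())
	default:
	}

	// Buffered so the handler goroutine can always send its result and
	// exit, even if this function returns on ctx.Done() first.
	resCh := make(chan error, 1)
	go func() { resCh <- handler(ctx) }()

	select {
	case <-ctx.Done():
		return ctx.Err() // canceled while the handler was running
	case err := <-resCh:
		return err
	}
}

func main() {
	// A deadline in the past: the handler never runs.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Second))
	defer cancel()

	err := process(ctx, func(context.Context) error {
		fmt.Println("handler ran") // never printed for the expired context
		return nil
	})
	fmt.Println(err) // not starting handler: context deadline exceeded
}
```

The buffered result channel mirrors the diff below: the handler goroutine can deliver its result and exit even when the processor has already moved on because the context was canceled.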

README.md

@@ -34,6 +34,7 @@ A system can consist of multiple worker servers and brokers, giving way to high
- Scheduling of tasks
- Durability since tasks are written to Redis
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
+ - Automatic recovery of tasks in the event of a worker crash
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
- Low latency to add a task since writes are fast in Redis

processor.go

@@ -191,9 +191,19 @@ func (p *processor) exec() {
p.cancelations.Delete(msg.ID.String())
}()
+ // check context before starting a worker goroutine.
+ select {
+ case <-ctx.Done():
+ // already canceled (e.g. deadline exceeded).
+ p.retryOrKill(ctx, msg, ctx.Err())
+ return
+ default:
+ }
resCh := make(chan error, 1)
- task := NewTask(msg.Type, msg.Payload)
- go func() { resCh <- perform(ctx, task, p.handler) }()
+ go func() {
+ resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
+ }()
select {
case <-p.abort:
@@ -202,10 +212,6 @@ func (p *processor) exec() {
p.requeue(msg)
return
case <-ctx.Done():
- p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
- if p.errHandler != nil {
- p.errHandler.HandleError(ctx, task, ctx.Err())
- }
p.retryOrKill(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
@@ -214,9 +220,6 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
- if p.errHandler != nil {
- p.errHandler.HandleError(ctx, task, resErr)
- }
p.retryOrKill(ctx, msg, resErr)
return
}
@@ -255,7 +258,11 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
}
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
+ if p.errHandler != nil {
+ p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
+ }
if msg.Retried >= msg.Retry {
+ p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.kill(ctx, msg, err)
} else {
p.retry(ctx, msg, err)
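
The hunk above also centralizes failure handling: both the context-cancellation path and the handler-error path now call retryOrKill, which reports the error once and then lets the retry budget decide what happens next (the "Retry exhausted" warning moves here from kill, as the next hunk shows). A simplified, library-agnostic sketch of that decision logic; the types and callbacks are illustrative stand-ins, not asynq internals.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// taskMsg stands in for the broker's task message; only the retry
// bookkeeping matters for this sketch.
type taskMsg struct {
	ID      string
	Retried int // retries attempted so far
	Retry   int // maximum retries allowed
}

// retryOrKill mirrors the shape of the refactor: every failure path reports
// the error once, then the retry budget decides between retrying and killing.
func retryOrKill(ctx context.Context, onError func(context.Context, *taskMsg, error),
	retry, kill func(*taskMsg, error), msg *taskMsg, err error) {
	if onError != nil {
		onError(ctx, msg, err) // single place the error callback is invoked
	}
	if msg.Retried >= msg.Retry {
		fmt.Printf("Retry exhausted for task id=%s\n", msg.ID)
		kill(msg, err) // in asynq this moves the task to the dead set
		return
	}
	retry(msg, err)
}

func main() {
	report := func(_ context.Context, m *taskMsg, err error) { fmt.Printf("error on %s: %v\n", m.ID, err) }
	retryFn := func(m *taskMsg, _ error) { fmt.Println("retrying", m.ID) }
	killFn := func(m *taskMsg, _ error) { fmt.Println("killing", m.ID) }

	// Still within the retry budget: the task is retried.
	retryOrKill(context.Background(), report, retryFn, killFn,
		&taskMsg{ID: "a", Retried: 1, Retry: 3}, errors.New("context deadline exceeded"))

	// Budget exhausted: the task is killed.
	retryOrKill(context.Background(), report, retryFn, killFn,
		&taskMsg{ID: "b", Retried: 3, Retry: 3}, errors.New("handler failed"))
}
```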
@@ -284,7 +291,6 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
}
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
- p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)