mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-13 04:46:39 +08:00
Call all context cancelFunc in processor (#926)
This commit is contained in:
parent
0655c569f5
commit
3f4e211a3b
15
processor.go
15
processor.go
@ -262,7 +262,8 @@ func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
|
||||
// If lease is not valid, do not write to redis; Let recoverer take care of it.
|
||||
return
|
||||
}
|
||||
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
err := p.broker.Requeue(ctx, msg)
|
||||
if err != nil {
|
||||
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
|
||||
@ -284,7 +285,8 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
|
||||
// If lease is not valid, do not write to redis; Let recoverer take care of it.
|
||||
return
|
||||
}
|
||||
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
err := p.broker.MarkAsComplete(ctx, msg)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
|
||||
@ -305,7 +307,8 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
|
||||
// If lease is not valid, do not write to redis; Let recoverer take care of it.
|
||||
return
|
||||
}
|
||||
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
err := p.broker.Done(ctx, msg)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
|
||||
@ -349,7 +352,8 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
|
||||
// If lease is not valid, do not write to redis; Let recoverer take care of it.
|
||||
return
|
||||
}
|
||||
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
|
||||
retryAt := time.Now().Add(d)
|
||||
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
|
||||
@ -371,7 +375,8 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
|
||||
// If lease is not valid, do not write to redis; Let recoverer take care of it.
|
||||
return
|
||||
}
|
||||
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
|
||||
ctx, cancel := context.WithDeadline(context.Background(), l.Deadline())
|
||||
defer cancel()
|
||||
err := p.broker.Archive(ctx, msg, e.Error())
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
|
||||
|
Loading…
Reference in New Issue
Block a user