Update recoverer to use ListLeaseExpired

This commit is contained in:
Ken Hibino
2022-02-11 06:18:27 -08:00
parent bc2f1986d7
commit dabcb120d5
5 changed files with 62 additions and 58 deletions

View File

@@ -5,11 +5,11 @@
package asynq
import (
"context"
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log"
)
@@ -76,19 +76,23 @@ func (r *recoverer) start(wg *sync.WaitGroup) {
}()
}
// ErrLeaseExpired error indicates that the task failed because the worker working on the task
// could not extend its lease due to missing heartbeats. The worker may have crashed or got cutoff from the network.
var ErrLeaseExpired = errors.New("asynq: task lease expired")
func (r *recoverer) recover() {
// Get all tasks which have expired 30 seconds ago or earlier.
deadline := time.Now().Add(-30 * time.Second)
msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...)
// Get all tasks which have expired 30 seconds ago or earlier to accomodate certain amount of clock skew.
cutoff := time.Now().Add(-30 * time.Second)
msgs, err := r.broker.ListLeaseExpired(cutoff, r.queues...)
if err != nil {
r.logger.Warn("recoverer: could not list deadline exceeded tasks")
r.logger.Warn("recoverer: could not list lease expired tasks")
return
}
for _, msg := range msgs {
if msg.Retried >= msg.Retry {
r.archive(msg, context.DeadlineExceeded)
r.archive(msg, ErrLeaseExpired)
} else {
r.retry(msg, context.DeadlineExceeded)
r.retry(msg, ErrLeaseExpired)
}
}
}
@@ -97,7 +101,7 @@ func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(delay)
if err := r.broker.Retry(msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry deadline exceeded task: %v", err)
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)
}
}