mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Fix recoverer to run task recovering logic every minute
This commit is contained in:
parent
d5e9f3b1bd
commit
ff6768f9bb
36
recoverer.go
36
recoverer.go
@ -65,27 +65,31 @@ func (r *recoverer) start(wg *sync.WaitGroup) {
|
|||||||
timer.Stop()
|
timer.Stop()
|
||||||
return
|
return
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
// Get all tasks which have expired 30 seconds ago or earlier.
|
r.recover()
|
||||||
deadline := time.Now().Add(-30 * time.Second)
|
timer.Reset(r.interval)
|
||||||
msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...)
|
|
||||||
if err != nil {
|
|
||||||
r.logger.Warn("recoverer: could not list deadline exceeded tasks")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
const errMsg = "deadline exceeded" // TODO: better error message
|
|
||||||
for _, msg := range msgs {
|
|
||||||
if msg.Retried >= msg.Retry {
|
|
||||||
r.archive(msg, errMsg)
|
|
||||||
} else {
|
|
||||||
r.retry(msg, errMsg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *recoverer) recover() {
|
||||||
|
// Get all tasks which have expired 30 seconds ago or earlier.
|
||||||
|
deadline := time.Now().Add(-30 * time.Second)
|
||||||
|
msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...)
|
||||||
|
if err != nil {
|
||||||
|
r.logger.Warn("recoverer: could not list deadline exceeded tasks")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const errMsg = "deadline exceeded"
|
||||||
|
for _, msg := range msgs {
|
||||||
|
if msg.Retried >= msg.Retry {
|
||||||
|
r.archive(msg, errMsg)
|
||||||
|
} else {
|
||||||
|
r.retry(msg, errMsg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *recoverer) retry(msg *base.TaskMessage, errMsg string) {
|
func (r *recoverer) retry(msg *base.TaskMessage, errMsg string) {
|
||||||
delay := r.retryDelayFunc(msg.Retried, fmt.Errorf(errMsg), NewTask(msg.Type, msg.Payload))
|
delay := r.retryDelayFunc(msg.Retried, fmt.Errorf(errMsg), NewTask(msg.Type, msg.Payload))
|
||||||
retryAt := time.Now().Add(delay)
|
retryAt := time.Now().Add(delay)
|
||||||
|
Loading…
Reference in New Issue
Block a user