From ff6768f9bb44c92ad8f7c523fb80a98d24f0571e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 4 Jul 2021 06:20:36 -0700 Subject: [PATCH] Fix recoverer to run task recovering logic every minute --- recoverer.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/recoverer.go b/recoverer.go index 9c3dae3..db07341 100644 --- a/recoverer.go +++ b/recoverer.go @@ -65,27 +65,31 @@ func (r *recoverer) start(wg *sync.WaitGroup) { timer.Stop() return case <-timer.C: - // Get all tasks which have expired 30 seconds ago or earlier. - deadline := time.Now().Add(-30 * time.Second) - msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...) - if err != nil { - r.logger.Warn("recoverer: could not list deadline exceeded tasks") - continue - } - const errMsg = "deadline exceeded" // TODO: better error message - for _, msg := range msgs { - if msg.Retried >= msg.Retry { - r.archive(msg, errMsg) - } else { - r.retry(msg, errMsg) - } - } - + r.recover() + timer.Reset(r.interval) } } }() } +func (r *recoverer) recover() { + // Get all tasks which have expired 30 seconds ago or earlier. + deadline := time.Now().Add(-30 * time.Second) + msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...) + if err != nil { + r.logger.Warn("recoverer: could not list deadline exceeded tasks") + return + } + const errMsg = "deadline exceeded" + for _, msg := range msgs { + if msg.Retried >= msg.Retry { + r.archive(msg, errMsg) + } else { + r.retry(msg, errMsg) + } + } +} + func (r *recoverer) retry(msg *base.TaskMessage, errMsg string) { delay := r.retryDelayFunc(msg.Retried, fmt.Errorf(errMsg), NewTask(msg.Type, msg.Payload)) retryAt := time.Now().Add(delay)