Update recoverer

This commit is contained in:
Ken Hibino
2020-08-10 06:10:14 -07:00
parent a873d488ee
commit 8e23b865e9
3 changed files with 200 additions and 91 deletions

View File

@@ -21,6 +21,9 @@ type recoverer struct {
// channel to communicate back to the long running "recoverer" goroutine.
done chan struct{}
// list of queues to check for deadline.
queues []string
// poll interval.
interval time.Duration
}
@@ -28,6 +31,7 @@ type recoverer struct {
type recovererParams struct {
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
retryDelayFunc retryDelayFunc
}
@@ -37,6 +41,7 @@ func newRecoverer(params recovererParams) *recoverer {
logger: params.logger,
broker: params.broker,
done: make(chan struct{}),
queues: params.queues,
interval: params.interval,
retryDelayFunc: params.retryDelayFunc,
}
@@ -62,7 +67,7 @@ func (r *recoverer) start(wg *sync.WaitGroup) {
case <-timer.C:
// Get all tasks which have expired 30 seconds ago or earlier.
deadline := time.Now().Add(-30 * time.Second)
msgs, err := r.broker.ListDeadlineExceeded(deadline)
msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...)
if err != nil {
r.logger.Warn("recoverer: could not list deadline exceeded tasks")
continue