diff --git a/processor.go b/processor.go index b41421c..c3472b7 100644 --- a/processor.go +++ b/processor.go @@ -208,7 +208,7 @@ func (p *processor) exec() { return case <-ctx.Done(): p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above - p.retryOrKill(msg, ctx.Err()) + p.retryOrKill(ctx, msg, ctx.Err()) return case resErr := <-resCh: // Note: One of three things should happen. @@ -219,10 +219,10 @@ func (p *processor) exec() { if p.errHandler != nil { p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry) } - p.retryOrKill(msg, resErr) + p.retryOrKill(ctx, msg, resErr) return } - p.markAsDone(msg) + p.markAsDone(ctx, msg) } }() } @@ -237,55 +237,70 @@ func (p *processor) requeue(msg *base.TaskMessage) { } } -func (p *processor) markAsDone(msg *base.TaskMessage) { +func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err) + deadline, ok := ctx.Deadline() + if !ok { + panic("asynq: internal error: missing deadline in context") + } p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Done(msg) }, - errMsg: errMsg, + errMsg: errMsg, + deadline: deadline, } } } -func (p *processor) retryOrKill(msg *base.TaskMessage, err error) { +func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) { if msg.Retried >= msg.Retry { - p.kill(msg, err) + p.kill(ctx, msg, err) } else { - p.retry(msg, err) + p.retry(ctx, msg, err) } } -func (p *processor) retry(msg *base.TaskMessage, e error) { +func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) retryAt := time.Now().Add(d) err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) + deadline, ok := ctx.Deadline() + if !ok { + panic("asynq: internal error: missing deadline in context") + } p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Retry(msg, retryAt, e.Error()) }, - errMsg: errMsg, + errMsg: errMsg, + deadline: deadline, } } } -func (p *processor) kill(msg *base.TaskMessage, e error) { +func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) { p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) err := p.broker.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) + deadline, ok := ctx.Deadline() + if !ok { + panic("asynq: internal error: missing deadline in context") + } p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Kill(msg, e.Error()) }, - errMsg: errMsg, + errMsg: errMsg, + deadline: deadline, } } } diff --git a/syncer.go b/syncer.go index c5ca396..d108a7a 100644 --- a/syncer.go +++ b/syncer.go @@ -26,8 +26,9 @@ type syncer struct { } type syncRequest struct { - fn func() error // sync operation - errMsg string // error message + fn func() error // sync operation + errMsg string // error message + deadline time.Time // request should be dropped if deadline has been exceeded } type syncerParams struct { @@ -72,6 +73,9 @@ func (s *syncer) start(wg *sync.WaitGroup) { case <-time.After(s.interval): var temp []*syncRequest for _, req := range requests { + if req.deadline.Before(time.Now()) { + continue // drop stale request + } if err := req.fn(); err != nil { temp = append(temp, req) } diff --git a/syncer_test.go b/syncer_test.go index 85f68cb..1be2d18 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -42,6 +42,7 @@ func TestSyncer(t *testing.T) { fn: func() error { return rdbClient.Done(m) }, + deadline: time.Now().Add(5 * time.Minute), } } @@ -85,8 +86,9 @@ func TestSyncerRetry(t *testing.T) { } syncRequestCh <- &syncRequest{ - fn: requestFunc, - errMsg: "error", + fn: requestFunc, + errMsg: "error", + deadline: time.Now().Add(5 * time.Minute), } // allow syncer to retry @@ -98,3 +100,41 @@ func TestSyncerRetry(t *testing.T) { } mu.Unlock() } + +func TestSyncerDropsStaleRequests(t *testing.T) { + const interval = time.Second + syncRequestCh := make(chan *syncRequest) + syncer := newSyncer(syncerParams{ + logger: testLogger, + requestsCh: syncRequestCh, + interval: interval, + }) + var wg sync.WaitGroup + syncer.start(&wg) + + var ( + mu sync.Mutex + n int // number of times request has been processed + ) + + for i := 0; i < 10; i++ { + syncRequestCh <- &syncRequest{ + fn: func() error { + mu.Lock() + n++ + mu.Unlock() + return nil + }, + deadline: time.Now().Add(time.Duration(-i) * time.Second), // already exceeded deadline + } + } + + time.Sleep(2 * interval) // ensure that syncer runs at least once + syncer.terminate() + + mu.Lock() + if n != 0 { + t.Errorf("requests has been processed %d times, want 0", n) + } + mu.Unlock() +}