2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Add deadline to syncRequest

- syncer will drop a request if its deadline has been exceeded
This commit is contained in:
Ken Hibino 2020-06-19 05:51:50 -07:00
parent 4e8ac151ae
commit 83f1e20d74
3 changed files with 75 additions and 16 deletions

View File

@ -208,7 +208,7 @@ func (p *processor) exec() {
return return
case <-ctx.Done(): case <-ctx.Done():
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
p.retryOrKill(msg, ctx.Err()) p.retryOrKill(ctx, msg, ctx.Err())
return return
case resErr := <-resCh: case resErr := <-resCh:
// Note: One of three things should happen. // Note: One of three things should happen.
@ -219,10 +219,10 @@ func (p *processor) exec() {
if p.errHandler != nil { if p.errHandler != nil {
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry) p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
} }
p.retryOrKill(msg, resErr) p.retryOrKill(ctx, msg, resErr)
return return
} }
p.markAsDone(msg) p.markAsDone(ctx, msg)
} }
}() }()
} }
@ -237,55 +237,70 @@ func (p *processor) requeue(msg *base.TaskMessage) {
} }
} }
func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
err := p.broker.Done(msg) err := p.broker.Done(msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err) errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Done(msg) return p.broker.Done(msg)
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline,
} }
} }
} }
func (p *processor) retryOrKill(msg *base.TaskMessage, err error) { func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
if msg.Retried >= msg.Retry { if msg.Retried >= msg.Retry {
p.kill(msg, err) p.kill(ctx, msg, err)
} else { } else {
p.retry(msg, err) p.retry(ctx, msg, err)
} }
} }
func (p *processor) retry(msg *base.TaskMessage, e error) { func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.broker.Retry(msg, retryAt, e.Error()) err := p.broker.Retry(msg, retryAt, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Retry(msg, retryAt, e.Error()) return p.broker.Retry(msg, retryAt, e.Error())
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline,
} }
} }
} }
func (p *processor) kill(msg *base.TaskMessage, e error) { func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error()) err := p.broker.Kill(msg, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Kill(msg, e.Error()) return p.broker.Kill(msg, e.Error())
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline,
} }
} }
} }

View File

@ -26,8 +26,9 @@ type syncer struct {
} }
type syncRequest struct { type syncRequest struct {
fn func() error // sync operation fn func() error // sync operation
errMsg string // error message errMsg string // error message
deadline time.Time // request should be dropped if deadline has been exceeded
} }
type syncerParams struct { type syncerParams struct {
@ -72,6 +73,9 @@ func (s *syncer) start(wg *sync.WaitGroup) {
case <-time.After(s.interval): case <-time.After(s.interval):
var temp []*syncRequest var temp []*syncRequest
for _, req := range requests { for _, req := range requests {
if req.deadline.Before(time.Now()) {
continue // drop stale request
}
if err := req.fn(); err != nil { if err := req.fn(); err != nil {
temp = append(temp, req) temp = append(temp, req)
} }

View File

@ -42,6 +42,7 @@ func TestSyncer(t *testing.T) {
fn: func() error { fn: func() error {
return rdbClient.Done(m) return rdbClient.Done(m)
}, },
deadline: time.Now().Add(5 * time.Minute),
} }
} }
@ -85,8 +86,9 @@ func TestSyncerRetry(t *testing.T) {
} }
syncRequestCh <- &syncRequest{ syncRequestCh <- &syncRequest{
fn: requestFunc, fn: requestFunc,
errMsg: "error", errMsg: "error",
deadline: time.Now().Add(5 * time.Minute),
} }
// allow syncer to retry // allow syncer to retry
@ -98,3 +100,41 @@ func TestSyncerRetry(t *testing.T) {
} }
mu.Unlock() mu.Unlock()
} }
func TestSyncerDropsStaleRequests(t *testing.T) {
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncerParams{
logger: testLogger,
requestsCh: syncRequestCh,
interval: interval,
})
var wg sync.WaitGroup
syncer.start(&wg)
var (
mu sync.Mutex
n int // number of times request has been processed
)
for i := 0; i < 10; i++ {
syncRequestCh <- &syncRequest{
fn: func() error {
mu.Lock()
n++
mu.Unlock()
return nil
},
deadline: time.Now().Add(time.Duration(-i) * time.Second), // already exceeded deadline
}
}
time.Sleep(2 * interval) // ensure that syncer runs at least once
syncer.terminate()
mu.Lock()
if n != 0 {
t.Errorf("requests has been processed %d times, want 0", n)
}
mu.Unlock()
}