From 8211167de252e5e5d25d4d317d9f62109f2d100b Mon Sep 17 00:00:00 2001
From: Ken Hibino
Date: Tue, 15 Feb 2022 06:58:54 -0800
Subject: [PATCH] Update processor to create a lease and watch for expiration

---
 processor.go      | 110 ++++++++++++++++++++++++++--------------------
 processor_test.go |  98 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 160 insertions(+), 48 deletions(-)

diff --git a/processor.go b/processor.go
index c81c998..925b9c9 100644
--- a/processor.go
+++ b/processor.go
@@ -171,7 +171,7 @@ func (p *processor) exec() {
 return
 case p.sema <- struct{}{}: // acquire token
 qnames := p.queues()
- msg, err := p.broker.Dequeue(qnames...)
+ msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
 switch {
 case errors.Is(err, errors.ErrNoProcessableTask):
 p.logger.Debug("All queues are empty")
@@ -190,8 +190,9 @@ func (p *processor) exec() {
 return
 }

+ lease := base.NewLease(leaseExpirationTime)
 deadline := p.computeDeadline(msg)
- p.starting <- &workerInfo{msg, time.Now(), deadline}
+ p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
 go func() {
 defer func() {
 p.finished <- msg
@@ -209,7 +210,7 @@ func (p *processor) exec() {
 select {
 case <-ctx.Done():
 // already canceled (e.g. deadline exceeded).
- p.handleFailedMessage(ctx, msg, ctx.Err())
+ p.handleFailedMessage(ctx, lease, msg, ctx.Err())
 return
 default:
 }
@@ -233,24 +234,33 @@ func (p *processor) exec() {
 case <-p.abort:
 // time is up, push the message back to queue and quit this worker goroutine.
 p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
- p.requeue(msg)
+ p.requeue(lease, msg)
+ return
+ case <-lease.Done():
+ cancel()
+ p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
 return
 case <-ctx.Done():
- p.handleFailedMessage(ctx, msg, ctx.Err())
+ p.handleFailedMessage(ctx, lease, msg, ctx.Err())
 return
 case resErr := <-resCh:
 if resErr != nil {
- p.handleFailedMessage(ctx, msg, resErr)
+ p.handleFailedMessage(ctx, lease, msg, resErr)
 return
 }
- p.handleSucceededMessage(ctx, msg)
+ p.handleSucceededMessage(lease, msg)
 }
 }()
 }
 }

-func (p *processor) requeue(msg *base.TaskMessage) {
- err := p.broker.Requeue(msg)
+func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
+ if !l.IsValid() {
+ // If lease is not valid, do not write to redis; Let recoverer take care of it.
+ return
+ }
+ ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
+ err := p.broker.Requeue(ctx, msg)
 if err != nil {
 p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
 } else {
@@ -258,49 +268,51 @@ func (p *processor) requeue(msg *base.TaskMessage) {
 }
 }

-func (p *processor) handleSucceededMessage(ctx context.Context, msg *base.TaskMessage) {
+func (p *processor) handleSucceededMessage(l *base.Lease, msg *base.TaskMessage) {
 if msg.Retention > 0 {
- p.markAsComplete(ctx, msg)
+ p.markAsComplete(l, msg)
 } else {
- p.markAsDone(ctx, msg)
+ p.markAsDone(l, msg)
 }
 }

-func (p *processor) markAsComplete(ctx context.Context, msg *base.TaskMessage) {
- err := p.broker.MarkAsComplete(msg)
+func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
+ if !l.IsValid() {
+ // If lease is not valid, do not write to redis; Let recoverer take care of it.
+ return
+ }
+ ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
+ err := p.broker.MarkAsComplete(ctx, msg)
 if err != nil {
 errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err)
- deadline, ok := ctx.Deadline()
- if !ok {
- panic("asynq: internal error: missing deadline in context")
- }
 p.logger.Warnf("%s; Will retry syncing", errMsg)
 p.syncRequestCh <- &syncRequest{
 fn: func() error {
- return p.broker.MarkAsComplete(msg)
+ return p.broker.MarkAsComplete(ctx, msg)
 },
 errMsg: errMsg,
- deadline: deadline,
+ deadline: l.Deadline(),
 }
 }
 }

-func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
- err := p.broker.Done(msg)
+func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
+ if !l.IsValid() {
+ // If lease is not valid, do not write to redis; Let recoverer take care of it.
+ return
+ }
+ ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
+ err := p.broker.Done(ctx, msg)
 if err != nil {
 errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
- deadline, ok := ctx.Deadline()
- if !ok {
- panic("asynq: internal error: missing deadline in context")
- }
 p.logger.Warnf("%s; Will retry syncing", errMsg)
 p.syncRequestCh <- &syncRequest{
 fn: func() error {
- return p.broker.Done(msg)
+ return p.broker.Done(ctx, msg)
 },
 errMsg: errMsg,
- deadline: deadline,
+ deadline: l.Deadline(),
 }
 }
 }
@@ -309,59 +321,61 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
 // the task should not be retried and should be archived instead.
 var SkipRetry = errors.New("skip retry for the task")

-func (p *processor) handleFailedMessage(ctx context.Context, msg *base.TaskMessage, err error) {
+func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
 if p.errHandler != nil {
 p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
 }
 if !p.isFailureFunc(err) {
 // retry the task without marking it as failed
- p.retry(ctx, msg, err, false /*isFailure*/)
+ p.retry(l, msg, err, false /*isFailure*/)
 return
 }
 if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
 p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
- p.archive(ctx, msg, err)
+ p.archive(l, msg, err)
 } else {
- p.retry(ctx, msg, err, true /*isFailure*/)
+ p.retry(l, msg, err, true /*isFailure*/)
 }
 }

-func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error, isFailure bool) {
+func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailure bool) {
+ if !l.IsValid() {
+ // If lease is not valid, do not write to redis; Let recoverer take care of it.
+ return
+ }
+ ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
 d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
 retryAt := time.Now().Add(d)
- err := p.broker.Retry(msg, retryAt, e.Error(), isFailure)
+ err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
 if err != nil {
 errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
- deadline, ok := ctx.Deadline()
- if !ok {
- panic("asynq: internal error: missing deadline in context")
- }
 p.logger.Warnf("%s; Will retry syncing", errMsg)
 p.syncRequestCh <- &syncRequest{
 fn: func() error {
- return p.broker.Retry(msg, retryAt, e.Error(), isFailure)
+ return p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
 },
 errMsg: errMsg,
- deadline: deadline,
+ deadline: l.Deadline(),
 }
 }
 }

-func (p *processor) archive(ctx context.Context, msg *base.TaskMessage, e error) {
- err := p.broker.Archive(msg, e.Error())
+func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
+ if !l.IsValid() {
+ // If lease is not valid, do not write to redis; Let recoverer take care of it.
+ return
+ }
+ ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
+ err := p.broker.Archive(ctx, msg, e.Error())
 if err != nil {
 errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
- deadline, ok := ctx.Deadline()
- if !ok {
- panic("asynq: internal error: missing deadline in context")
- }
 p.logger.Warnf("%s; Will retry syncing", errMsg)
 p.syncRequestCh <- &syncRequest{
 fn: func() error {
- return p.broker.Archive(msg, e.Error())
+ return p.broker.Archive(ctx, msg, e.Error())
 },
 errMsg: errMsg,
- deadline: deadline,
+ deadline: l.Deadline(),
 }
 }
 }
diff --git a/processor_test.go b/processor_test.go
index dc3dd06..83a7726 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -17,6 +17,7 @@ import (
 "github.com/google/go-cmp/cmp/cmpopts"
 h "github.com/hibiken/asynq/internal/asynqtest"
 "github.com/hibiken/asynq/internal/base"
+ "github.com/hibiken/asynq/internal/errors"
 "github.com/hibiken/asynq/internal/log"
 "github.com/hibiken/asynq/internal/rdb"
 "github.com/hibiken/asynq/internal/timeutil"
@@ -483,6 +484,103 @@ func TestProcessorMarkAsComplete(t *testing.T) {
 }
 }

+// Test a scenario where the worker server cannot communicate with redis due to a network failure
+// and the lease expires
+func TestProcessorWithExpiredLease(t *testing.T) {
+ r := setup(t)
+ defer r.Close()
+ rdbClient := rdb.NewRDB(r)
+
+ m1 := h.NewTaskMessage("task1", nil)
+
+ tests := []struct {
+ pending []*base.TaskMessage
+ handler Handler
+ wantErrCount int
+ }{
+ {
+ pending: []*base.TaskMessage{m1},
+ handler: HandlerFunc(func(ctx context.Context, task *Task) error {
+ // make sure the task processing time exceeds lease duration
+ // to test expired lease.
+ time.Sleep(rdb.LeaseDuration + 10*time.Second)
+ return nil
+ }),
+ wantErrCount: 1, // ErrorHandler should still be called with ErrLeaseExpired
+ },
+ }
+
+ for _, tc := range tests {
+ h.FlushDB(t, r)
+ h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName)
+
+ starting := make(chan *workerInfo)
+ finished := make(chan *base.TaskMessage)
+ syncCh := make(chan *syncRequest)
+ done := make(chan struct{})
+ t.Cleanup(func() { close(done) })
+ // fake heartbeater which notifies lease expiration
+ go func() {
+ for {
+ select {
+ case w := <-starting:
+ // simulate expiration by resetting to some time in the past
+ w.lease.Reset(time.Now().Add(-5 * time.Second))
+ if !w.lease.NotifyExpiration() {
+ panic("Failed to notify lease expiration")
+ }
+ case <-finished:
+ // do nothing
+ case <-done:
+ return
+ }
+ }
+ }()
+ go fakeSyncer(syncCh, done)
+ p := newProcessor(processorParams{
+ logger: testLogger,
+ broker: rdbClient,
+ retryDelayFunc: DefaultRetryDelayFunc,
+ isFailureFunc: defaultIsFailureFunc,
+ syncCh: syncCh,
+ cancelations: base.NewCancelations(),
+ concurrency: 10,
+ queues: defaultQueueConfig,
+ strictPriority: false,
+ errHandler: nil,
+ shutdownTimeout: defaultShutdownTimeout,
+ starting: starting,
+ finished: finished,
+ })
+ p.handler = tc.handler
+ var (
+ mu sync.Mutex // guards n and errs
+ n int // number of times error handler is called
+ errs []error // errors passed to error handler
+ )
+ p.errHandler = ErrorHandlerFunc(func(ctx context.Context, t *Task, err error) {
+ mu.Lock()
+ defer mu.Unlock()
+ n++
+ errs = append(errs, err)
+ })
+
+ p.start(&sync.WaitGroup{})
+ time.Sleep(4 * time.Second)
+ p.shutdown()
+
+ if n != tc.wantErrCount {
+ t.Errorf("Unexpected number of errors: got %d, want %d", n, tc.wantErrCount)
+ continue
+ }
+ for i := 0; i < tc.wantErrCount; i++ {
+ if !errors.Is(errs[i], ErrLeaseExpired) {
+ t.Errorf("Unexpected error was passed to ErrorHandler: got %v, want %v", errs[i], ErrLeaseExpired)
+ }
+ }
+ }
+}
+
 func TestProcessorQueues(t *testing.T) {
 sortOpt := cmp.Transformer("SortStrings", func(in []string) []string {
 out := append([]string(nil), in...) // Copy input to avoid mutating it
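Note for readers of this patch: the base.Lease type and the ErrLeaseExpired sentinel used above are not defined in this diff; they come from elsewhere in the change. The sketch below shows roughly what the processor assumes about the lease, inferred only from the calls made here (NewLease, Done, Deadline, IsValid, Reset, NotifyExpiration). The struct fields, locking, and exact semantics are assumptions; the real implementation in internal/base may differ.

package base

import (
	"sync"
	"time"
)

// Lease is a sketch of the lease handed to each worker goroutine. The lessor
// (heartbeater) extends it or notifies expiration; the lessee (worker) selects
// on Done() and checks IsValid() before writing results back to redis.
type Lease struct {
	once sync.Once
	ch   chan struct{}

	mu       sync.Mutex
	expireAt time.Time // guarded by mu
}

// NewLease returns a lease that expires at the given time.
func NewLease(expirationTime time.Time) *Lease {
	return &Lease{
		ch:       make(chan struct{}),
		expireAt: expirationTime,
	}
}

// Reset moves the expiration time (the test above resets it into the past).
// It reports false if the lease has already expired.
func (l *Lease) Reset(expirationTime time.Time) bool {
	if !l.IsValid() {
		return false
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	l.expireAt = expirationTime
	return true
}

// NotifyExpiration closes the Done channel once the lease is no longer valid.
// It reports false if the lease is still valid and no notification was sent.
func (l *Lease) NotifyExpiration() bool {
	if l.IsValid() {
		return false
	}
	l.once.Do(func() { close(l.ch) })
	return true
}

// Done returns a channel that is closed when the lease expires.
func (l *Lease) Done() <-chan struct{} {
	return l.ch
}

// Deadline returns the current expiration time of the lease.
func (l *Lease) Deadline() time.Time {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.expireAt
}

// IsValid reports whether the lease has not yet expired.
func (l *Lease) IsValid() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return !l.expireAt.Before(time.Now())
}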
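The fake heartbeater in the test stands in for the real heartbeater, which plays the lessor role. A rough, self-contained sketch of that side of the protocol follows; the watchLease loop, the 5-second interval, and the extend callback (for example an ExtendLease broker call) are illustrative assumptions and are not part of this diff.

package asynq

import (
	"time"

	"github.com/hibiken/asynq/internal/base"
)

// watchLease is a hypothetical lessor-side loop. extend stands in for whatever
// refreshes the lease in redis and returns the new expiration time.
func watchLease(done <-chan struct{}, l *base.Lease, extend func() (time.Time, error)) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			newExpiry, err := extend()
			if err == nil {
				l.Reset(newExpiry) // lease extended; the worker keeps running
				continue
			}
			// Could not reach redis; once the deadline passes, wake up the
			// worker goroutine that is selecting on l.Done().
			if !l.IsValid() {
				l.NotifyExpiration()
			}
		}
	}
}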