diff --git a/CHANGELOG.md b/CHANGELOG.md index 36f742d..84cd41c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `BaseContext` is introduced in `Config` to specify callback hook to provide a base `context` from which `Handler` `context` is derived +### Changed + +- `Server` now recovers tasks with an expired lease. Recovered tasks are retried/archived with `ErrLeaseExpired` error. + ## [0.21.0] - 2022-01-22 ### Added diff --git a/internal/base/base.go b/internal/base/base.go index cede294..690ee20 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -625,7 +625,7 @@ type Broker interface { Archive(msg *TaskMessage, errMsg string) error ForwardIfReady(qnames ...string) error DeleteExpiredCompletedTasks(qname string) error - ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) + ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 2b5e8f8..3633efa 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -154,13 +154,13 @@ func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error { return tb.real.DeleteExpiredCompletedTasks(qname) } -func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { +func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return nil, errRedisDown } - return tb.real.ListDeadlineExceeded(deadline, qnames...) + return tb.real.ListLeaseExpired(cutoff, qnames...) } func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { diff --git a/recoverer.go b/recoverer.go index 4165e43..590bb8c 100644 --- a/recoverer.go +++ b/recoverer.go @@ -5,11 +5,11 @@ package asynq import ( - "context" "sync" "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/log" ) @@ -76,19 +76,23 @@ func (r *recoverer) start(wg *sync.WaitGroup) { }() } +// ErrLeaseExpired error indicates that the task failed because the worker working on the task +// could not extend its lease due to missing heartbeats. The worker may have crashed or got cutoff from the network. +var ErrLeaseExpired = errors.New("asynq: task lease expired") + func (r *recoverer) recover() { - // Get all tasks which have expired 30 seconds ago or earlier. - deadline := time.Now().Add(-30 * time.Second) - msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...) + // Get all tasks which have expired 30 seconds ago or earlier to accomodate certain amount of clock skew. + cutoff := time.Now().Add(-30 * time.Second) + msgs, err := r.broker.ListLeaseExpired(cutoff, r.queues...) if err != nil { - r.logger.Warn("recoverer: could not list deadline exceeded tasks") + r.logger.Warn("recoverer: could not list lease expired tasks") return } for _, msg := range msgs { if msg.Retried >= msg.Retry { - r.archive(msg, context.DeadlineExceeded) + r.archive(msg, ErrLeaseExpired) } else { - r.retry(msg, context.DeadlineExceeded) + r.retry(msg, ErrLeaseExpired) } } } @@ -97,7 +101,7 @@ func (r *recoverer) retry(msg *base.TaskMessage, err error) { delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload)) retryAt := time.Now().Add(delay) if err := r.broker.Retry(msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { - r.logger.Warnf("recoverer: could not retry deadline exceeded task: %v", err) + r.logger.Warnf("recoverer: could not retry lease expired task: %v", err) } } diff --git a/recoverer_test.go b/recoverer_test.go index a62f7b7..064128d 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -27,29 +27,25 @@ func TestRecoverer(t *testing.T) { t4.Retried = t4.Retry // t4 has reached its max retry count now := time.Now() - oneHourFromNow := now.Add(1 * time.Hour) - fiveMinutesFromNow := now.Add(5 * time.Minute) - fiveMinutesAgo := now.Add(-5 * time.Minute) - oneHourAgo := now.Add(-1 * time.Hour) tests := []struct { - desc string - inProgress map[string][]*base.TaskMessage - deadlines map[string][]base.Z - retry map[string][]base.Z - archived map[string][]base.Z - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantRetry map[string][]*base.TaskMessage - wantArchived map[string][]*base.TaskMessage + desc string + active map[string][]*base.TaskMessage + lease map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z + wantRetry map[string][]*base.TaskMessage + wantArchived map[string][]*base.TaskMessage }{ { desc: "with one active task", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(-1 * time.Minute).Unix()}}, }, retry: map[string][]base.Z{ "default": {}, @@ -60,7 +56,7 @@ func TestRecoverer(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, }, wantRetry: map[string][]*base.TaskMessage{ @@ -72,12 +68,12 @@ func TestRecoverer(t *testing.T) { }, { desc: "with a task with max-retry reached", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t4}, "critical": {}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t4, Score: fiveMinutesAgo.Unix()}}, + lease: map[string][]base.Z{ + "default": {{Message: t4, Score: now.Add(-40 * time.Second).Unix()}}, "critical": {}, }, retry: map[string][]base.Z{ @@ -92,7 +88,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -107,17 +103,17 @@ func TestRecoverer(t *testing.T) { }, { desc: "with multiple active tasks, and one expired", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, "critical": {t3}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesFromNow.Unix()}, + {Message: t1, Score: now.Add(-2 * time.Minute).Unix()}, + {Message: t2, Score: now.Add(20 * time.Second).Unix()}, }, "critical": { - {Message: t3, Score: oneHourFromNow.Unix()}, + {Message: t3, Score: now.Add(20 * time.Second).Unix()}, }, }, retry: map[string][]base.Z{ @@ -132,9 +128,9 @@ func TestRecoverer(t *testing.T) { "default": {t2}, "critical": {t3}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: fiveMinutesFromNow.Unix()}}, - "critical": {{Message: t3, Score: oneHourFromNow.Unix()}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, + "critical": {{Message: t3, Score: now.Add(20 * time.Second).Unix()}}, }, wantRetry: map[string][]*base.TaskMessage{ "default": {t1}, @@ -147,17 +143,17 @@ func TestRecoverer(t *testing.T) { }, { desc: "with multiple expired active tasks", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, "critical": {t3}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: oneHourFromNow.Unix()}, + {Message: t1, Score: now.Add(-1 * time.Minute).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, }, "critical": { - {Message: t3, Score: fiveMinutesAgo.Unix()}, + {Message: t3, Score: now.Add(-1 * time.Minute).Unix()}, }, }, retry: map[string][]base.Z{ @@ -172,8 +168,8 @@ func TestRecoverer(t *testing.T) { "default": {t2}, "critical": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: oneHourFromNow.Unix()}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, wantRetry: map[string][]*base.TaskMessage{ "default": {t1}, @@ -186,11 +182,11 @@ func TestRecoverer(t *testing.T) { }, { desc: "with empty active queue", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -206,7 +202,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -223,8 +219,8 @@ func TestRecoverer(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) - h.SeedAllActiveQueues(t, r, tc.inProgress) - h.SeedAllDeadlines(t, r, tc.deadlines) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllLease(t, r, tc.lease) h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllArchivedQueues(t, r, tc.archived) @@ -249,10 +245,10 @@ func TestRecoverer(t *testing.T) { t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.ActiveKey(qname), diff) } } - for qname, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r, qname) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.DeadlinesKey(qname), diff) + for qname, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r, qname) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.LeaseKey(qname), diff) } } cmpOpt := h.EquateInt64Approx(2) // allow up to two-second difference in `LastFailedAt` @@ -260,7 +256,7 @@ func TestRecoverer(t *testing.T) { gotRetry := h.GetRetryMessages(t, r, qname) var wantRetry []*base.TaskMessage // Note: construct message here since `LastFailedAt` is relative to each test run for _, msg := range msgs { - wantRetry = append(wantRetry, h.TaskMessageAfterRetry(*msg, "context deadline exceeded", runTime)) + wantRetry = append(wantRetry, h.TaskMessageAfterRetry(*msg, ErrLeaseExpired.Error(), runTime)) } if diff := cmp.Diff(wantRetry, gotRetry, h.SortMsgOpt, cmpOpt); diff != "" { t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff) @@ -270,7 +266,7 @@ func TestRecoverer(t *testing.T) { gotArchived := h.GetArchivedMessages(t, r, qname) var wantArchived []*base.TaskMessage for _, msg := range msgs { - wantArchived = append(wantArchived, h.TaskMessageWithError(*msg, "context deadline exceeded", runTime)) + wantArchived = append(wantArchived, h.TaskMessageWithError(*msg, ErrLeaseExpired.Error(), runTime)) } if diff := cmp.Diff(wantArchived, gotArchived, h.SortMsgOpt, cmpOpt); diff != "" { t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff)