From b8cb5794077e4111763ddd772e094f0199a1640c Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 10 Feb 2022 19:01:05 -0800 Subject: [PATCH] Update RDB methods to use lease instead of deadlines set --- internal/asynqtest/asynqtest.go | 15 ++ internal/rdb/rdb.go | 30 +-- internal/rdb/rdb_test.go | 353 +++++++++++++++----------------- 3 files changed, 195 insertions(+), 203 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 932668c..1678fa9 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -230,6 +230,13 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive) } +// SeedLease initializes the lease set with the given entries. +func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { + tb.Helper() + r.SAdd(context.Background(), base.AllQueues, qname) + seedRedisZSet(tb, r, base.LeaseKey(qname), entries, base.TaskStateActive) +} + // SeedCompletedQueue initializes the completed set witht the given entries. func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() @@ -287,6 +294,14 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } } +// SeedAllLease initializes all of the lease sets with the given entries. +func SeedAllLease(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) { + tb.Helper() + for q, entries := range deadlines { + SeedLease(tb, r, entries, q) + } +} + // SeedAllCompletedQueues initializes all of the completed queues with the given entries. func SeedAllCompletedQueues(tb testing.TB, r redis.UniversalClient, completed map[string][]base.Z) { tb.Helper() diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d3ab9c8..6b916f0 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -275,10 +275,11 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) { } // KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines +// KEYS[2] -> asynq:{}:lease // KEYS[3] -> asynq:{}:t: // KEYS[4] -> asynq:{}:processed: // KEYS[5] -> asynq:{}:processed +// ------- // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp // ARGV[3] -> max int64 value @@ -306,11 +307,12 @@ return redis.status_reply("OK") `) // KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines +// KEYS[2] -> asynq:{}:lease // KEYS[3] -> asynq:{}:t: // KEYS[4] -> asynq:{}:processed: // KEYS[5] -> asynq:{}:processed // KEYS[6] -> unique key +// ------- // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp // ARGV[3] -> max int64 value @@ -349,7 +351,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { expireAt := now.Add(statsTTL) keys := []string{ base.ActiveKey(msg.Queue), - base.DeadlinesKey(msg.Queue), + base.LeaseKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), base.ProcessedKey(msg.Queue, now), base.ProcessedTotalKey(msg.Queue), @@ -368,7 +370,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { } // KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines +// KEYS[2] -> asynq:{}:lease // KEYS[3] -> asynq:{}:completed // KEYS[4] -> asynq:{}:t: // KEYS[5] -> asynq:{}:processed: @@ -404,7 +406,7 @@ return redis.status_reply("OK") `) // KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines +// KEYS[2] -> asynq:{}:lease // KEYS[3] -> asynq:{}:completed // KEYS[4] -> asynq:{}:t: // KEYS[5] -> asynq:{}:processed: @@ -457,7 +459,7 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error { } keys := []string{ base.ActiveKey(msg.Queue), - base.DeadlinesKey(msg.Queue), + base.LeaseKey(msg.Queue), base.CompletedKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), base.ProcessedKey(msg.Queue, now), @@ -479,7 +481,7 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error { } // KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines +// KEYS[2] -> asynq:{}:lease // KEYS[3] -> asynq:{}:pending // KEYS[4] -> asynq:{}:t: // ARGV[1] -> task ID @@ -501,7 +503,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { ctx := context.Background() keys := []string{ base.ActiveKey(msg.Queue), - base.DeadlinesKey(msg.Queue), + base.LeaseKey(msg.Queue), base.PendingKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), } @@ -634,13 +636,13 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:active -// KEYS[3] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:lease // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:processed: // KEYS[6] -> asynq:{}:failed: // KEYS[7] -> asynq:{}:processed // KEYS[8] -> asynq:{}:failed -// +// ------- // ARGV[1] -> task ID // ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> retry_at UNIX timestamp @@ -697,7 +699,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i keys := []string{ base.TaskKey(msg.Queue, msg.ID), base.ActiveKey(msg.Queue), - base.DeadlinesKey(msg.Queue), + base.LeaseKey(msg.Queue), base.RetryKey(msg.Queue), base.ProcessedKey(msg.Queue, now), base.FailedKey(msg.Queue, now), @@ -722,13 +724,13 @@ const ( // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:active -// KEYS[3] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:lease // KEYS[4] -> asynq:{}:archived // KEYS[5] -> asynq:{}:processed: // KEYS[6] -> asynq:{}:failed: // KEYS[7] -> asynq:{}:processed // KEYS[8] -> asynq:{}:failed -// +// ------- // ARGV[1] -> task ID // ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> died_at UNIX timestamp @@ -783,7 +785,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { keys := []string{ base.TaskKey(msg.Queue, msg.ID), base.ActiveKey(msg.Queue), - base.DeadlinesKey(msg.Queue), + base.LeaseKey(msg.Queue), base.ArchivedKey(msg.Queue), base.ProcessedKey(msg.Queue, now), base.FailedKey(msg.Queue, now), diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index fa8e058..a2e479e 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -681,17 +681,14 @@ func TestDone(t *testing.T) { UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72", Queue: "default", } - t1Deadline := now.Unix() + t1.Timeout - t2Deadline := t2.Deadline - t3Deadline := now.Unix() + t3.Timeout tests := []struct { - desc string - active map[string][]*base.TaskMessage // initial state of the active list - deadlines map[string][]base.Z // initial state of the deadlines set - target *base.TaskMessage // task to remove - wantActive map[string][]*base.TaskMessage // final state of the active list - wantDeadlines map[string][]base.Z // final state of the deadline set + desc string + active map[string][]*base.TaskMessage // initial state of the active list + lease map[string][]base.Z // initial state of the lease set + target *base.TaskMessage // task to remove + wantActive map[string][]*base.TaskMessage // final state of the active list + wantLease map[string][]base.Z // final state of the lease set }{ { desc: "removes message from the correct queue", @@ -699,18 +696,18 @@ func TestDone(t *testing.T) { "default": {t1}, "custom": {t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, target: t1, wantActive: map[string][]*base.TaskMessage{ "default": {}, "custom": {t2}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, - "custom": {{Message: t2, Score: t2Deadline}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, }, { @@ -718,14 +715,14 @@ func TestDone(t *testing.T) { active: map[string][]*base.TaskMessage{ "default": {t1}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, }, target: t1, wantActive: map[string][]*base.TaskMessage{ "default": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, }, }, @@ -735,25 +732,25 @@ func TestDone(t *testing.T) { "default": {t1, t3}, "custom": {t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(15 * time.Second).Unix()}, {Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, target: t3, wantActive: map[string][]*base.TaskMessage{ "default": {t1}, "custom": {t2}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(15 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) h.SeedAllActiveQueues(t, r.client, tc.active) for _, msgs := range tc.active { for _, msg := range msgs { @@ -780,10 +777,10 @@ func TestDone(t *testing.T) { continue } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.LeaseKey(queue), diff) continue } } @@ -825,9 +822,9 @@ func TestDoneWithMaxCounter(t *testing.T) { z := base.Z{ Message: msg, - Score: time.Now().Add(5 * time.Minute).Unix(), + Score: time.Now().Add(15 * time.Second).Unix(), } - h.SeedDeadlines(t, r.client, []base.Z{z}, msg.Queue) + h.SeedLease(t, r.client, []base.Z{z}, msg.Queue) h.SeedActiveQueue(t, r.client, []*base.TaskMessage{msg}, msg.Queue) processedTotalKey := base.ProcessedTotalKey(msg.Queue) @@ -850,6 +847,7 @@ func TestMarkAsComplete(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) t1 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -878,19 +876,16 @@ func TestMarkAsComplete(t *testing.T) { Queue: "default", Retention: 1800, } - t1Deadline := now.Unix() + t1.Timeout - t2Deadline := t2.Deadline - t3Deadline := now.Unix() + t3.Timeout tests := []struct { desc string - active map[string][]*base.TaskMessage // initial state of the active list - deadlines map[string][]base.Z // initial state of the deadlines set - completed map[string][]base.Z // initial state of the completed set - target *base.TaskMessage // task to mark as completed - wantActive map[string][]*base.TaskMessage // final state of the active list - wantDeadlines map[string][]base.Z // final state of the deadline set - wantCompleted func(completedAt time.Time) map[string][]base.Z // final state of the completed set + active map[string][]*base.TaskMessage // initial state of the active list + lease map[string][]base.Z // initial state of the lease set + completed map[string][]base.Z // initial state of the completed set + target *base.TaskMessage // task to mark as completed + wantActive map[string][]*base.TaskMessage // final state of the active list + wantLease map[string][]base.Z // final state of the lease set + wantCompleted map[string][]base.Z // final state of the completed set }{ { desc: "select a message from the correct queue", @@ -898,9 +893,9 @@ func TestMarkAsComplete(t *testing.T) { "default": {t1}, "custom": {t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(30 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, completed: map[string][]base.Z{ "default": {}, @@ -911,15 +906,13 @@ func TestMarkAsComplete(t *testing.T) { "default": {}, "custom": {t2}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, - "custom": {{Message: t2, Score: t2Deadline}}, + "custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}}, }, - wantCompleted: func(completedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}}, - "custom": {}, - } + wantCompleted: map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t1, now), Score: now.Unix() + t1.Retention}}, + "custom": {}, }, }, { @@ -927,8 +920,8 @@ func TestMarkAsComplete(t *testing.T) { active: map[string][]*base.TaskMessage{ "default": {t1}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, }, completed: map[string][]base.Z{ "default": {}, @@ -937,13 +930,11 @@ func TestMarkAsComplete(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, }, - wantCompleted: func(completedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}}, - } + wantCompleted: map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t1, now), Score: now.Unix() + t1.Retention}}, }, }, { @@ -952,9 +943,9 @@ func TestMarkAsComplete(t *testing.T) { "default": {t1, t3}, "custom": {t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t3, Score: now.Add(12 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(12 * time.Second).Unix()}}, }, completed: map[string][]base.Z{ "default": {}, @@ -965,22 +956,20 @@ func TestMarkAsComplete(t *testing.T) { "default": {t1}, "custom": {t2}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, - "custom": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, + "custom": {{Message: t2, Score: now.Add(12 * time.Second).Unix()}}, }, - wantCompleted: func(completedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {{Message: h.TaskMessageWithCompletedAt(*t3, completedAt), Score: completedAt.Unix() + t3.Retention}}, - "custom": {}, - } + wantCompleted: map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t3, now), Score: now.Unix() + t3.Retention}}, + "custom": {}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) h.SeedAllActiveQueues(t, r.client, tc.active) h.SeedAllCompletedQueues(t, r.client, tc.completed) for _, msgs := range tc.active { @@ -995,7 +984,6 @@ func TestMarkAsComplete(t *testing.T) { } } - completedAt := time.Now() err := r.MarkAsComplete(tc.target) if err != nil { t.Errorf("%s; (*RDB).MarkAsCompleted(task) = %v, want nil", tc.desc, err) @@ -1009,14 +997,14 @@ func TestMarkAsComplete(t *testing.T) { continue } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.LeaseKey(queue), diff) continue } } - for queue, want := range tc.wantCompleted(completedAt) { + for queue, want := range tc.wantCompleted { gotCompleted := h.GetCompletedEntries(t, r.client, queue) if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" { t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.CompletedKey(queue), diff) @@ -1066,18 +1054,15 @@ func TestRequeue(t *testing.T) { Queue: "critical", Timeout: 80, } - t1Deadline := now.Unix() + t1.Timeout - t2Deadline := now.Unix() + t2.Timeout - t3Deadline := now.Unix() + t3.Timeout tests := []struct { - pending map[string][]*base.TaskMessage // initial state of queues - active map[string][]*base.TaskMessage // initial state of the active list - deadlines map[string][]base.Z // initial state of the deadlines set - target *base.TaskMessage // task to requeue - wantPending map[string][]*base.TaskMessage // final state of queues - wantActive map[string][]*base.TaskMessage // final state of the active list - wantDeadlines map[string][]base.Z // final state of the deadlines set + pending map[string][]*base.TaskMessage // initial state of queues + active map[string][]*base.TaskMessage // initial state of the active list + lease map[string][]base.Z // initial state of the lease set + target *base.TaskMessage // task to requeue + wantPending map[string][]*base.TaskMessage // final state of queues + wantActive map[string][]*base.TaskMessage // final state of the active list + wantLease map[string][]base.Z // final state of the lease set }{ { pending: map[string][]*base.TaskMessage{ @@ -1086,10 +1071,10 @@ func TestRequeue(t *testing.T) { active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, }, }, target: t1, @@ -1099,9 +1084,9 @@ func TestRequeue(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": { - {Message: t2, Score: t2Deadline}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, }, }, }, @@ -1112,9 +1097,9 @@ func TestRequeue(t *testing.T) { active: map[string][]*base.TaskMessage{ "default": {t2}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t2, Score: t2Deadline}, + {Message: t2, Score: now.Add(20 * time.Second).Unix()}, }, }, target: t2, @@ -1124,7 +1109,7 @@ func TestRequeue(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": {}, }, }, @@ -1137,9 +1122,9 @@ func TestRequeue(t *testing.T) { "default": {t2}, "critical": {t3}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: t2Deadline}}, - "critical": {{Message: t3, Score: t3Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, }, target: t3, wantPending: map[string][]*base.TaskMessage{ @@ -1150,8 +1135,8 @@ func TestRequeue(t *testing.T) { "default": {t2}, "critical": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, "critical": {}, }, }, @@ -1161,7 +1146,7 @@ func TestRequeue(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllActiveQueues(t, r.client, tc.active) - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) err := r.Requeue(tc.target) if err != nil { @@ -1181,10 +1166,10 @@ func TestRequeue(t *testing.T) { t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.ActiveKey(qname), diff) } } - for qname, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DeadlinesKey(qname), diff) + for qname, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.LeaseKey(qname), diff) } } } @@ -1466,28 +1451,25 @@ func TestRetry(t *testing.T) { Timeout: 1800, Queue: "custom", } - t1Deadline := now.Unix() + t1.Timeout - t2Deadline := now.Unix() + t2.Timeout - t4Deadline := now.Unix() + t4.Timeout errMsg := "SMTP server is not responding" tests := []struct { - active map[string][]*base.TaskMessage - deadlines map[string][]base.Z - retry map[string][]base.Z - msg *base.TaskMessage - processAt time.Time - errMsg string - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantRetry map[string][]base.Z + active map[string][]*base.TaskMessage + lease map[string][]base.Z + retry map[string][]base.Z + msg *base.TaskMessage + processAt time.Time + errMsg string + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z + wantRetry map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, retry: map[string][]base.Z{ "default": {{Message: t3, Score: now.Add(time.Minute).Unix()}}, @@ -1498,8 +1480,8 @@ func TestRetry(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, wantRetry: map[string][]base.Z{ "default": { @@ -1513,9 +1495,9 @@ func TestRetry(t *testing.T) { "default": {t1, t2}, "custom": {t4}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, - "custom": {{Message: t4, Score: t4Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(20 * time.Second).Unix()}}, + "custom": {{Message: t4, Score: now.Add(10 * time.Second).Unix()}}, }, retry: map[string][]base.Z{ "default": {}, @@ -1528,8 +1510,8 @@ func TestRetry(t *testing.T) { "default": {t1, t2}, "custom": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(20 * time.Second).Unix()}}, "custom": {}, }, wantRetry: map[string][]base.Z{ @@ -1544,7 +1526,7 @@ func TestRetry(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllActiveQueues(t, r.client, tc.active) - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) h.SeedAllRetryQueues(t, r.client, tc.retry) err := r.Retry(tc.msg, tc.processAt, tc.errMsg, true /*isFailure*/) @@ -1559,10 +1541,10 @@ func TestRetry(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff) } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.LeaseKey(queue), diff) } } for queue, want := range tc.wantRetry { @@ -1640,28 +1622,25 @@ func TestRetryWithNonFailureError(t *testing.T) { Timeout: 1800, Queue: "custom", } - t1Deadline := now.Unix() + t1.Timeout - t2Deadline := now.Unix() + t2.Timeout - t4Deadline := now.Unix() + t4.Timeout errMsg := "SMTP server is not responding" tests := []struct { - active map[string][]*base.TaskMessage - deadlines map[string][]base.Z - retry map[string][]base.Z - msg *base.TaskMessage - processAt time.Time - errMsg string - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantRetry map[string][]base.Z + active map[string][]*base.TaskMessage + lease map[string][]base.Z + retry map[string][]base.Z + msg *base.TaskMessage + processAt time.Time + errMsg string + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z + wantRetry map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, retry: map[string][]base.Z{ "default": {{Message: t3, Score: now.Add(time.Minute).Unix()}}, @@ -1672,8 +1651,8 @@ func TestRetryWithNonFailureError(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, wantRetry: map[string][]base.Z{ "default": { @@ -1688,9 +1667,9 @@ func TestRetryWithNonFailureError(t *testing.T) { "default": {t1, t2}, "custom": {t4}, }, - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, - "custom": {{Message: t4, Score: t4Deadline}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}}, + "custom": {{Message: t4, Score: now.Add(10 * time.Second).Unix()}}, }, retry: map[string][]base.Z{ "default": {}, @@ -1703,8 +1682,8 @@ func TestRetryWithNonFailureError(t *testing.T) { "default": {t1, t2}, "custom": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}}, "custom": {}, }, wantRetry: map[string][]base.Z{ @@ -1720,7 +1699,7 @@ func TestRetryWithNonFailureError(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllActiveQueues(t, r.client, tc.active) - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) h.SeedAllRetryQueues(t, r.client, tc.retry) err := r.Retry(tc.msg, tc.processAt, tc.errMsg, false /*isFailure*/) @@ -1735,10 +1714,10 @@ func TestRetryWithNonFailureError(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff) } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.LeaseKey(queue), diff) } } for queue, want := range tc.wantRetry { @@ -1790,7 +1769,6 @@ func TestArchive(t *testing.T) { Retried: 25, Timeout: 1800, } - t1Deadline := now.Unix() + t1.Timeout t2 := &base.TaskMessage{ ID: uuid.NewString(), Type: "reindex", @@ -1800,7 +1778,6 @@ func TestArchive(t *testing.T) { Retried: 0, Timeout: 3000, } - t2Deadline := now.Unix() + t2.Timeout t3 := &base.TaskMessage{ ID: uuid.NewString(), Type: "generate_csv", @@ -1810,7 +1787,6 @@ func TestArchive(t *testing.T) { Retried: 0, Timeout: 60, } - t3Deadline := now.Unix() + t3.Timeout t4 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -1820,27 +1796,26 @@ func TestArchive(t *testing.T) { Retried: 25, Timeout: 1800, } - t4Deadline := now.Unix() + t4.Timeout errMsg := "SMTP server not responding" // TODO(hibiken): add test cases for trimming tests := []struct { - active map[string][]*base.TaskMessage - deadlines map[string][]base.Z - archived map[string][]base.Z - target *base.TaskMessage // task to archive - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantArchived map[string][]base.Z + active map[string][]*base.TaskMessage + lease map[string][]base.Z + archived map[string][]base.Z + target *base.TaskMessage // task to archive + wantActive map[string][]*base.TaskMessage + wantLease map[string][]base.Z + wantArchived map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, }, }, archived: map[string][]base.Z{ @@ -1852,8 +1827,8 @@ func TestArchive(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t2, Score: t2Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}}, }, wantArchived: map[string][]base.Z{ "default": { @@ -1866,11 +1841,11 @@ func TestArchive(t *testing.T) { active: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, - {Message: t3, Score: t3Deadline}, + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, + {Message: t3, Score: now.Add(10 * time.Second).Unix()}, }, }, archived: map[string][]base.Z{ @@ -1880,10 +1855,10 @@ func TestArchive(t *testing.T) { wantActive: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, - wantDeadlines: map[string][]base.Z{ + wantLease: map[string][]base.Z{ "default": { - {Message: t2, Score: t2Deadline}, - {Message: t3, Score: t3Deadline}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, + {Message: t3, Score: now.Add(10 * time.Second).Unix()}, }, }, wantArchived: map[string][]base.Z{ @@ -1897,12 +1872,12 @@ func TestArchive(t *testing.T) { "default": {t1}, "custom": {t4}, }, - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: t1Deadline}, + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, }, "custom": { - {Message: t4, Score: t4Deadline}, + {Message: t4, Score: now.Add(10 * time.Second).Unix()}, }, }, archived: map[string][]base.Z{ @@ -1914,8 +1889,8 @@ func TestArchive(t *testing.T) { "default": {t1}, "custom": {}, }, - wantDeadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: t1Deadline}}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, "custom": {}, }, wantArchived: map[string][]base.Z{ @@ -1930,7 +1905,7 @@ func TestArchive(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllActiveQueues(t, r.client, tc.active) - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) h.SeedAllArchivedQueues(t, r.client, tc.archived) err := r.Archive(tc.target, errMsg) @@ -1945,10 +1920,10 @@ func TestArchive(t *testing.T) { t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.ActiveKey(queue), diff) } } - for queue, want := range tc.wantDeadlines { - gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff) + for queue, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.LeaseKey(queue), diff) } } for queue, want := range tc.wantArchived { @@ -2306,7 +2281,7 @@ func TestListDeadlineExceeded(t *testing.T) { defer r.Close() for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.deadlines) got, err := r.ListDeadlineExceeded(tc.t, tc.qnames...) if err != nil {