2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Update RDB methods to use lease instead of deadlines set

This commit is contained in:
Ken Hibino 2022-02-10 19:01:05 -08:00
parent bca624792c
commit b8cb579407
3 changed files with 195 additions and 203 deletions

View File

@ -230,6 +230,13 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive)
}
// SeedLease initializes the lease set with the given entries.
func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(context.Background(), base.AllQueues, qname)
seedRedisZSet(tb, r, base.LeaseKey(qname), entries, base.TaskStateActive)
}
// SeedCompletedQueue initializes the completed set witht the given entries.
func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
@ -287,6 +294,14 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
}
}
// SeedAllLease initializes all of the lease sets with the given entries.
func SeedAllLease(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) {
tb.Helper()
for q, entries := range deadlines {
SeedLease(tb, r, entries, q)
}
}
// SeedAllCompletedQueues initializes all of the completed queues with the given entries.
func SeedAllCompletedQueues(tb testing.TB, r redis.UniversalClient, completed map[string][]base.Z) {
tb.Helper()

View File

@ -275,10 +275,11 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) {
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:processed
// -------
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
@ -306,11 +307,12 @@ return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:processed
// KEYS[6] -> unique key
// -------
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
@ -349,7 +351,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
expireAt := now.Add(statsTTL)
keys := []string{
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.LeaseKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID),
base.ProcessedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
@ -368,7 +370,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
@ -404,7 +406,7 @@ return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
@ -457,7 +459,7 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error {
}
keys := []string{
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.LeaseKey(msg.Queue),
base.CompletedKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID),
base.ProcessedKey(msg.Queue, now),
@ -479,7 +481,7 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error {
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[2] -> asynq:{<qname>}:lease
// KEYS[3] -> asynq:{<qname>}:pending
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
@ -501,7 +503,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
ctx := context.Background()
keys := []string{
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.LeaseKey(msg.Queue),
base.PendingKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID),
}
@ -634,13 +636,13 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:lease
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
//
// -------
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> retry_at UNIX timestamp
@ -697,7 +699,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.LeaseKey(msg.Queue),
base.RetryKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now),
@ -722,13 +724,13 @@ const (
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:lease
// KEYS[4] -> asynq:{<qname>}:archived
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
//
// -------
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> died_at UNIX timestamp
@ -783,7 +785,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
keys := []string{
base.TaskKey(msg.Queue, msg.ID),
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.LeaseKey(msg.Queue),
base.ArchivedKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now),

View File

@ -681,17 +681,14 @@ func TestDone(t *testing.T) {
UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72",
Queue: "default",
}
t1Deadline := now.Unix() + t1.Timeout
t2Deadline := t2.Deadline
t3Deadline := now.Unix() + t3.Timeout
tests := []struct {
desc string
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
lease map[string][]base.Z // initial state of the lease set
target *base.TaskMessage // task to remove
wantActive map[string][]*base.TaskMessage // final state of the active list
wantDeadlines map[string][]base.Z // final state of the deadline set
wantLease map[string][]base.Z // final state of the lease set
}{
{
desc: "removes message from the correct queue",
@ -699,18 +696,18 @@ func TestDone(t *testing.T) {
"default": {t1},
"custom": {t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
target: t1,
wantActive: map[string][]*base.TaskMessage{
"default": {},
"custom": {t2},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {},
"custom": {{Message: t2, Score: t2Deadline}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
},
{
@ -718,14 +715,14 @@ func TestDone(t *testing.T) {
active: map[string][]*base.TaskMessage{
"default": {t1},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
},
target: t1,
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {},
},
},
@ -735,25 +732,25 @@ func TestDone(t *testing.T) {
"default": {t1, t3},
"custom": {t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(15 * time.Second).Unix()}, {Message: t3, Score: now.Add(10 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
target: t3,
wantActive: map[string][]*base.TaskMessage{
"default": {t1},
"custom": {t2},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(15 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllActiveQueues(t, r.client, tc.active)
for _, msgs := range tc.active {
for _, msg := range msgs {
@ -780,10 +777,10 @@ func TestDone(t *testing.T) {
continue
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff)
for queue, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.LeaseKey(queue), diff)
continue
}
}
@ -825,9 +822,9 @@ func TestDoneWithMaxCounter(t *testing.T) {
z := base.Z{
Message: msg,
Score: time.Now().Add(5 * time.Minute).Unix(),
Score: time.Now().Add(15 * time.Second).Unix(),
}
h.SeedDeadlines(t, r.client, []base.Z{z}, msg.Queue)
h.SeedLease(t, r.client, []base.Z{z}, msg.Queue)
h.SeedActiveQueue(t, r.client, []*base.TaskMessage{msg}, msg.Queue)
processedTotalKey := base.ProcessedTotalKey(msg.Queue)
@ -850,6 +847,7 @@ func TestMarkAsComplete(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "send_email",
@ -878,19 +876,16 @@ func TestMarkAsComplete(t *testing.T) {
Queue: "default",
Retention: 1800,
}
t1Deadline := now.Unix() + t1.Timeout
t2Deadline := t2.Deadline
t3Deadline := now.Unix() + t3.Timeout
tests := []struct {
desc string
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
lease map[string][]base.Z // initial state of the lease set
completed map[string][]base.Z // initial state of the completed set
target *base.TaskMessage // task to mark as completed
wantActive map[string][]*base.TaskMessage // final state of the active list
wantDeadlines map[string][]base.Z // final state of the deadline set
wantCompleted func(completedAt time.Time) map[string][]base.Z // final state of the completed set
wantLease map[string][]base.Z // final state of the lease set
wantCompleted map[string][]base.Z // final state of the completed set
}{
{
desc: "select a message from the correct queue",
@ -898,9 +893,9 @@ func TestMarkAsComplete(t *testing.T) {
"default": {t1},
"custom": {t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(30 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
completed: map[string][]base.Z{
"default": {},
@ -911,15 +906,13 @@ func TestMarkAsComplete(t *testing.T) {
"default": {},
"custom": {t2},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {},
"custom": {{Message: t2, Score: t2Deadline}},
"custom": {{Message: t2, Score: now.Add(20 * time.Second).Unix()}},
},
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
return map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}},
wantCompleted: map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, now), Score: now.Unix() + t1.Retention}},
"custom": {},
}
},
},
{
@ -927,8 +920,8 @@ func TestMarkAsComplete(t *testing.T) {
active: map[string][]*base.TaskMessage{
"default": {t1},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
},
completed: map[string][]base.Z{
"default": {},
@ -937,13 +930,11 @@ func TestMarkAsComplete(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {},
},
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
return map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}},
}
wantCompleted: map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, now), Score: now.Unix() + t1.Retention}},
},
},
{
@ -952,9 +943,9 @@ func TestMarkAsComplete(t *testing.T) {
"default": {t1, t3},
"custom": {t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t3, Score: now.Add(12 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(12 * time.Second).Unix()}},
},
completed: map[string][]base.Z{
"default": {},
@ -965,22 +956,20 @@ func TestMarkAsComplete(t *testing.T) {
"default": {t1},
"custom": {t2},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
"custom": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
"custom": {{Message: t2, Score: now.Add(12 * time.Second).Unix()}},
},
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
return map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t3, completedAt), Score: completedAt.Unix() + t3.Retention}},
wantCompleted: map[string][]base.Z{
"default": {{Message: h.TaskMessageWithCompletedAt(*t3, now), Score: now.Unix() + t3.Retention}},
"custom": {},
}
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
for _, msgs := range tc.active {
@ -995,7 +984,6 @@ func TestMarkAsComplete(t *testing.T) {
}
}
completedAt := time.Now()
err := r.MarkAsComplete(tc.target)
if err != nil {
t.Errorf("%s; (*RDB).MarkAsCompleted(task) = %v, want nil", tc.desc, err)
@ -1009,14 +997,14 @@ func TestMarkAsComplete(t *testing.T) {
continue
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff)
for queue, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.LeaseKey(queue), diff)
continue
}
}
for queue, want := range tc.wantCompleted(completedAt) {
for queue, want := range tc.wantCompleted {
gotCompleted := h.GetCompletedEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.CompletedKey(queue), diff)
@ -1066,18 +1054,15 @@ func TestRequeue(t *testing.T) {
Queue: "critical",
Timeout: 80,
}
t1Deadline := now.Unix() + t1.Timeout
t2Deadline := now.Unix() + t2.Timeout
t3Deadline := now.Unix() + t3.Timeout
tests := []struct {
pending map[string][]*base.TaskMessage // initial state of queues
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
lease map[string][]base.Z // initial state of the lease set
target *base.TaskMessage // task to requeue
wantPending map[string][]*base.TaskMessage // final state of queues
wantActive map[string][]*base.TaskMessage // final state of the active list
wantDeadlines map[string][]base.Z // final state of the deadlines set
wantLease map[string][]base.Z // final state of the lease set
}{
{
pending: map[string][]*base.TaskMessage{
@ -1086,10 +1071,10 @@ func TestRequeue(t *testing.T) {
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
},
},
target: t1,
@ -1099,9 +1084,9 @@ func TestRequeue(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {
{Message: t2, Score: t2Deadline},
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
},
},
},
@ -1112,9 +1097,9 @@ func TestRequeue(t *testing.T) {
active: map[string][]*base.TaskMessage{
"default": {t2},
},
deadlines: map[string][]base.Z{
lease: map[string][]base.Z{
"default": {
{Message: t2, Score: t2Deadline},
{Message: t2, Score: now.Add(20 * time.Second).Unix()},
},
},
target: t2,
@ -1124,7 +1109,7 @@ func TestRequeue(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {},
},
},
@ -1137,9 +1122,9 @@ func TestRequeue(t *testing.T) {
"default": {t2},
"critical": {t3},
},
deadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}},
"critical": {{Message: t3, Score: t3Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}},
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
},
target: t3,
wantPending: map[string][]*base.TaskMessage{
@ -1150,8 +1135,8 @@ func TestRequeue(t *testing.T) {
"default": {t2},
"critical": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}},
"critical": {},
},
},
@ -1161,7 +1146,7 @@ func TestRequeue(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
err := r.Requeue(tc.target)
if err != nil {
@ -1181,10 +1166,10 @@ func TestRequeue(t *testing.T) {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.ActiveKey(qname), diff)
}
}
for qname, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DeadlinesKey(qname), diff)
for qname, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.LeaseKey(qname), diff)
}
}
}
@ -1466,28 +1451,25 @@ func TestRetry(t *testing.T) {
Timeout: 1800,
Queue: "custom",
}
t1Deadline := now.Unix() + t1.Timeout
t2Deadline := now.Unix() + t2.Timeout
t4Deadline := now.Unix() + t4.Timeout
errMsg := "SMTP server is not responding"
tests := []struct {
active map[string][]*base.TaskMessage
deadlines map[string][]base.Z
lease map[string][]base.Z
retry map[string][]base.Z
msg *base.TaskMessage
processAt time.Time
errMsg string
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantLease map[string][]base.Z
wantRetry map[string][]base.Z
}{
{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
},
retry: map[string][]base.Z{
"default": {{Message: t3, Score: now.Add(time.Minute).Unix()}},
@ -1498,8 +1480,8 @@ func TestRetry(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}},
},
wantRetry: map[string][]base.Z{
"default": {
@ -1513,9 +1495,9 @@ func TestRetry(t *testing.T) {
"default": {t1, t2},
"custom": {t4},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
"custom": {{Message: t4, Score: t4Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(20 * time.Second).Unix()}},
"custom": {{Message: t4, Score: now.Add(10 * time.Second).Unix()}},
},
retry: map[string][]base.Z{
"default": {},
@ -1528,8 +1510,8 @@ func TestRetry(t *testing.T) {
"default": {t1, t2},
"custom": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(20 * time.Second).Unix()}},
"custom": {},
},
wantRetry: map[string][]base.Z{
@ -1544,7 +1526,7 @@ func TestRetry(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllRetryQueues(t, r.client, tc.retry)
err := r.Retry(tc.msg, tc.processAt, tc.errMsg, true /*isFailure*/)
@ -1559,10 +1541,10 @@ func TestRetry(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
for queue, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.LeaseKey(queue), diff)
}
}
for queue, want := range tc.wantRetry {
@ -1640,28 +1622,25 @@ func TestRetryWithNonFailureError(t *testing.T) {
Timeout: 1800,
Queue: "custom",
}
t1Deadline := now.Unix() + t1.Timeout
t2Deadline := now.Unix() + t2.Timeout
t4Deadline := now.Unix() + t4.Timeout
errMsg := "SMTP server is not responding"
tests := []struct {
active map[string][]*base.TaskMessage
deadlines map[string][]base.Z
lease map[string][]base.Z
retry map[string][]base.Z
msg *base.TaskMessage
processAt time.Time
errMsg string
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantLease map[string][]base.Z
wantRetry map[string][]base.Z
}{
{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
},
retry: map[string][]base.Z{
"default": {{Message: t3, Score: now.Add(time.Minute).Unix()}},
@ -1672,8 +1651,8 @@ func TestRetryWithNonFailureError(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}},
},
wantRetry: map[string][]base.Z{
"default": {
@ -1688,9 +1667,9 @@ func TestRetryWithNonFailureError(t *testing.T) {
"default": {t1, t2},
"custom": {t4},
},
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
"custom": {{Message: t4, Score: t4Deadline}},
lease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
"custom": {{Message: t4, Score: now.Add(10 * time.Second).Unix()}},
},
retry: map[string][]base.Z{
"default": {},
@ -1703,8 +1682,8 @@ func TestRetryWithNonFailureError(t *testing.T) {
"default": {t1, t2},
"custom": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
"custom": {},
},
wantRetry: map[string][]base.Z{
@ -1720,7 +1699,7 @@ func TestRetryWithNonFailureError(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllRetryQueues(t, r.client, tc.retry)
err := r.Retry(tc.msg, tc.processAt, tc.errMsg, false /*isFailure*/)
@ -1735,10 +1714,10 @@ func TestRetryWithNonFailureError(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
for queue, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.LeaseKey(queue), diff)
}
}
for queue, want := range tc.wantRetry {
@ -1790,7 +1769,6 @@ func TestArchive(t *testing.T) {
Retried: 25,
Timeout: 1800,
}
t1Deadline := now.Unix() + t1.Timeout
t2 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "reindex",
@ -1800,7 +1778,6 @@ func TestArchive(t *testing.T) {
Retried: 0,
Timeout: 3000,
}
t2Deadline := now.Unix() + t2.Timeout
t3 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "generate_csv",
@ -1810,7 +1787,6 @@ func TestArchive(t *testing.T) {
Retried: 0,
Timeout: 60,
}
t3Deadline := now.Unix() + t3.Timeout
t4 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "send_email",
@ -1820,27 +1796,26 @@ func TestArchive(t *testing.T) {
Retried: 25,
Timeout: 1800,
}
t4Deadline := now.Unix() + t4.Timeout
errMsg := "SMTP server not responding"
// TODO(hibiken): add test cases for trimming
tests := []struct {
active map[string][]*base.TaskMessage
deadlines map[string][]base.Z
lease map[string][]base.Z
archived map[string][]base.Z
target *base.TaskMessage // task to archive
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantLease map[string][]base.Z
wantArchived map[string][]base.Z
}{
{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
@ -1852,8 +1827,8 @@ func TestArchive(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t2, Score: now.Add(10 * time.Second).Unix()}},
},
wantArchived: map[string][]base.Z{
"default": {
@ -1866,11 +1841,11 @@ func TestArchive(t *testing.T) {
active: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
deadlines: map[string][]base.Z{
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
{Message: t3, Score: t3Deadline},
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
{Message: t3, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
@ -1880,10 +1855,10 @@ func TestArchive(t *testing.T) {
wantActive: map[string][]*base.TaskMessage{
"default": {t2, t3},
},
wantDeadlines: map[string][]base.Z{
wantLease: map[string][]base.Z{
"default": {
{Message: t2, Score: t2Deadline},
{Message: t3, Score: t3Deadline},
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
{Message: t3, Score: now.Add(10 * time.Second).Unix()},
},
},
wantArchived: map[string][]base.Z{
@ -1897,12 +1872,12 @@ func TestArchive(t *testing.T) {
"default": {t1},
"custom": {t4},
},
deadlines: map[string][]base.Z{
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: t1Deadline},
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
"custom": {
{Message: t4, Score: t4Deadline},
{Message: t4, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
@ -1914,8 +1889,8 @@ func TestArchive(t *testing.T) {
"default": {t1},
"custom": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t1, Score: t1Deadline}},
wantLease: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
"custom": {},
},
wantArchived: map[string][]base.Z{
@ -1930,7 +1905,7 @@ func TestArchive(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.lease)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
err := r.Archive(tc.target, errMsg)
@ -1945,10 +1920,10 @@ func TestArchive(t *testing.T) {
t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
for queue, want := range tc.wantLease {
gotLease := h.GetLeaseEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.LeaseKey(queue), diff)
}
}
for queue, want := range tc.wantArchived {
@ -2306,7 +2281,7 @@ func TestListDeadlineExceeded(t *testing.T) {
defer r.Close()
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllLease(t, r.client, tc.deadlines)
got, err := r.ListDeadlineExceeded(tc.t, tc.qnames...)
if err != nil {