From 74f08795f8df2ad1318c5836e8c9a60d3136f5ef Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 8 Aug 2020 12:44:08 -0700 Subject: [PATCH] Update Kill method in RDB --- internal/rdb/rdb.go | 16 ++-- internal/rdb/rdb_test.go | 185 ++++++++++++++++++++++++++------------- 2 files changed, 130 insertions(+), 71 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d443a64..423f1ff 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -364,11 +364,11 @@ const ( deadExpirationInDays = 90 ) -// KEYS[1] -> asynq:in_progress -// KEYS[2] -> asynq:deadlines -// KEYS[3] -> asynq:dead -// KEYS[4] -> asynq:processed: -// KEYS[5] -> asynq.failure: +// KEYS[1] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:dead +// KEYS[4] -> asynq:{}:processed: +// KEYS[5] -> asynq:{}:failure: // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue // ARGV[2] -> base.TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp @@ -411,11 +411,11 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { } now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - processedKey := base.ProcessedKey(now) - failureKey := base.FailureKey(now) + processedKey := base.ProcessedKey(msg.Queue, now) + failureKey := base.FailureKey(msg.Queue, now) expireAt := now.Add(statsTTL) return killCmd.Run(r.client, - []string{base.InProgressQueue, base.KeyDeadlines, base.DeadQueue, processedKey, failureKey}, + []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failureKey}, msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 4faaa7f..709adca 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -919,9 +919,9 @@ func TestRetry(t *testing.T) { } failedKey := base.FailedKey(tc.msg.Queue, time.Now()) - gotFailure := r.client.Get(failedKey).Val() - if gotFailure != "1" { - t.Errorf("GET %q = %q, want 1", failedKey, gotFailure) + gotFailed := r.client.Get(failedKey).Val() + if gotFailed != "1" { + t.Errorf("GET %q = %q, want 1", failedKey, gotFailed) } gotTTL = r.client.TTL(failedKey).Val() if gotTTL > statsTTL { @@ -939,7 +939,7 @@ func TestKill(t *testing.T) { Payload: nil, Queue: "default", Retry: 25, - Retried: 0, + Retried: 25, Timeout: 1800, } t1Deadline := now.Unix() + t1.Timeout @@ -963,64 +963,117 @@ func TestKill(t *testing.T) { Timeout: 60, } t3Deadline := now.Unix() + t3.Timeout + t4 := &base.TaskMessage{ + ID: uuid.New(), + Type: "send_email", + Payload: nil, + Queue: "custom", + Retry: 25, + Retried: 25, + Timeout: 1800, + } + t4Deadline := now.Unix() + t4.Timeout errMsg := "SMTP server not responding" // TODO(hibiken): add test cases for trimming tests := []struct { - inProgress []*base.TaskMessage - deadlines []base.Z - dead []base.Z + inProgress map[string][]*base.TaskMessage + deadlines map[string][]base.Z + dead map[string][]base.Z target *base.TaskMessage // task to kill - wantInProgress []*base.TaskMessage - wantDeadlines []base.Z - wantDead []base.Z + wantInProgress map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantDead map[string][]base.Z }{ { - inProgress: []*base.TaskMessage{t1, t2}, - deadlines: []base.Z{ - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, + inProgress: map[string][]*base.TaskMessage{ + "default": {t1, t2}, }, - dead: []base.Z{ - { - Message: t3, - Score: now.Add(-time.Hour).Unix(), + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: t1Deadline}, + {Message: t2, Score: t2Deadline}, }, }, - target: t1, - wantInProgress: []*base.TaskMessage{t2}, - wantDeadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, - }, - wantDead: []base.Z{ - { - Message: h.TaskMessageWithError(*t1, errMsg), - Score: now.Unix(), + dead: map[string][]base.Z{ + "default": { + {Message: t3, Score: now.Add(-time.Hour).Unix()}, }, - { - Message: t3, - Score: now.Add(-time.Hour).Unix(), + }, + target: t1, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + }, + wantDeadlines: map[string]base.Z{ + "default": {{Message: t2, Score: t2Deadline}}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, + {Message: t3, Score: now.Add(-time.Hour).Unix()}, }, }, }, { - inProgress: []*base.TaskMessage{t1, t2, t3}, - deadlines: []base.Z{ - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, - {Message: t3, Score: t3Deadline}, + inProgress: map[string][]*base.TaskMessage{ + "default": {t1, t2, t3}, }, - dead: []base.Z{}, - target: t1, - wantInProgress: []*base.TaskMessage{t2, t3}, - wantDeadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, - {Message: t3, Score: t3Deadline}, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: t1Deadline}, + {Message: t2, Score: t2Deadline}, + {Message: t3, Score: t3Deadline}, + }, }, - wantDead: []base.Z{ - { - Message: h.TaskMessageWithError(*t1, errMsg), - Score: now.Unix(), + dead: map[string][]base.Z{ + "default": {}, + }, + target: t1, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2, t3}, + }, + wantDeadlines: map[string][]base.Z{ + "default": { + {Message: t2, Score: t2Deadline}, + {Message: t3, Score: t3Deadline}, + }, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, + }, + }, + }, + { + inProgress: map[string][]*base.TaskMessage{ + "default": {t1}, + "custom": {t4}, + }, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: t1Deadline}, + }, + "custom": { + {Message: t4, Score: t4Deadline}, + }, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + target: t4, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t1}, + "custom": {}, + }, + wantDeadlines: map[string]base.Z{ + "default": {{Message: t1, Score: t1Deadline}}, + "custom": {}, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()}, }, }, }, @@ -1028,9 +1081,9 @@ func TestKill(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedDeadlines(t, r.client, tc.deadlines) - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllDeadQueues(t, r.client, tc.dead) err := r.Kill(tc.target, errMsg) if err != nil { @@ -1038,20 +1091,26 @@ func TestKill(t *testing.T) { continue } - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressQueue, diff) + for queue, want := range tc.wantInProgress { + gotInProgress := h.GetInProgressMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressKey(queue), diff) + } } - gotDeadlines := h.GetDeadlinesEntries(t, r.client) - if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.KeyDeadlines, diff) + for queue, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadlinesKey(queue), diff) + } } - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) + for queue, want := range tc.wantDead { + gotDead := h.GetDeadEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff) + } } - processedKey := base.ProcessedKey(time.Now()) + processedKey := base.ProcessedKey(tc.target.Queue, time.Now()) gotProcessed := r.client.Get(processedKey).Val() if gotProcessed != "1" { t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) @@ -1061,14 +1120,14 @@ func TestKill(t *testing.T) { t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) } - failureKey := base.FailureKey(time.Now()) - gotFailure := r.client.Get(failureKey).Val() - if gotFailure != "1" { - t.Errorf("GET %q = %q, want 1", failureKey, gotFailure) + failedKey := base.FailedKey(tc.target.Queue, time.Now()) + gotFailed := r.client.Get(failedKey).Val() + if gotFailed != "1" { + t.Errorf("GET %q = %q, want 1", failedKey, gotFailed) } gotTTL = r.client.TTL(processedKey).Val() if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL) + t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL) } } }