From 420bd2c748d506817bdf71ac88bc55bf3f3cf110 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 26 Feb 2021 21:40:55 -0800 Subject: [PATCH] Update RDB.ArchivePendingTask --- internal/rdb/inspect.go | 63 +++++++------------ internal/rdb/inspect_test.go | 117 +++++++++++++++++++++++++++++++++-- 2 files changed, 135 insertions(+), 45 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 8d53ffd..4a96327 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -562,13 +562,12 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error { // KEYS[1] -> asynq:{} // KEYS[2] -> asynq:{}:archived -// ARGV[1] -> task message to archive +// ARGV[1] -> ID of the task to archive // ARGV[2] -> current timestamp // ARGV[3] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> max number of tasks in archive (e.g., 100) var archivePendingCmd = redis.NewScript(` -local x = redis.call("LREM", KEYS[1], 1, ARGV[1]) -if x == 0 then +if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then return 0 end redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) @@ -577,47 +576,33 @@ redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4]) return 1 `) -func (r *RDB) archivePending(qname, msg string) (int64, error) { - keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} - now := time.Now() - limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago - args := []interface{}{msg, now.Unix(), limit, maxArchiveSize} - res, err := archivePendingCmd.Run(r.client, keys, args...).Result() - if err != nil { - return 0, err - } - n, ok := res.(int64) - if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) - } - return n, nil -} - -// ArchivePendingTask finds a pending task that matches the given id from the given queue -// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound. +// ArchivePendingTask finds a pending task that matches the given id +// from the given queue and archives it. +// If there's no match, it returns ErrTaskNotFound. func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error { - qkey := base.PendingKey(qname) - data, err := r.client.LRange(qkey, 0, -1).Result() + keys := []string{ + base.PendingKey(qname), + base.ArchivedKey(qname), + } + now := time.Now() + argv := []interface{}{ + id.String(), + now.Unix(), + now.AddDate(0, 0, -archivedExpirationInDays).Unix(), + maxArchiveSize, + } + res, err := archivePendingCmd.Run(r.client, keys, argv...).Result() if err != nil { return err } - for _, s := range data { - msg, err := base.DecodeMessage(s) - if err != nil { - return err - } - if msg.ID == id { - n, err := r.archivePending(qname, s) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil - } + n, ok := res.(int64) + if !ok { + return fmt.Errorf("command error: unexpected return value %v", res) } - return ErrTaskNotFound + if n == 0 { + return ErrTaskNotFound + } + return nil } // ArchiveAllRetryTasks archives all retry tasks from the given queue and diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 7b7160d..e951c3b 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1716,7 +1716,7 @@ func TestArchiveRetryTask(t *testing.T) { got := r.ArchiveRetryTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("(*RDB).KillRetryTask(%q, %v) = %v, want %v", + t.Errorf("(*RDB).ArchiveRetryTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1838,7 +1838,7 @@ func TestArchiveScheduledTask(t *testing.T) { got := r.ArchiveScheduledTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("(*RDB).KillScheduledTask(%q, %v) = %v, want %v", + t.Errorf("(*RDB).ArchiveScheduledTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1861,7 +1861,112 @@ func TestArchiveScheduledTask(t *testing.T) { } } -func TestKillAllRetryTasks(t *testing.T) { +func TestArchivePendingTask(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + + oneHourAgo := time.Now().Add(-1 * time.Hour) + + tests := []struct { + pending map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + id uuid.UUID + want error + wantPending map[string][]*base.TaskMessage + wantArchived map[string][]base.Z + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + archived: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + id: m1.ID, + want: nil, + wantPending: map[string][]*base.TaskMessage{ + "default": {m2}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m1, Score: time.Now().Unix()}}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: oneHourAgo.Unix()}}, + }, + qname: "default", + id: m2.ID, + want: ErrTaskNotFound, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m2, Score: oneHourAgo.Unix()}}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3, m4}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + id: m3.ID, + want: nil, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m4}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: time.Now().Unix()}}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllPendingQueues(t, r.client, tc.pending) + h.SeedAllArchivedQueues(t, r.client, tc.archived) + + got := r.ArchivePendingTask(tc.qname, tc.id) + if got != tc.want { + t.Errorf("(*RDB).ArchivePendingTask(%q, %v) = %v, want %v", + tc.qname, tc.id, got, tc.want) + continue + } + + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.PendingKey(qname), diff) + } + } + + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ArchivedKey(qname), diff) + } + } + } +} +func TestArchiveAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2008,7 +2113,7 @@ func TestKillAllRetryTasks(t *testing.T) { } } -func TestKillAllScheduledTasks(t *testing.T) { +func TestArchiveAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2155,7 +2260,7 @@ func TestKillAllScheduledTasks(t *testing.T) { } } -func TestDeleteDeadTask(t *testing.T) { +func TestDeleteArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2425,7 +2530,7 @@ func TestDeleteScheduledTask(t *testing.T) { } } -func TestDeleteAllDeadTasks(t *testing.T) { +func TestDeleteAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil)