From 043e61b06e5bc2de359c607192b95cb69f70557a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 27 Feb 2021 15:12:10 -0800 Subject: [PATCH] Update RDB.ArchiveAllPending --- internal/rdb/inspect.go | 23 +++--- internal/rdb/inspect_test.go | 132 +++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 10 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 0abc2fb..e6fc62f 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -617,35 +617,38 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) } -// KEYS[1] -> asynq:{} +// KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:archived // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) var archiveAllPendingCmd = redis.NewScript(` -local msgs = redis.call("LRANGE", KEYS[1], 0, -1) -for _, msg in ipairs(msgs) do - redis.call("ZADD", KEYS[2], ARGV[1], msg) +local ids = redis.call("LRANGE", KEYS[1], 0, -1) +for _, id in ipairs(ids) do + redis.call("ZADD", KEYS[2], ARGV[1], id) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) end redis.call("DEL", KEYS[1]) -return table.getn(msgs)`) +return table.getn(ids)`) // ArchiveAllPendingTasks archives all pending tasks from the given queue and -// returns the number of tasks that were moved. +// returns the number of tasks moved. func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} now := time.Now() - limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago - args := []interface{}{now.Unix(), limit, maxArchiveSize} - res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result() + argv := []interface{}{ + now.Unix(), + now.AddDate(0, 0, -archivedExpirationInDays).Unix(), + maxArchiveSize, + } + res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return 0, fmt.Errorf("command error: unexpected return value %v", res) } return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index e951c3b..f8a0667 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1966,6 +1966,138 @@ func TestArchivePendingTask(t *testing.T) { } } } +func TestArchiveAllPendingTasks(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + t1 := time.Now().Add(1 * time.Minute) + t2 := time.Now().Add(1 * time.Hour) + + tests := []struct { + pending map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + want int64 + wantPending map[string][]*base.TaskMessage + wantArchived map[string][]base.Z + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + archived: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 2, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantArchived: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + }, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + qname: "default", + want: 1, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantArchived: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {}, + }, + archived: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + want: 0, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantArchived: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3, m4}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + want: 2, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: m3, Score: time.Now().Unix()}, + {Message: m4, Score: time.Now().Unix()}, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllPendingQueues(t, r.client, tc.pending) + h.SeedAllArchivedQueues(t, r.client, tc.archived) + + got, err := r.ArchiveAllPendingTasks(tc.qname) + if got != tc.want || err != nil { + t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil", + tc.qname, got, err, tc.want) + continue + } + + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.PendingKey(qname), diff) + } + } + + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ArchivedKey(qname), diff) + } + } + } +} func TestArchiveAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close()