From 8d3d30da8f41db8eb3b2206842cecd3d98b3c8d4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 27 Dec 2019 06:22:33 -0800 Subject: [PATCH] Add KillAllRetryTasks and KillAllScheduledTasks method to RDB --- internal/rdb/inspect.go | 42 ++++++++ internal/rdb/inspect_test.go | 192 +++++++++++++++++++++++++++++++++++ 2 files changed, 234 insertions(+) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index dbb78da..3cb9071 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -414,6 +414,18 @@ func (r *RDB) KillScheduledTask(id xid.ID, score int64) error { return nil } +// KillAllRetryTasks moves all tasks from retry queue to dead queue and +// returns the number of tasks that were moved. +func (r *RDB) KillAllRetryTasks() (int64, error) { + return r.removeAndKillAll(base.RetryQueue) +} + +// KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and +// returns the number of tasks that were moved. +func (r *RDB) KillAllScheduledTasks() (int64, error) { + return r.removeAndKillAll(base.ScheduledQueue) +} + func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { // KEYS[1] -> ZSET to move task from (e.g., retry queue) // KEYS[2] -> asynq:dead @@ -451,6 +463,36 @@ func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { return n, nil } +func (r *RDB) removeAndKillAll(zset string) (int64, error) { + // KEYS[1] -> ZSET to move task from (e.g., retry queue) + // KEYS[2] -> asynq:dead + // ARGV[1] -> current timestamp + // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) + // ARGV[3] -> max number of tasks in dead queue (e.g., 100) + script := redis.NewScript(` + local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) + for _, msg in ipairs(msgs) do + redis.call("ZREM", KEYS[1], msg) + redis.call("ZADD", KEYS[2], ARGV[1], msg) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) + end + return table.getn(msgs) + `) + now := time.Now() + limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + res, err := script.Run(r.client, []string{zset, base.DeadQueue}, + now.Unix(), limit, maxDeadTasks).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil +} + // DeleteDeadTask finds a task that matches the given id and score from dead queue // and deletes it. If a task that matches the id and score does not exist, // it returns ErrTaskNotFound. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 19bdbc4..88ee5db 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1014,6 +1014,198 @@ func TestKillScheduledTask(t *testing.T) { } } +func TestKillAllRetryTasks(t *testing.T) { + r := setup(t) + m1 := newTaskMessage("send_email", nil) + m2 := newTaskMessage("reindex", nil) + t1 := time.Now().Add(time.Minute) + t2 := time.Now().Add(time.Hour) + + tests := []struct { + retry []sortedSetEntry + dead []sortedSetEntry + want int64 + wantRetry []sortedSetEntry + wantDead []sortedSetEntry + }{ + { + retry: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + dead: []sortedSetEntry{}, + want: 2, + wantRetry: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, time.Now().Unix()}, + }, + }, + { + retry: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + dead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + want: 1, + wantRetry: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, t2.Unix()}, + }, + }, + { + retry: []sortedSetEntry{}, + dead: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + want: 0, + wantRetry: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedRetryQueue(t, r, tc.retry) + seedDeadQueue(t, r, tc.dead) + + got, err := r.KillAllRetryTasks() + if got != tc.want || err != nil { + t.Errorf("(*RDB).KillAllRetryTasks() = %v, %v; want %v, nil", + got, err, tc.want) + continue + } + + gotRetryRaw := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Val() + var gotRetry []sortedSetEntry + for _, z := range gotRetryRaw { + gotRetry = append(gotRetry, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.RetryQueue, diff) + } + + gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() + var gotDead []sortedSetEntry + for _, z := range gotDeadRaw { + gotDead = append(gotDead, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadQueue, diff) + } + } +} + +func TestKillAllScheduledTasks(t *testing.T) { + r := setup(t) + m1 := newTaskMessage("send_email", nil) + m2 := newTaskMessage("reindex", nil) + t1 := time.Now().Add(time.Minute) + t2 := time.Now().Add(time.Hour) + + tests := []struct { + scheduled []sortedSetEntry + dead []sortedSetEntry + want int64 + wantScheduled []sortedSetEntry + wantDead []sortedSetEntry + }{ + { + scheduled: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + dead: []sortedSetEntry{}, + want: 2, + wantScheduled: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, time.Now().Unix()}, + }, + }, + { + scheduled: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + dead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + want: 1, + wantScheduled: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, t2.Unix()}, + }, + }, + { + scheduled: []sortedSetEntry{}, + dead: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + want: 0, + wantScheduled: []sortedSetEntry{}, + wantDead: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedScheduledQueue(t, r, tc.scheduled) + seedDeadQueue(t, r, tc.dead) + + got, err := r.KillAllScheduledTasks() + if got != tc.want || err != nil { + t.Errorf("(*RDB).KillAllScheduledTasks() = %v, %v; want %v, nil", + got, err, tc.want) + continue + } + + gotScheduledRaw := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val() + var gotScheduled []sortedSetEntry + for _, z := range gotScheduledRaw { + gotScheduled = append(gotScheduled, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ScheduledQueue, diff) + } + + gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() + var gotDead []sortedSetEntry + for _, z := range gotDeadRaw { + gotDead = append(gotDead, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadQueue, diff) + } + } +} + func TestDeleteDeadTask(t *testing.T) { r := setup(t) m1 := newTaskMessage("send_email", nil)