diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 77cf262..242e550 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -793,31 +793,36 @@ func (r *RDB) deleteTask(key, qname, id string) error { } // KEYS[1] -> queue to delete +// ARGV[1] -> task key prefix var deleteAllCmd = redis.NewScript(` -local n = redis.call("ZCARD", KEYS[1]) +local ids = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, id in ipairs(ids) do + local key = ARGV[1] .. id + redis.call("DEL", key) +end redis.call("DEL", KEYS[1]) -return n`) +return table.getn(ids)`) // DeleteAllArchivedTasks deletes all archived tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) { - return r.deleteAll(base.ArchivedKey(qname)) + return r.deleteAll(base.ArchivedKey(qname), qname) } // DeleteAllRetryTasks deletes all retry tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) { - return r.deleteAll(base.RetryKey(qname)) + return r.deleteAll(base.RetryKey(qname), qname) } // DeleteAllScheduledTasks deletes all scheduled tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { - return r.deleteAll(base.ScheduledKey(qname)) + return r.deleteAll(base.ScheduledKey(qname), qname) } -func (r *RDB) deleteAll(key string) (int64, error) { - res, err := deleteAllCmd.Run(r.client, []string{key}).Result() +func (r *RDB) deleteAll(key, qname string) (int64, error) { + res, err := deleteAllCmd.Run(r.client, []string{key}, base.TaskKeyPrefix(qname)).Result() if err != nil { return 0, err } @@ -828,22 +833,28 @@ func (r *RDB) deleteAll(key string) (int64, error) { return n, nil } -// KEYS[1] -> asynq:{} +// KEYS[1] -> asynq:{}:pending +// ARGV[1] -> task key prefix var deleteAllPendingCmd = redis.NewScript(` -local n = redis.call("LLEN", KEYS[1]) +local ids = redis.call("LRANGE", KEYS[1], 0, -1) +for _, id in ipairs(ids) do + local key = ARGV[1] .. id + redis.call("DEL", key) +end redis.call("DEL", KEYS[1]) -return n`) +return table.getn(ids)`) // DeleteAllPendingTasks deletes all pending tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { - res, err := deleteAllPendingCmd.Run(r.client, []string{base.PendingKey(qname)}).Result() + res, err := deleteAllPendingCmd.Run(r.client, + []string{base.PendingKey(qname)}, base.TaskKeyPrefix(qname)).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return 0, fmt.Errorf("command error: unexpected return value %v", res) } return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 3e9596e..16a9b2b 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2905,6 +2905,63 @@ func TestDeleteAllScheduledTasks(t *testing.T) { } } +func TestDeleteAllPendingTasks(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + + tests := []struct { + pending map[string][]*base.TaskMessage + qname string + want int64 + wantPending map[string][]*base.TaskMessage + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + qname: "default", + want: 2, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m3}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "custom": {}, + }, + qname: "custom", + want: 0, + wantPending: map[string][]*base.TaskMessage{ + "custom": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllPendingQueues(t, r.client, tc.pending) + + got, err := r.DeleteAllPendingTasks(tc.qname) + if err != nil { + t.Errorf("r.DeleteAllPendingTasks(%q) returned error: %v", tc.qname, err) + } + if got != tc.want { + t.Errorf("r.DeleteAllPendingTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) + } + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) + } + } + } +} + func TestRemoveQueue(t *testing.T) { r := setup(t) defer r.Close()