diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index b21d09e..9a130a7 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -944,54 +944,113 @@ func (r *RDB) DeleteTask(qname string, id uuid.UUID) error { } } -// KEYS[1] -> queue to delete -// ARGV[1] -> task key prefix -var deleteAllCmd = redis.NewScript(` -local ids = redis.call("ZRANGE", KEYS[1], 0, -1) -for _, id in ipairs(ids) do - local key = ARGV[1] .. id - redis.call("DEL", key) -end -redis.call("DEL", KEYS[1]) -return table.getn(ids)`) - // DeleteAllArchivedTasks deletes all archived tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) { - return r.deleteAll(base.ArchivedKey(qname), qname) + var op errors.Op = "rdb.DeleteAllArchivedTasks" + n, err := r.deleteAll(base.ArchivedKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } // DeleteAllRetryTasks deletes all retry tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) { - return r.deleteAll(base.RetryKey(qname), qname) + var op errors.Op = "rdb.DeleteAllRetryTasks" + n, err := r.deleteAll(base.RetryKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } // DeleteAllScheduledTasks deletes all scheduled tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { - return r.deleteAll(base.ScheduledKey(qname), qname) + var op errors.Op = "rdb.DeleteAllScheduledTasks" + n, err := r.deleteAll(base.ScheduledKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } +// deleteAllCmd deletes tasks from the given zset. +// +// Input: +// KEYS[1] -> zset holding the task ids. +// KEYS[2] -> all queues key +// -- +// ARGV[1] -> task key prefix +// ARGV[2] -> queue name +// +// Output: +// integer: number of tasks deleted +// Returns -1 if queue doesn't exist +var deleteAllCmd = redis.NewScript(` +if redis.call("SISMEMBER", KEYS[2], ARGV[2]) == 0 then + return -1 +end +local ids = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, id in ipairs(ids) do + redis.call("DEL", ARGV[1] .. id) +end +redis.call("DEL", KEYS[1]) +return table.getn(ids)`) + func (r *RDB) deleteAll(key, qname string) (int64, error) { - res, err := deleteAllCmd.Run(r.client, []string{key}, base.TaskKeyPrefix(qname)).Result() + keys := []string{ + key, + base.AllQueues, + } + argv := []interface{}{ + base.TaskKeyPrefix(qname), + qname, + } + res, err := deleteAllCmd.Run(r.client, keys, argv...).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return 0, fmt.Errorf("unexpected return value from Lua script: %v", res) + } + if n == -1 { + return 0, &errors.QueueNotFoundError{Queue: qname} } return n, nil } +// deleteAllPendingCmd deletes all pending tasks from the given queue. +// +// Input: // KEYS[1] -> asynq:{}:pending +// KEYS[2] -> all queues key +// -- // ARGV[1] -> task key prefix +// ARGV[2] -> queue name +// +// Output: +// integer: number of tasks deleted +// Returns -1 if queue doesn't exist var deleteAllPendingCmd = redis.NewScript(` +if redis.call("SISMEMBER", KEYS[2], ARGV[2]) == 0 then + return -1 +end local ids = redis.call("LRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do - local key = ARGV[1] .. id - redis.call("DEL", key) + redis.call("DEL", ARGV[1] .. id) end redis.call("DEL", KEYS[1]) return table.getn(ids)`) @@ -999,14 +1058,25 @@ return table.getn(ids)`) // DeleteAllPendingTasks deletes all pending tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { - res, err := deleteAllPendingCmd.Run(r.client, - []string{base.PendingKey(qname)}, base.TaskKeyPrefix(qname)).Result() + var op errors.Op = "rdb.DeleteAllPendingTasks" + keys := []string{ + base.PendingKey(qname), + base.AllQueues, + } + argv := []interface{}{ + base.TaskKeyPrefix(qname), + qname, + } + res, err := deleteAllPendingCmd.Run(r.client, keys, argv...).Result() if err != nil { - return 0, err + return 0, errors.E(op, errors.Unknown, err) } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("command error: unexpected return value %v", res) + return 0, errors.E(op, errors.Internal, "command error: unexpected return value %v", res) + } + if n == -1 { + return 0, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b93b1ac..fcb6f46 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -3313,6 +3313,38 @@ func TestDeleteAllPendingTasks(t *testing.T) { } } +func TestDeleteAllTasksError(t *testing.T) { + r := setup(t) + defer r.Close() + + tests := []struct { + desc string + qname string + match func(err error) bool + }{ + { + desc: "It returns QueueNotFoundError if queue doesn't exist", + qname: "nonexistent", + match: errors.IsQueueNotFound, + }, + } + + for _, tc := range tests { + if _, got := r.DeleteAllPendingTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: DeleteAllPendingTasks returned %v", tc.desc, got) + } + if _, got := r.DeleteAllScheduledTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: DeleteAllScheduledTasks returned %v", tc.desc, got) + } + if _, got := r.DeleteAllRetryTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: DeleteAllRetryTasks returned %v", tc.desc, got) + } + if _, got := r.DeleteAllArchivedTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: DeleteAllArchivedTasks returned %v", tc.desc, got) + } + } +} + func TestRemoveQueue(t *testing.T) { r := setup(t) defer r.Close()