From cde3e57c6ccfb2e2b104c03575f3658e773fc141 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 7 May 2021 16:31:07 -0700 Subject: [PATCH] Update RDB.RunAll* methods with task state --- internal/rdb/inspect.go | 71 ++++++++++++++++++++++++++++++++---- internal/rdb/inspect_test.go | 29 +++++++++++++++ 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 05ab6b0..b21d09e 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -442,20 +442,47 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro // RunAllScheduledTasks enqueues all scheduled tasks from the given queue // and returns the number of tasks enqueued. +// If a queue with the given name doesn't exist, it returns QueueNotFoundError. func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.ScheduledKey(qname), base.PendingKey(qname)) + var op errors.Op = "rdb.RunAllScheduledTasks" + n, err := r.runAll(base.ScheduledKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } // RunAllRetryTasks enqueues all retry tasks from the given queue // and returns the number of tasks enqueued. +// If a queue with the given name doesn't exist, it returns QueueNotFoundError. func (r *RDB) RunAllRetryTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.RetryKey(qname), base.PendingKey(qname)) + var op errors.Op = "rdb.RunAllRetryTasks" + n, err := r.runAll(base.RetryKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } // RunAllArchivedTasks enqueues all archived tasks from the given queue // and returns the number of tasks enqueued. +// If a queue with the given name doesn't exist, it returns QueueNotFoundError. func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname)) + var op errors.Op = "rdb.RunAllArchivedTasks" + n, err := r.runAll(base.ArchivedKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil } // runTaskCmd is a Lua script that updates the given task to pending state. @@ -541,16 +568,43 @@ func (r *RDB) RunTask(qname string, id uuid.UUID) error { } } -var removeAndRunAllCmd = redis.NewScript(` +// runAllCmd is a Lua script that moves all tasks in the given state +// (one of: scheduled, retry, archived) to pending state. +// +// Input: +// KEYS[1] -> zset which holds task ids (e.g. asynq:{}:scheduled) +// KEYS[2] -> asynq:{}:pending +// KEYS[3] -> all queues key +// -- +// ARGV[1] -> task key prefix +// ARGV[2] -> queue name +// +// Output: +// integer: number of tasks updated to pending state. +// Returns -1 if queue doesn't exist +var runAllCmd = redis.NewScript(` +if redis.call("SISMEMBER", KEYS[3], ARGV[2]) == 0 then + return -1 +end local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("LPUSH", KEYS[2], id) - redis.call("ZREM", KEYS[1], id) + redis.call("HSET", ARGV[1] .. id, "state", "pending") end +redis.call("DEL", KEYS[1]) return table.getn(ids)`) -func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { - res, err := removeAndRunAllCmd.Run(r.client, []string{zset, qkey}).Result() +func (r *RDB) runAll(zset, qname string) (int64, error) { + keys := []string{ + zset, + base.PendingKey(qname), + base.AllQueues, + } + argv := []interface{}{ + base.TaskKeyPrefix(qname), + qname, + } + res, err := runAllCmd.Run(r.client, keys, argv...).Result() if err != nil { return 0, err } @@ -558,6 +612,9 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { if !ok { return 0, fmt.Errorf("could not cast %v to int64", res) } + if n == -1 { + return 0, &errors.QueueNotFoundError{Queue: qname} + } return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b03cb89..b93b1ac 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1711,6 +1711,35 @@ func TestRunAllArchivedTasks(t *testing.T) { } } +func TestRunAllTasksError(t *testing.T) { + r := setup(t) + defer r.Close() + + tests := []struct { + desc string + qname string + match func(err error) bool + }{ + { + desc: "It returns QueueNotFoundError if queue doesn't exist", + qname: "nonexistent", + match: errors.IsQueueNotFound, + }, + } + + for _, tc := range tests { + if _, got := r.RunAllScheduledTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: RunAllScheduledTasks returned %v", tc.desc, got) + } + if _, got := r.RunAllRetryTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: RunAllRetryTasks returned %v", tc.desc, got) + } + if _, got := r.RunAllArchivedTasks(tc.qname); !tc.match(got) { + t.Errorf("%s: RunAllArchivedTasks returned %v", tc.desc, got) + } + } +} + func TestArchiveRetryTask(t *testing.T) { r := setup(t) defer r.Close()