From 76bd865ebcd02b349d4487cfcea7769d40ed5fd8 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 9 May 2021 06:48:44 -0700 Subject: [PATCH] Update RDB.RemoveQueue with specific error types --- internal/errors/errors.go | 15 +++++++++ internal/rdb/inspect.go | 60 ++++++++++++++++++++++++++++-------- internal/rdb/inspect_test.go | 8 +++-- 3 files changed, 68 insertions(+), 15 deletions(-) diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 170408e..7e6da13 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -182,6 +182,21 @@ func IsQueueNotFound(err error) bool { return As(err, &target) } +// QueueNotEmptyError indicates that the given queue is not empty. +type QueueNotEmptyError struct { + Queue string // queue name +} + +func (e *QueueNotEmptyError) Error() string { + return fmt.Sprintf("queue %q is not empty", e.Queue) +} + +// IsQueueNotEmpty reports whether any error in err's chain is of type QueueNotEmptyError. +func IsQueueNotEmpty(err error) bool { + var target *QueueNotEmptyError + return As(err, &target) +} + // TaskAlreadyArchivedError indicates that the task in question is already archived. type TaskAlreadyArchivedError struct { Queue string // queue name diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 94f4f42..94d2d2b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1128,18 +1128,28 @@ func (e *QueueNotEmptyError) Error() string { return fmt.Sprintf("queue %q is not empty", e.Name) } -// Only check whether active queue is empty before removing. +// removeQueueForceCmd removes the given queue regardless of +// whether the queue is empty. +// It only check whether active queue is empty before removing. +// +// Input: // KEYS[1] -> asynq:{} // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines +// -- // ARGV[1] -> task key prefix +// +// Output: +// Numeric code to indicate the status. +// Returns 1 if successfully removed. +// Returns -2 if the queue has active tasks. var removeQueueForceCmd = redis.NewScript(` local active = redis.call("LLEN", KEYS[2]) if active > 0 then - return redis.error_reply("Queue has tasks active") + return -2 end for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do redis.call("DEL", ARGV[1] .. id) @@ -1162,16 +1172,25 @@ redis.call("DEL", KEYS[3]) redis.call("DEL", KEYS[4]) redis.call("DEL", KEYS[5]) redis.call("DEL", KEYS[6]) -return redis.status_reply("OK")`) +return 1`) -// Checks whether queue is empty before removing. +// removeQueueCmd removes the given queue. +// It checks whether queue is empty before removing. +// +// Input: // KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines +// -- // ARGV[1] -> task key prefix +// +// Output: +// Numeric code to indicate the status +// Returns 1 if successfully removed. +// Returns -1 if queue is not empty var removeQueueCmd = redis.NewScript(` local ids = {} for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do @@ -1190,7 +1209,7 @@ for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do table.insert(ids, id) end if table.getn(ids) > 0 then - return redis.error_reply("QUEUE NOT EMPTY") + return -1 end for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id) @@ -1201,7 +1220,7 @@ redis.call("DEL", KEYS[3]) redis.call("DEL", KEYS[4]) redis.call("DEL", KEYS[5]) redis.call("DEL", KEYS[6]) -return redis.status_reply("OK")`) +return 1`) // RemoveQueue removes the specified queue. // @@ -1210,12 +1229,13 @@ return redis.status_reply("OK")`) // If force is set to false, it will only remove the queue if // the queue is empty. func (r *RDB) RemoveQueue(qname string, force bool) error { + var op errors.Op = "rdb.RemoveQueue" exists, err := r.client.SIsMember(base.AllQueues, qname).Result() if err != nil { return err } if !exists { - return &QueueNotFoundError{qname} + return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } var script *redis.Script if force { @@ -1231,13 +1251,27 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { base.ArchivedKey(qname), base.DeadlinesKey(qname), } - if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil { - if err.Error() == "QUEUE NOT EMPTY" { - return &QueueNotEmptyError{qname} - } - return err + res, err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Result() + if err != nil { + return errors.E(op, errors.Unknown, err) + } + n, ok := res.(int64) + if !ok { + return errors.E(op, errors.Internal, fmt.Sprintf("unexpeced return value from Lua script: %v", res)) + } + switch n { + case 1: + if err := r.client.SRem(base.AllQueues, qname).Err(); err != nil { + return errors.E(op, errors.Unknown, err) + } + return nil + case -1: + return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname}) + case -2: + return errors.E(op, errors.FailedPrecondition, "cannot remove queue with active tasks") + default: + return errors.E(op, errors.Unknown, fmt.Sprintf("unexpected return value from Lua script: %d", n)) } - return r.client.SRem(base.AllQueues, qname).Err() } // Note: Script also removes stale keys. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index c711153..53bc029 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -3503,6 +3503,7 @@ func TestRemoveQueueError(t *testing.T) { archived map[string][]base.Z qname string // queue to remove force bool + match func(err error) bool }{ { desc: "removing non-existent queue", @@ -3528,6 +3529,7 @@ func TestRemoveQueueError(t *testing.T) { }, qname: "nonexistent", force: false, + match: errors.IsQueueNotFound, }, { desc: "removing non-empty queue", @@ -3553,6 +3555,7 @@ func TestRemoveQueueError(t *testing.T) { }, qname: "custom", force: false, + match: errors.IsQueueNotEmpty, }, { desc: "force removing queue with active tasks", @@ -3579,6 +3582,7 @@ func TestRemoveQueueError(t *testing.T) { qname: "custom", // Even with force=true, it should error if there are active tasks. force: true, + match: func(err error) bool { return errors.CanonicalCode(err) == errors.FailedPrecondition }, }, } @@ -3591,8 +3595,8 @@ func TestRemoveQueueError(t *testing.T) { h.SeedAllArchivedQueues(t, r.client, tc.archived) got := r.RemoveQueue(tc.qname, tc.force) - if got == nil { - t.Errorf("%s;(*RDB).RemoveQueue(%q) = nil, want error", tc.desc, tc.qname) + if !tc.match(got) { + t.Errorf("%s; returned error didn't match expected value; got=%v", tc.desc, got) continue }