2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update RDB.RemoveQueue with specific error types

This commit is contained in:
Ken Hibino 2021-05-09 06:48:44 -07:00
parent 136d1c9ea9
commit 76bd865ebc
3 changed files with 68 additions and 15 deletions

View File

@ -182,6 +182,21 @@ func IsQueueNotFound(err error) bool {
return As(err, &target) return As(err, &target)
} }
// QueueNotEmptyError indicates that the given queue is not empty.
type QueueNotEmptyError struct {
Queue string // queue name
}
func (e *QueueNotEmptyError) Error() string {
return fmt.Sprintf("queue %q is not empty", e.Queue)
}
// IsQueueNotEmpty reports whether any error in err's chain is of type QueueNotEmptyError.
func IsQueueNotEmpty(err error) bool {
var target *QueueNotEmptyError
return As(err, &target)
}
// TaskAlreadyArchivedError indicates that the task in question is already archived. // TaskAlreadyArchivedError indicates that the task in question is already archived.
type TaskAlreadyArchivedError struct { type TaskAlreadyArchivedError struct {
Queue string // queue name Queue string // queue name

View File

@ -1128,18 +1128,28 @@ func (e *QueueNotEmptyError) Error() string {
return fmt.Sprintf("queue %q is not empty", e.Name) return fmt.Sprintf("queue %q is not empty", e.Name)
} }
// Only check whether active queue is empty before removing. // removeQueueForceCmd removes the given queue regardless of
// whether the queue is empty.
// It only check whether active queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:active // KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled // KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry // KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived // KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines // KEYS[6] -> asynq:{<qname>}:deadlines
// --
// ARGV[1] -> task key prefix // ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status.
// Returns 1 if successfully removed.
// Returns -2 if the queue has active tasks.
var removeQueueForceCmd = redis.NewScript(` var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2]) local active = redis.call("LLEN", KEYS[2])
if active > 0 then if active > 0 then
return redis.error_reply("Queue has tasks active") return -2
end end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
redis.call("DEL", ARGV[1] .. id) redis.call("DEL", ARGV[1] .. id)
@ -1162,16 +1172,25 @@ redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4]) redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5]) redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6]) redis.call("DEL", KEYS[6])
return redis.status_reply("OK")`) return 1`)
// Checks whether queue is empty before removing. // removeQueueCmd removes the given queue.
// It checks whether queue is empty before removing.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:pending // KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active // KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled // KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry // KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived // KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines // KEYS[6] -> asynq:{<qname>}:deadlines
// --
// ARGV[1] -> task key prefix // ARGV[1] -> task key prefix
//
// Output:
// Numeric code to indicate the status
// Returns 1 if successfully removed.
// Returns -1 if queue is not empty
var removeQueueCmd = redis.NewScript(` var removeQueueCmd = redis.NewScript(`
local ids = {} local ids = {}
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
@ -1190,7 +1209,7 @@ for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
table.insert(ids, id) table.insert(ids, id)
end end
if table.getn(ids) > 0 then if table.getn(ids) > 0 then
return redis.error_reply("QUEUE NOT EMPTY") return -1
end end
for _, id in ipairs(ids) do for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id) redis.call("DEL", ARGV[1] .. id)
@ -1201,7 +1220,7 @@ redis.call("DEL", KEYS[3])
redis.call("DEL", KEYS[4]) redis.call("DEL", KEYS[4])
redis.call("DEL", KEYS[5]) redis.call("DEL", KEYS[5])
redis.call("DEL", KEYS[6]) redis.call("DEL", KEYS[6])
return redis.status_reply("OK")`) return 1`)
// RemoveQueue removes the specified queue. // RemoveQueue removes the specified queue.
// //
@ -1210,12 +1229,13 @@ return redis.status_reply("OK")`)
// If force is set to false, it will only remove the queue if // If force is set to false, it will only remove the queue if
// the queue is empty. // the queue is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error { func (r *RDB) RemoveQueue(qname string, force bool) error {
var op errors.Op = "rdb.RemoveQueue"
exists, err := r.client.SIsMember(base.AllQueues, qname).Result() exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
if err != nil { if err != nil {
return err return err
} }
if !exists { if !exists {
return &QueueNotFoundError{qname} return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
var script *redis.Script var script *redis.Script
if force { if force {
@ -1231,13 +1251,27 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
base.ArchivedKey(qname), base.ArchivedKey(qname),
base.DeadlinesKey(qname), base.DeadlinesKey(qname),
} }
if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil { res, err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Result()
if err.Error() == "QUEUE NOT EMPTY" { if err != nil {
return &QueueNotEmptyError{qname} return errors.E(op, errors.Unknown, err)
} }
return err n, ok := res.(int64)
if !ok {
return errors.E(op, errors.Internal, fmt.Sprintf("unexpeced return value from Lua script: %v", res))
}
switch n {
case 1:
if err := r.client.SRem(base.AllQueues, qname).Err(); err != nil {
return errors.E(op, errors.Unknown, err)
}
return nil
case -1:
return errors.E(op, errors.NotFound, &errors.QueueNotEmptyError{Queue: qname})
case -2:
return errors.E(op, errors.FailedPrecondition, "cannot remove queue with active tasks")
default:
return errors.E(op, errors.Unknown, fmt.Sprintf("unexpected return value from Lua script: %d", n))
} }
return r.client.SRem(base.AllQueues, qname).Err()
} }
// Note: Script also removes stale keys. // Note: Script also removes stale keys.

View File

@ -3503,6 +3503,7 @@ func TestRemoveQueueError(t *testing.T) {
archived map[string][]base.Z archived map[string][]base.Z
qname string // queue to remove qname string // queue to remove
force bool force bool
match func(err error) bool
}{ }{
{ {
desc: "removing non-existent queue", desc: "removing non-existent queue",
@ -3528,6 +3529,7 @@ func TestRemoveQueueError(t *testing.T) {
}, },
qname: "nonexistent", qname: "nonexistent",
force: false, force: false,
match: errors.IsQueueNotFound,
}, },
{ {
desc: "removing non-empty queue", desc: "removing non-empty queue",
@ -3553,6 +3555,7 @@ func TestRemoveQueueError(t *testing.T) {
}, },
qname: "custom", qname: "custom",
force: false, force: false,
match: errors.IsQueueNotEmpty,
}, },
{ {
desc: "force removing queue with active tasks", desc: "force removing queue with active tasks",
@ -3579,6 +3582,7 @@ func TestRemoveQueueError(t *testing.T) {
qname: "custom", qname: "custom",
// Even with force=true, it should error if there are active tasks. // Even with force=true, it should error if there are active tasks.
force: true, force: true,
match: func(err error) bool { return errors.CanonicalCode(err) == errors.FailedPrecondition },
}, },
} }
@ -3591,8 +3595,8 @@ func TestRemoveQueueError(t *testing.T) {
h.SeedAllArchivedQueues(t, r.client, tc.archived) h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.RemoveQueue(tc.qname, tc.force) got := r.RemoveQueue(tc.qname, tc.force)
if got == nil { if !tc.match(got) {
t.Errorf("%s;(*RDB).RemoveQueue(%q) = nil, want error", tc.desc, tc.qname) t.Errorf("%s; returned error didn't match expected value; got=%v", tc.desc, got)
continue continue
} }