diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 242e550..de19fb5 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -884,11 +884,27 @@ func (e *ErrQueueNotEmpty) Error() string { // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines +// ARGV[1] -> task key prefix var removeQueueForceCmd = redis.NewScript(` local active = redis.call("LLEN", KEYS[2]) if active > 0 then return redis.error_reply("Queue has tasks active") end +for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[3]) @@ -898,22 +914,36 @@ redis.call("DEL", KEYS[6]) return redis.status_reply("OK")`) // Checks whether queue is empty before removing. -// KEYS[1] -> asynq:{} +// KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines +// ARGV[1] -> task key prefix var removeQueueCmd = redis.NewScript(` -local pending = redis.call("LLEN", KEYS[1]) -local active = redis.call("LLEN", KEYS[2]) -local scheduled = redis.call("SCARD", KEYS[3]) -local retry = redis.call("SCARD", KEYS[4]) -local archived = redis.call("SCARD", KEYS[5]) -local total = pending + active + scheduled + retry + archived -if total > 0 then +local ids = {} +for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do + table.insert(ids, id) +end +for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do + table.insert(ids, id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do + table.insert(ids, id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do + table.insert(ids, id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do + table.insert(ids, id) +end +if table.getn(ids) > 0 then return redis.error_reply("QUEUE NOT EMPTY") end +for _, id in ipairs(ids) do + redis.call("DEL", ARGV[1] .. id) +end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[3]) @@ -950,7 +980,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { base.ArchivedKey(qname), base.DeadlinesKey(qname), } - if err := script.Run(r.client, keys).Err(); err != nil { + if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil { if err.Error() == "QUEUE NOT EMPTY" { return &ErrQueueNotEmpty{qname} } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 16a9b2b..bc2c806 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -3060,6 +3060,10 @@ func TestRemoveQueue(t *testing.T) { t.Errorf("key %q still exists", key) } } + + if n := len(r.client.Keys(base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 { + t.Errorf("%d keys still exists for tasks", n) + } } }