2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.RemoveQueue

This commit is contained in:
Ken Hibino 2021-02-28 06:48:21 -08:00
parent de226f3654
commit f434f62369
2 changed files with 43 additions and 9 deletions

View File

@ -884,11 +884,27 @@ func (e *ErrQueueNotEmpty) Error() string {
// KEYS[4] -> asynq:{<qname>}:retry // KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived // KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines // KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueForceCmd = redis.NewScript(` var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2]) local active = redis.call("LLEN", KEYS[2])
if active > 0 then if active > 0 then
return redis.error_reply("Queue has tasks active") return redis.error_reply("Queue has tasks active")
end end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3]) redis.call("DEL", KEYS[3])
@ -898,22 +914,36 @@ redis.call("DEL", KEYS[6])
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Checks whether queue is empty before removing. // Checks whether queue is empty before removing.
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active // KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled // KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry // KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived // KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines // KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueCmd = redis.NewScript(` var removeQueueCmd = redis.NewScript(`
local pending = redis.call("LLEN", KEYS[1]) local ids = {}
local active = redis.call("LLEN", KEYS[2]) for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
local scheduled = redis.call("SCARD", KEYS[3]) table.insert(ids, id)
local retry = redis.call("SCARD", KEYS[4]) end
local archived = redis.call("SCARD", KEYS[5]) for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
local total = pending + active + scheduled + retry + archived table.insert(ids, id)
if total > 0 then end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
table.insert(ids, id)
end
if table.getn(ids) > 0 then
return redis.error_reply("QUEUE NOT EMPTY") return redis.error_reply("QUEUE NOT EMPTY")
end end
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3]) redis.call("DEL", KEYS[3])
@ -950,7 +980,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
base.ArchivedKey(qname), base.ArchivedKey(qname),
base.DeadlinesKey(qname), base.DeadlinesKey(qname),
} }
if err := script.Run(r.client, keys).Err(); err != nil { if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil {
if err.Error() == "QUEUE NOT EMPTY" { if err.Error() == "QUEUE NOT EMPTY" {
return &ErrQueueNotEmpty{qname} return &ErrQueueNotEmpty{qname}
} }

View File

@ -3060,6 +3060,10 @@ func TestRemoveQueue(t *testing.T) {
t.Errorf("key %q still exists", key) t.Errorf("key %q still exists", key)
} }
} }
if n := len(r.client.Keys(base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 {
t.Errorf("%d keys still exists for tasks", n)
}
} }
} }