2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Free uniqueness lock when task is deleted

This commit is contained in:
Ken Hibino
2021-06-06 06:35:36 -07:00
parent 3e9fc2f972
commit ebe482a65c
2 changed files with 169 additions and 5 deletions

View File

@@ -735,6 +735,11 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
if n == 0 {
return ErrTaskNotFound
}
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
return err
}
}
return nil
}
}
@@ -747,6 +752,9 @@ for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
redis.call("DEL", decoded["UniqueKey"])
end
return 1
end
end
@@ -769,9 +777,15 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
// KEYS[1] -> queue to delete
var deleteAllCmd = redis.NewScript(`
local n = redis.call("ZCARD", KEYS[1])
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
end
redis.call("DEL", KEYS[1])
return n`)
return table.getn(msgs)`)
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted.
@@ -805,9 +819,15 @@ func (r *RDB) deleteAll(key string) (int64, error) {
// KEYS[1] -> asynq:{<qname>}
var deleteAllPendingCmd = redis.NewScript(`
local n = redis.call("LLEN", KEYS[1])
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
end
redis.call("DEL", KEYS[1])
return n`)
return table.getn(msgs)`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.