2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-06-06 23:02:57 +08:00

Correct the error message to cancel an active tasks

This commit is contained in:
Erwan Leboucher 2022-04-13 10:37:18 +02:00 committed by Ken Hibino
parent dd6f84c575
commit 5c723f597e

View File

@ -109,7 +109,7 @@ table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6])) table.insert(res, redis.call("ZCARD", KEYS[6]))
for i=7,10 do for i=7,10 do
local count = 0 local count = 0
local n = redis.call("GET", KEYS[i]) local n = redis.call("GET", KEYS[i])
if n then if n then
count = tonumber(n) count = tonumber(n)
end end
@ -568,7 +568,7 @@ var groupStatsCmd = redis.NewScript(`
local res = {} local res = {}
local group_names = redis.call("SMEMBERS", KEYS[1]) local group_names = redis.call("SMEMBERS", KEYS[1])
for _, gname in ipairs(group_names) do for _, gname in ipairs(group_names) do
local size = redis.call("ZCARD", ARGV[1] .. gname) local size = redis.call("ZCARD", ARGV[1] .. gname)
table.insert(res, gname) table.insert(res, gname)
table.insert(res, size) table.insert(res, size)
end end
@ -1295,7 +1295,7 @@ elseif state == "aggregating" then
if redis.call("ZCARD", ARGV[6] .. group) == 0 then if redis.call("ZCARD", ARGV[6] .. group) == 0 then
redis.call("SREM", KEYS[3], group) redis.call("SREM", KEYS[3], group)
end end
else else
if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then
return redis.error_reply("task id not found in zset " .. tostring(ARGV[5] .. state)) return redis.error_reply("task id not found in zset " .. tostring(ARGV[5] .. state))
end end
@ -1349,7 +1349,7 @@ func (r *RDB) ArchiveTask(qname, id string) error {
case -1: case -1:
return errors.E(op, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: qname, ID: id}) return errors.E(op, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: qname, ID: id})
case -2: case -2:
return errors.E(op, errors.FailedPrecondition, "cannot archive task in active state. use CancelTask instead.") return errors.E(op, errors.FailedPrecondition, "cannot archive task in active state. use CancelProcessing instead.")
case -3: case -3:
return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
default: default:
@ -1490,7 +1490,7 @@ func (r *RDB) DeleteTask(qname, id string) error {
case 0: case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id}) return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id})
case -1: case -1:
return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelTask instead.") return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelProcessing instead.")
default: default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", n)) return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", n))
} }