From 68839dc9d361ee805299a53bada7a48b92a65c00 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 22 May 2021 17:46:23 -0700 Subject: [PATCH] Fix lua scripts for redis cluster --- internal/rdb/inspect.go | 112 ++++++++++++---------------------------- 1 file changed, 32 insertions(+), 80 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 2100b39..c84bd4d 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -615,32 +615,26 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { // Input: // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:pending -// KEYS[3] -> all queues key // -- // ARGV[1] -> task ID // ARGV[2] -> queue key prefix; asynq:{}: -// ARGV[3] -> queue name // // Output: // Numeric code indicating the status: // Returns 1 if task is successfully updated. // Returns 0 if task is not found. -// Returns -1 if queue doesn't exist. -// Returns -2 if task is in active state. -// Returns -3 if task is in pending state. +// Returns -1 if task is in active state. +// Returns -2 if task is in pending state. // Returns error reply if unexpected error occurs. var runTaskCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[3], ARGV[3]) == 0 then - return -1 -end if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end local state = redis.call("HGET", KEYS[1], "state") if state == "active" then - return -2 + return -1 elseif state == "pending" then - return -3 + return -2 end local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1]) if n == 0 then @@ -659,15 +653,16 @@ return 1 // If a task is in active or pending state it returns non-nil error with Code FailedPrecondition. func (r *RDB) RunTask(qname string, id uuid.UUID) error { var op errors.Op = "rdb.RunTask" + if err := r.checkQueueExists(qname); err != nil { + return errors.E(op, errors.CanonicalCode(err), err) + } keys := []string{ base.TaskKey(qname, id.String()), base.PendingKey(qname), - base.AllQueues, } argv := []interface{}{ id.String(), base.QueueKeyPrefix(qname), - qname, } res, err := runTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -683,10 +678,8 @@ func (r *RDB) RunTask(qname string, id uuid.UUID) error { case 0: return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) case -1: - return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) - case -2: return errors.E(op, errors.FailedPrecondition, "task is already running") - case -3: + case -2: return errors.E(op, errors.FailedPrecondition, "task is already in pending state") default: return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script %d", n)) @@ -699,18 +692,12 @@ func (r *RDB) RunTask(qname string, id uuid.UUID) error { // Input: // KEYS[1] -> zset which holds task ids (e.g. asynq:{}:scheduled) // KEYS[2] -> asynq:{}:pending -// KEYS[3] -> all queues key // -- // ARGV[1] -> task key prefix -// ARGV[2] -> queue name // // Output: // integer: number of tasks updated to pending state. -// Returns -1 if queue doesn't exist var runAllCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[3], ARGV[2]) == 0 then - return -1 -end local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("LPUSH", KEYS[2], id) @@ -720,14 +707,15 @@ redis.call("DEL", KEYS[1]) return table.getn(ids)`) func (r *RDB) runAll(zset, qname string) (int64, error) { + if err := r.checkQueueExists(qname); err != nil { + return 0, err + } keys := []string{ zset, base.PendingKey(qname), - base.AllQueues, } argv := []interface{}{ base.TaskKeyPrefix(qname), - qname, } res, err := runAllCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -779,21 +767,15 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { // Input: // KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:archived -// KEYS[3] -> all queues key // -- // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) // ARGV[4] -> task key prefix (asynq:{}:t:) -// ARGV[5] -> queue name // // Output: // integer: Number of tasks archived -// Returns -1 if queue doesn't exist var archiveAllPendingCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[3], ARGV[5]) == 0 then - return -1 -end local ids = redis.call("LRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) @@ -809,10 +791,12 @@ return table.getn(ids)`) // If a queue with the given name doesn't exist, it returns QueueNotFoundError. func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { var op errors.Op = "rdb.ArchiveAllPendingTasks" + if err := r.checkQueueExists(qname); err != nil { + return 0, errors.E(op, errors.CanonicalCode(err), err) + } keys := []string{ base.PendingKey(qname), base.ArchivedKey(qname), - base.AllQueues, } now := time.Now() argv := []interface{}{ @@ -820,7 +804,6 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, base.TaskKeyPrefix(qname), - qname, } res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -830,9 +813,6 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { if !ok { return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from script %v", res)) } - if n == -1 { - return 0, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) - } return n, nil } @@ -841,14 +821,12 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { // Input: // KEYS[1] -> task key (asynq:{}:t:) // KEYS[2] -> archived key (asynq:{}:archived) -// KEYS[3] -> all queues key // -- // ARGV[1] -> id of the task to archive // ARGV[2] -> current timestamp // ARGV[3] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> max number of tasks in archived state (e.g., 100) // ARGV[5] -> queue key prefix (asynq:{}:) -// ARGV[6] -> queue name // // Output: // Numeric code indicating the status: @@ -856,12 +834,8 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { // Returns 0 if task is not found. // Returns -1 if task is already archived. // Returns -2 if task is in active state. -// Returns -3 if queue doesn't exist. // Returns error reply if unexpected error occurs. var archiveTaskCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[3], ARGV[6]) == 0 then - return -3 -end if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end @@ -897,10 +871,12 @@ return 1 // If a task is in active state it returns non-nil error with FailedPrecondition code. func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { var op errors.Op = "rdb.ArchiveTask" + if err := r.checkQueueExists(qname); err != nil { + return errors.E(op, errors.CanonicalCode(err), err) + } keys := []string{ base.TaskKey(qname, id.String()), base.ArchivedKey(qname), - base.AllQueues, } now := time.Now() argv := []interface{}{ @@ -909,7 +885,6 @@ func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, base.QueueKeyPrefix(qname), - qname, } res, err := archiveTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -941,21 +916,15 @@ func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { // Input: // KEYS[1] -> ZSET to move task from (e.g., asynq:{}:retry) // KEYS[2] -> asynq:{}:archived -// KEYS[3] -> all queues key // -- // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) // ARGV[4] -> task key prefix (asynq:{}:t:) -// ARGV[5] -> queue name // // Output: // integer: number of tasks archived -// Returns -1 if queue doesn't exist var archiveAllCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[3], ARGV[5]) == 0 then - return -1 -end local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) @@ -967,6 +936,9 @@ redis.call("DEL", KEYS[1]) return table.getn(ids)`) func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { + if err := r.checkQueueExists(qname); err != nil { + return 0, err + } keys := []string{ src, dst, @@ -996,27 +968,22 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { // Input: // KEYS[1] -> asynq:{}:t: -// KEYS[2] -> all queues key +// -- // ARGV[1] -> task ID // ARGV[2] -> queue key prefix -// ARGV[3] -> queue name // // Output: // Numeric code indicating the status: // Returns 1 if task is successfully deleted. // Returns 0 if task is not found. -// Returns -1 if queue doesn't exist. -// Returns -2 if task is in active state. +// Returns -1 if task is in active state. var deleteTaskCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[2], ARGV[3]) == 0 then - return -1 -end if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end local state = redis.call("HGET", KEYS[1], "state") if state == "active" then - return -2 + return -1 end if state == "pending" then if redis.call("LREM", ARGV[2] .. state, 0, ARGV[1]) == 0 then @@ -1038,14 +1005,15 @@ return redis.call("DEL", KEYS[1]) // If a task is in active state it returns non-nil error with Code FailedPrecondition. func (r *RDB) DeleteTask(qname string, id uuid.UUID) error { var op errors.Op = "rdb.DeleteTask" + if err := r.checkQueueExists(qname); err != nil { + return errors.E(op, errors.CanonicalCode(err), err) + } keys := []string{ base.TaskKey(qname, id.String()), - base.AllQueues, } argv := []interface{}{ id.String(), base.QueueKeyPrefix(qname), - qname, } res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -1061,8 +1029,6 @@ func (r *RDB) DeleteTask(qname string, id uuid.UUID) error { case 0: return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) case -1: - return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) - case -2: return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelTask instead.") default: return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", n)) @@ -1115,18 +1081,12 @@ func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { // // Input: // KEYS[1] -> zset holding the task ids. -// KEYS[2] -> all queues key // -- // ARGV[1] -> task key prefix -// ARGV[2] -> queue name // // Output: // integer: number of tasks deleted -// Returns -1 if queue doesn't exist var deleteAllCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[2], ARGV[2]) == 0 then - return -1 -end local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id) @@ -1135,6 +1095,9 @@ redis.call("DEL", KEYS[1]) return table.getn(ids)`) func (r *RDB) deleteAll(key, qname string) (int64, error) { + if err := r.checkQueueExists(qname); err != nil { + return 0, err + } keys := []string{ key, base.AllQueues, @@ -1151,9 +1114,6 @@ func (r *RDB) deleteAll(key, qname string) (int64, error) { if !ok { return 0, fmt.Errorf("unexpected return value from Lua script: %v", res) } - if n == -1 { - return 0, &errors.QueueNotFoundError{Queue: qname} - } return n, nil } @@ -1161,18 +1121,12 @@ func (r *RDB) deleteAll(key, qname string) (int64, error) { // // Input: // KEYS[1] -> asynq:{}:pending -// KEYS[2] -> all queues key // -- // ARGV[1] -> task key prefix -// ARGV[2] -> queue name // // Output: // integer: number of tasks deleted -// Returns -1 if queue doesn't exist var deleteAllPendingCmd = redis.NewScript(` -if redis.call("SISMEMBER", KEYS[2], ARGV[2]) == 0 then - return -1 -end local ids = redis.call("LRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id) @@ -1184,13 +1138,14 @@ return table.getn(ids)`) // and returns the number of tasks deleted. func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { var op errors.Op = "rdb.DeleteAllPendingTasks" + if err := r.checkQueueExists(qname); err != nil { + return 0, errors.E(op, errors.CanonicalCode(err), err) + } keys := []string{ base.PendingKey(qname), - base.AllQueues, } argv := []interface{}{ base.TaskKeyPrefix(qname), - qname, } res, err := deleteAllPendingCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -1200,9 +1155,6 @@ func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { if !ok { return 0, errors.E(op, errors.Internal, "command error: unexpected return value %v", res) } - if n == -1 { - return 0, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) - } return n, nil }