From 7ee1e278226e82f7e30c8d68829da7c6af44cd4e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 28 Aug 2020 05:37:40 -0700 Subject: [PATCH] Fix done lua script If UniqueKey is an empty string, do not provide the key to Lua script because that will cause CROSSSLOT error in redis cluster (since it doesn't have any hash tag). --- internal/rdb/rdb.go | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b792c49..d7a03a5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -186,11 +186,8 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err // KEYS[1] -> asynq:{}:in_progress // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:processed: -// KEYS[4] -> unique key // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp -// ARGV[3] -> task ID -// Note: LREM count ZERO means "remove all elements equal to val" var doneCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -202,7 +199,28 @@ local n = redis.call("INCR", KEYS[3]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[3], ARGV[2]) end -if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then +return redis.status_reply("OK") +`) + +// KEYS[1] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:processed: +// KEYS[4] -> unique key +// ARGV[1] -> base.TaskMessage value +// ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task ID +var doneUniqueCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[2]) +end +if redis.call("GET", KEYS[4]) == ARGV[3] then redis.call("DEL", KEYS[4]) end return redis.status_reply("OK") @@ -217,9 +235,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error { } now := time.Now() expireAt := now.Add(statsTTL) - return doneCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), msg.UniqueKey}, - encoded, expireAt.Unix(), msg.ID.String()).Err() + keys := []string{ + base.InProgressKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.ProcessedKey(msg.Queue, now), + } + args := []interface{}{encoded, expireAt.Unix()} + if len(msg.UniqueKey) > 0 { + keys = append(keys, msg.UniqueKey) + args = append(args, msg.ID.String()) + return doneUniqueCmd.Run(r.client, keys, args...).Err() + } + return doneCmd.Run(r.client, keys, args...).Err() } // KEYS[1] -> asynq:{}:in_progress