From ee1afd12f5d9e9bfa5cc5ba4790abc022c26759f Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 28 Aug 2020 05:37:40 -0700 Subject: [PATCH] Fix done lua script If UniqueKey is an empty string, do not provide the key to Lua script because that will cause CROSSSLOT error in redis cluster (since it doesn't have any hash tag). --- internal/rdb/rdb.go | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b792c49..d7a03a5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -186,11 +186,8 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err // KEYS[1] -> asynq:{}:in_progress // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:processed: -// KEYS[4] -> unique key // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp -// ARGV[3] -> task ID -// Note: LREM count ZERO means "remove all elements equal to val" var doneCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -202,7 +199,28 @@ local n = redis.call("INCR", KEYS[3]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[3], ARGV[2]) end -if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then +return redis.status_reply("OK") +`) + +// KEYS[1] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:processed: +// KEYS[4] -> unique key +// ARGV[1] -> base.TaskMessage value +// ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task ID +var doneUniqueCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[2]) +end +if redis.call("GET", KEYS[4]) == ARGV[3] then redis.call("DEL", KEYS[4]) end return redis.status_reply("OK") @@ -217,9 +235,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error { } now := time.Now() expireAt := now.Add(statsTTL) - return doneCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), msg.UniqueKey}, - encoded, expireAt.Unix(), msg.ID.String()).Err() + keys := []string{ + base.InProgressKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.ProcessedKey(msg.Queue, now), + } + args := []interface{}{encoded, expireAt.Unix()} + if len(msg.UniqueKey) > 0 { + keys = append(keys, msg.UniqueKey) + args = append(args, msg.ID.String()) + return doneUniqueCmd.Run(r.client, keys, args...).Err() + } + return doneCmd.Run(r.client, keys, args...).Err() } // KEYS[1] -> asynq:{}:in_progress