2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Fix done lua script

If UniqueKey is an empty string, do not provide the key to Lua script
because that will cause CROSSSLOT error in redis cluster (since it
doesn't have any hash tag).
This commit is contained in:
Ken Hibino
2020-08-28 05:37:40 -07:00
parent 8daac4af0f
commit 7ee1e27822

View File

@@ -186,11 +186,8 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
@@ -202,7 +199,28 @@ local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end
if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then
return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end
if redis.call("GET", KEYS[4]) == ARGV[3] then
redis.call("DEL", KEYS[4])
end
return redis.status_reply("OK")
@@ -217,9 +235,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
}
now := time.Now()
expireAt := now.Add(statsTTL)
return doneCmd.Run(r.client,
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), msg.UniqueKey},
encoded, expireAt.Unix(), msg.ID.String()).Err()
keys := []string{
base.InProgressKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
}
args := []interface{}{encoded, expireAt.Unix()}
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
args = append(args, msg.ID.String())
return doneUniqueCmd.Run(r.client, keys, args...).Err()
}
return doneCmd.Run(r.client, keys, args...).Err()
}
// KEYS[1] -> asynq:{<qname>}:in_progress