2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Fix done lua script

If UniqueKey is an empty string, do not provide the key to Lua script
because that will cause CROSSSLOT error in redis cluster (since it
doesn't have any hash tag).
This commit is contained in:
Ken Hibino 2020-08-28 05:37:40 -07:00
parent 3ac548e97c
commit ee1afd12f5

View File

@ -186,11 +186,8 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
@ -202,7 +199,28 @@ local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end
if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then
return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end
if redis.call("GET", KEYS[4]) == ARGV[3] then
redis.call("DEL", KEYS[4])
end
return redis.status_reply("OK")
@ -217,9 +235,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
}
now := time.Now()
expireAt := now.Add(statsTTL)
return doneCmd.Run(r.client,
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), msg.UniqueKey},
encoded, expireAt.Unix(), msg.ID.String()).Err()
keys := []string{
base.InProgressKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
}
args := []interface{}{encoded, expireAt.Unix()}
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
args = append(args, msg.ID.String())
return doneUniqueCmd.Run(r.client, keys, args...).Err()
}
return doneCmd.Run(r.client, keys, args...).Err()
}
// KEYS[1] -> asynq:{<qname>}:in_progress