diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b792c49..d7a03a5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -186,11 +186,8 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err // KEYS[1] -> asynq:{}:in_progress // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:processed: -// KEYS[4] -> unique key // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp -// ARGV[3] -> task ID -// Note: LREM count ZERO means "remove all elements equal to val" var doneCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -202,7 +199,28 @@ local n = redis.call("INCR", KEYS[3]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[3], ARGV[2]) end -if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then +return redis.status_reply("OK") +`) + +// KEYS[1] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:processed: +// KEYS[4] -> unique key +// ARGV[1] -> base.TaskMessage value +// ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task ID +var doneUniqueCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[2]) +end +if redis.call("GET", KEYS[4]) == ARGV[3] then redis.call("DEL", KEYS[4]) end return redis.status_reply("OK") @@ -217,9 +235,18 @@ func (r *RDB) Done(msg *base.TaskMessage) error { } now := time.Now() expireAt := now.Add(statsTTL) - return doneCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), msg.UniqueKey}, - encoded, expireAt.Unix(), msg.ID.String()).Err() + keys := []string{ + base.InProgressKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.ProcessedKey(msg.Queue, now), + } + args := []interface{}{encoded, expireAt.Unix()} + if len(msg.UniqueKey) > 0 { + keys = append(keys, msg.UniqueKey) + args = append(args, msg.ID.String()) + return doneUniqueCmd.Run(r.client, keys, args...).Err() + } + return doneCmd.Run(r.client, keys, args...).Err() } // KEYS[1] -> asynq:{}:in_progress