From 5d7f1b6a8000f63e5744446c9139dbfe179d837c Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 26 Apr 2021 07:13:48 -0700 Subject: [PATCH] Update RDB.Requeue with task state --- internal/rdb/rdb.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index bfa9e42..01edde6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -307,6 +307,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:pending +// KEYS[4] -> asynq:{}:t: // ARGV[1] -> task ID // Note: Use RPUSH to push to the head of the queue. var requeueCmd = redis.NewScript(` @@ -317,13 +318,18 @@ if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end redis.call("RPUSH", KEYS[3], ARGV[1]) +redis.call("HSET", KEYS[4], "state", "pending") return redis.status_reply("OK")`) // Requeue moves the task from active queue to the specified queue. func (r *RDB) Requeue(msg *base.TaskMessage) error { - return requeueCmd.Run(r.client, - []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)}, - msg.ID.String()).Err() + keys := []string{ + base.ActiveKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.PendingKey(msg.Queue), + base.TaskKey(msg.Queue, msg.ID.String()), + } + return requeueCmd.Run(r.client, keys, msg.ID.String()).Err() } // KEYS[1] -> asynq:{}:t: