2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 08:12:19 +08:00

Update RDB.Requeue with task state

This commit is contained in:
Ken Hibino 2021-04-26 07:13:48 -07:00
parent 77ded502ab
commit 5d7f1b6a80

View File

@ -307,6 +307,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:pending // KEYS[3] -> asynq:{<qname>}:pending
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID // ARGV[1] -> task ID
// Note: Use RPUSH to push to the head of the queue. // Note: Use RPUSH to push to the head of the queue.
var requeueCmd = redis.NewScript(` var requeueCmd = redis.NewScript(`
@ -317,13 +318,18 @@ if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
redis.call("RPUSH", KEYS[3], ARGV[1]) redis.call("RPUSH", KEYS[3], ARGV[1])
redis.call("HSET", KEYS[4], "state", "pending")
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Requeue moves the task from active queue to the specified queue. // Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(msg *base.TaskMessage) error { func (r *RDB) Requeue(msg *base.TaskMessage) error {
return requeueCmd.Run(r.client, keys := []string{
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)}, base.ActiveKey(msg.Queue),
msg.ID.String()).Err() base.DeadlinesKey(msg.Queue),
base.PendingKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID.String()),
}
return requeueCmd.Run(r.client, keys, msg.ID.String()).Err()
} }
// KEYS[1] -> asynq:{<qname>}:t:<task_id> // KEYS[1] -> asynq:{<qname>}:t:<task_id>