2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-09-16 11:32:26 +08:00

Add methods to rdb.RDB to enqueues a task from scheduled, retry, dead

queues
This commit is contained in:
Ken Hibino
2019-12-08 06:46:04 -08:00
parent 8e2c4e5716
commit 680a2cf3df
4 changed files with 336 additions and 41 deletions

View File

@@ -14,7 +14,6 @@ import (
// Redis keys
const (
allQueues = "asynq:queues" // SET
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
defaultQ = queuePrefix + "default" // LIST
scheduledQ = "asynq:scheduled" // ZSET
@@ -23,8 +22,13 @@ const (
inProgressQ = "asynq:in_progress" // LIST
)
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
var (
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task")
)
// RDB is a client interface to query and mutate task queues.
type RDB struct {
@@ -72,7 +76,6 @@ func (r *RDB) Enqueue(msg *TaskMessage) error {
}
qname := queuePrefix + msg.Queue
pipe := r.client.Pipeline()
pipe.SAdd(allQueues, qname)
pipe.LPush(qname, string(bytes))
_, err = pipe.Exec()
if err != nil {
@@ -182,19 +185,18 @@ func (r *RDB) CheckAndEnqueue() error {
return nil
}
// Forward moves all tasks with a score less than the current unix time
// forward moves all tasks with a score less than the current unix time
// from the given zset to the default queue.
func (r *RDB) forward(from string) error {
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
redis.call("SADD", KEYS[2], KEYS[3])
redis.call("LPUSH", KEYS[3], msg)
redis.call("LPUSH", KEYS[2], msg)
end
return msgs
`)
now := float64(time.Now().Unix())
_, err := script.Run(r.client, []string{from, allQueues, defaultQ}, now).Result()
_, err := script.Run(r.client, []string{from, defaultQ}, now).Result()
return err
}