mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-16 11:32:26 +08:00
Update CheckAndEnqueue to enqueue tasks to specified queue
This commit is contained in:
@@ -46,8 +46,8 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
qname := base.QueueKey(msg.Queue)
|
||||
return r.client.LPush(qname, string(bytes)).Err()
|
||||
key := base.QueueKey(msg.Queue)
|
||||
return r.client.LPush(key, string(bytes)).Err()
|
||||
}
|
||||
|
||||
// Dequeue blocks until there is a task available to be processed,
|
||||
@@ -309,11 +309,13 @@ func (r *RDB) forward(from string) error {
|
||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
||||
for _, msg in ipairs(msgs) do
|
||||
redis.call("ZREM", KEYS[1], msg)
|
||||
redis.call("LPUSH", KEYS[2], msg)
|
||||
local decoded = cjson.decode(msg)
|
||||
local qkey = ARGV[2] .. decoded["Queue"]
|
||||
redis.call("LPUSH", qkey, msg)
|
||||
end
|
||||
return msgs
|
||||
`)
|
||||
now := float64(time.Now().Unix())
|
||||
return script.Run(r.client,
|
||||
[]string{from, base.DefaultQueue}, now).Err()
|
||||
[]string{from, base.DefaultQueue}, now, base.QueuePrefix).Err()
|
||||
}
|
||||
|
Reference in New Issue
Block a user