2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 00:30:17 +08:00

WIP: Fix CheckAndEnqueue script to forward messages in chunks

This commit is contained in:
Ken Hibino 2020-06-07 05:45:47 -07:00
parent 6b8a4eed54
commit 8c6a21ff59

View File

@ -402,21 +402,18 @@ func (r *RDB) RequeueAll() (int64, error) {
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
// have to be processed. // have to be processed.
// // TODO: remove qnames param
// qnames specifies to which queues to send tasks. func (r *RDB) CheckAndEnqueue(qnames ...string) (err error) {
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
delayed := []string{base.ScheduledQueue, base.RetryQueue} delayed := []string{base.ScheduledQueue, base.RetryQueue}
for _, zset := range delayed { for _, zset := range delayed {
var err error n := 1
if len(qnames) == 1 { for n > 0 {
err = r.forwardSingle(zset, base.QueueKey(qnames[0])) n, err = r.forward(zset)
} else {
err = r.forward(zset)
}
if err != nil { if err != nil {
return err return err
} }
} }
}
return nil return nil
} }
@ -424,39 +421,25 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error {
// ARGV[1] -> current unix time // ARGV[1] -> current unix time
// ARGV[2] -> queue prefix // ARGV[2] -> queue prefix
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg) local decoded = cjson.decode(msg)
local qkey = ARGV[2] .. decoded["Queue"] local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg) redis.call("LPUSH", qkey, msg)
redis.call("ZREM", KEYS[1], msg) redis.call("ZREM", KEYS[1], msg)
end end
return msgs`) return table.getn(msgs)`)
// forward moves all tasks with a score less than the current unix time // forward moves all tasks with a score less than the current unix time
// from the src zset. // from the src zset.
func (r *RDB) forward(src string) error { func (r *RDB) forward(src string) (int, error) {
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
return forwardCmd.Run(r.client, res, err := forwardCmd.Run(r.client,
[]string{src}, now, base.QueuePrefix).Err() []string{src}, now, base.QueuePrefix).Result()
if err != nil {
return 0, err
} }
return cast.ToInt(res), err
// KEYS[1] -> source queue (e.g. scheduled or retry queue)
// KEYS[2] -> destination queue
var forwardSingleCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
end
return msgs`)
// forwardSingle moves all tasks with a score less than the current unix time
// from the src zset to dst list.
func (r *RDB) forwardSingle(src, dst string) error {
now := float64(time.Now().Unix())
return forwardSingleCmd.Run(r.client,
[]string{src, dst}, now).Err()
} }
// KEYS[1] -> asynq:servers:<host:pid:sid> // KEYS[1] -> asynq:servers:<host:pid:sid>