diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ec9593d..8983fd4 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -402,19 +402,16 @@ func (r *RDB) RequeueAll() (int64, error) { // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // have to be processed. -// -// qnames specifies to which queues to send tasks. -func (r *RDB) CheckAndEnqueue(qnames ...string) error { +// TODO: remove qnames param +func (r *RDB) CheckAndEnqueue(qnames ...string) (err error) { delayed := []string{base.ScheduledQueue, base.RetryQueue} for _, zset := range delayed { - var err error - if len(qnames) == 1 { - err = r.forwardSingle(zset, base.QueueKey(qnames[0])) - } else { - err = r.forward(zset) - } - if err != nil { - return err + n := 1 + for n > 0 { + n, err = r.forward(zset) + if err != nil { + return err + } } } return nil @@ -424,39 +421,25 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error { // ARGV[1] -> current unix time // ARGV[2] -> queue prefix var forwardCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) for _, msg in ipairs(msgs) do local decoded = cjson.decode(msg) local qkey = ARGV[2] .. decoded["Queue"] redis.call("LPUSH", qkey, msg) redis.call("ZREM", KEYS[1], msg) end -return msgs`) +return table.getn(msgs)`) // forward moves all tasks with a score less than the current unix time // from the src zset. -func (r *RDB) forward(src string) error { +func (r *RDB) forward(src string) (int, error) { now := float64(time.Now().Unix()) - return forwardCmd.Run(r.client, - []string{src}, now, base.QueuePrefix).Err() -} - -// KEYS[1] -> source queue (e.g. scheduled or retry queue) -// KEYS[2] -> destination queue -var forwardSingleCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) -for _, msg in ipairs(msgs) do - redis.call("LPUSH", KEYS[2], msg) - redis.call("ZREM", KEYS[1], msg) -end -return msgs`) - -// forwardSingle moves all tasks with a score less than the current unix time -// from the src zset to dst list. -func (r *RDB) forwardSingle(src, dst string) error { - now := float64(time.Now().Unix()) - return forwardSingleCmd.Run(r.client, - []string{src, dst}, now).Err() + res, err := forwardCmd.Run(r.client, + []string{src}, now, base.QueuePrefix).Result() + if err != nil { + return 0, err + } + return cast.ToInt(res), err } // KEYS[1] -> asynq:servers: