2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Update CheckAndEnqueue method in RDB

This commit is contained in:
Ken Hibino
2020-08-09 06:26:14 -07:00
parent 3795c42c98
commit 6515d4522b
2 changed files with 116 additions and 68 deletions

View File

@@ -419,48 +419,56 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
}
// CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that
// are ready to be processed.
func (r *RDB) CheckAndEnqueue() (err error) {
delayed := []string{base.ScheduledQueue, base.RetryQueue}
for _, zset := range delayed {
n := 1
for n != 0 {
n, err = r.forward(zset)
if err != nil {
return err
}
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues
//and enqueues any tasks that are ready to be processed.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil {
return err
}
if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil {
return err
}
}
return nil
}
// KEYS[1] -> source queue (e.g. scheduled or retry queue)
// KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})
// KEYS[2] -> destination queue (e.g. asynq:{<qname>})
// ARGV[1] -> current unix time
// ARGV[2] -> queue prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
end
return table.getn(msgs)`)
// forward moves tasks with a score less than the current unix time
// from the src zset. It returns the number of tasks moved.
func (r *RDB) forward(src string) (int, error) {
// from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src, dst string) (int, error) {
now := float64(time.Now().Unix())
res, err := forwardCmd.Run(r.client,
[]string{src}, now, base.QueuePrefix).Result()
res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result()
if err != nil {
return 0, err
}
return cast.ToInt(res), nil
}
// forwardAll moves tasks with a score less than the current unix time from the src zset,
// until there's no more tasks.
func (r *RDB) forwardAll(src, dst string) error {
n := 1
for n != 0 {
n, err = r.forward(src, dst)
if err != nil {
return err
}
}
return nil
}
// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.
func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage