Update RDB.ForwardIfReady with task state

This commit is contained in:
Ken Hibino
2021-04-26 06:58:33 -07:00
parent df4e5c8d45
commit a647cd7225
2 changed files with 71 additions and 38 deletions

View File

@@ -556,10 +556,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
// and move any tasks that are ready to be processed to the pending set.
func (r *RDB) ForwardIfReady(qnames ...string) error {
for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil {
return err
}
if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil {
if err := r.forwardAll(qname); err != nil {
return err
}
}
@@ -567,36 +564,43 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
}
// KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})
// KEYS[2] -> destination queue (e.g. asynq:{<qname>})
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> current unix time
// ARGV[2] -> task key prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
redis.call("HSET", ARGV[2] .. id, "state", "pending")
end
return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time
// from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src, dst string) (int, error) {
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
now := float64(time.Now().Unix())
res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result()
res, err := forwardCmd.Run(r.client, []string{src, dst}, now, taskKeyPrefix).Result()
if err != nil {
return 0, err
}
return cast.ToInt(res), nil
}
// forwardAll moves tasks with a score less than the current unix time from the src zset,
// until there's no more tasks.
func (r *RDB) forwardAll(src, dst string) (err error) {
n := 1
for n != 0 {
n, err = r.forward(src, dst)
if err != nil {
return err
// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
// their state to "pending".
func (r *RDB) forwardAll(qname string) (err error) {
sources := []string{base.ScheduledKey(qname), base.RetryKey(qname)}
dst := base.PendingKey(qname)
taskKeyPrefix := base.TaskKeyPrefix(qname)
for _, src := range sources {
n := 1
for n != 0 {
n, err = r.forward(src, dst, taskKeyPrefix)
if err != nil {
return err
}
}
}
return nil