diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index bb4b92e..6132ec4 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -419,48 +419,56 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } -// CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that -// are ready to be processed. -func (r *RDB) CheckAndEnqueue() (err error) { - delayed := []string{base.ScheduledQueue, base.RetryQueue} - for _, zset := range delayed { - n := 1 - for n != 0 { - n, err = r.forward(zset) - if err != nil { - return err - } +// CheckAndEnqueue checks for scheduled/retry tasks for the given queues +//and enqueues any tasks that are ready to be processed. +func (r *RDB) CheckAndEnqueue(qnames ...string) error { + for _, qname := range qnames { + if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil { + return err + } + if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil { + return err } } return nil } -// KEYS[1] -> source queue (e.g. scheduled or retry queue) +// KEYS[1] -> source queue (e.g. asynq:{:scheduled or asynq:{}:retry}) +// KEYS[2] -> destination queue (e.g. asynq:{}) // ARGV[1] -> current unix time -// ARGV[2] -> queue prefix // Note: Script moves tasks up to 100 at a time to keep the runtime of script short. var forwardCmd = redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - local qkey = ARGV[2] .. decoded["Queue"] - redis.call("LPUSH", qkey, msg) + redis.call("LPUSH", KEYS[2], msg) redis.call("ZREM", KEYS[1], msg) end return table.getn(msgs)`) // forward moves tasks with a score less than the current unix time -// from the src zset. It returns the number of tasks moved. -func (r *RDB) forward(src string) (int, error) { +// from the src zset to the dst list. It returns the number of tasks moved. +func (r *RDB) forward(src, dst string) (int, error) { now := float64(time.Now().Unix()) - res, err := forwardCmd.Run(r.client, - []string{src}, now, base.QueuePrefix).Result() + res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result() if err != nil { return 0, err } return cast.ToInt(res), nil } +// forwardAll moves tasks with a score less than the current unix time from the src zset, +// until there's no more tasks. +func (r *RDB) forwardAll(src, dst string) error { + n := 1 + for n != 0 { + n, err = r.forward(src, dst) + if err != nil { + return err + } + } + return nil +} + // ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { var msgs []*base.TaskMessage diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f2aed5e..61595b3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1161,82 +1161,120 @@ func TestCheckAndEnqueue(t *testing.T) { t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("generate_csv", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil) - t4 := h.NewTaskMessage("important_task", nil) - t4.Queue = "critical" - t5 := h.NewTaskMessage("minor_task", nil) - t5.Queue = "low" + t4 := h.NewTaskMessageWithQueue("important_task", nil, "critical") + t5 := h.NewTaskMessageWithQueue("minor_task", nil, "low") secondAgo := time.Now().Add(-time.Second) hourFromNow := time.Now().Add(time.Hour) tests := []struct { - scheduled []base.Z - retry []base.Z + scheduled map[string][]base.Z + retry map[string][]base.Z + qnames []string wantEnqueued map[string][]*base.TaskMessage - wantScheduled []*base.TaskMessage - wantRetry []*base.TaskMessage + wantScheduled map[string][]*base.TaskMessage + wantRetry map[string][]*base.TaskMessage }{ { - scheduled: []base.Z{ - {Message: t1, Score: secondAgo.Unix()}, - {Message: t2, Score: secondAgo.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: t1, Score: secondAgo.Unix()}, + {Message: t2, Score: secondAgo.Unix()}, + }, }, - retry: []base.Z{ - {Message: t3, Score: secondAgo.Unix()}}, + retry: map[string][]base.Z{ + "default": {{Message: t3, Score: secondAgo.Unix()}}, + }, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, - wantScheduled: []*base.TaskMessage{}, - wantRetry: []*base.TaskMessage{}, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + }, }, { - scheduled: []base.Z{ - {Message: t1, Score: hourFromNow.Unix()}, - {Message: t2, Score: secondAgo.Unix()}}, - retry: []base.Z{ - {Message: t3, Score: secondAgo.Unix()}}, + scheduled: map[string][]base.Z{ + "default": { + {Message: t1, Score: hourFromNow.Unix()}, + {Message: t2, Score: secondAgo.Unix()}, + }, + }, + retry: map[string][]base.Z{ + "default": {{Message: t3, Score: secondAgo.Unix()}}, + }, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, - wantScheduled: []*base.TaskMessage{t1}, - wantRetry: []*base.TaskMessage{}, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + }, }, { - scheduled: []base.Z{ - {Message: t1, Score: hourFromNow.Unix()}, - {Message: t2, Score: hourFromNow.Unix()}}, - retry: []base.Z{ - {Message: t3, Score: hourFromNow.Unix()}}, + scheduled: map[string][]base.Z{ + "default": { + {Message: t1, Score: hourFromNow.Unix()}, + {Message: t2, Score: hourFromNow.Unix()}, + }, + }, + retry: map[string][]base.Z{ + "default": {{Message: t3, Score: hourFromNow.Unix()}}, + }, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, - wantScheduled: []*base.TaskMessage{t1, t2}, - wantRetry: []*base.TaskMessage{t3}, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {t1, t2}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {t3}, + }, }, { - scheduled: []base.Z{ - {Message: t1, Score: secondAgo.Unix()}, - {Message: t4, Score: secondAgo.Unix()}, + scheduled: map[string][]base.Z{ + "default": {{Message: t1, Score: secondAgo.Unix()}}, + "critical": {{Message: t4, Score: secondAgo.Unix()}}, + "low": {}, }, - retry: []base.Z{ - {Message: t5, Score: secondAgo.Unix()}}, + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {{Message: t5, Score: secondAgo.Unix()}}, + }, + qnames: []string{"default", "critical", "low"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t4}, "low": {t5}, }, - wantScheduled: []*base.TaskMessage{}, - wantRetry: []*base.TaskMessage{}, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedScheduledQueue(t, r.client, tc.scheduled) - h.SeedRetryQueue(t, r.client, tc.retry) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllRetryQueues(t, r.client, tc.retry) - err := r.CheckAndEnqueue() + err := r.CheckAndEnqueue(tc.qnames...) if err != nil { - t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) + t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err) continue } @@ -1246,15 +1284,17 @@ func TestCheckAndEnqueue(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } - - gotScheduled := h.GetScheduledMessages(t, r.client) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } } - - gotRetry := h.GetRetryMessages(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff) + } } } }