diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 148637d..89aac43 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -108,10 +108,16 @@ func FlushDB(tb testing.TB, r *redis.Client) { } } -// SeedDefaultQueue initializes the default queue with the given messages. -func SeedDefaultQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { +// SeedEnqueuedQueue initializes the specified queue with the given messages. +// +// If queue name option is not passed, it defaults to the default queue. +func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, queueOpt ...string) { tb.Helper() - seedRedisList(tb, r, base.DefaultQueue, msgs) + queue := base.DefaultQueue + if len(queueOpt) > 0 { + queue = base.QueueKey(queueOpt[0]) + } + seedRedisList(tb, r, queue, msgs) } // SeedInProgressQueue initializes the in-progress queue with the given messages. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 949d23c..71589a6 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -119,7 +119,7 @@ func TestCurrentStats(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDefaultQueue(t, r.client, tc.enqueued) + h.SeedEnqueuedQueue(t, r.client, tc.enqueued) h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedRetryQueue(t, r.client, tc.retry) @@ -262,7 +262,7 @@ func TestListEnqueued(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDefaultQueue(t, r.client, tc.enqueued) + h.SeedEnqueuedQueue(t, r.client, tc.enqueued) got, err := r.ListEnqueued() if err != nil { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 35df06d..fcf0103 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -13,11 +13,12 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" + "github.com/spf13/cast" ) var ( - // ErrDequeueTimeout indicates that the blocking dequeue operation timed out. - ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") + // ErrNoProcessableTask indicates that there are no tasks ready to be processed. + ErrNoProcessableTask = errors.New("no tasks are ready for processing") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") @@ -50,14 +51,17 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return r.client.LPush(key, string(bytes)).Err() } -// Dequeue blocks until there is a task available to be processed, -// once a task is available, it adds the task to "in progress" queue -// and returns the task. If there are no tasks for the entire timeout -// duration, it returns ErrDequeueTimeout. -func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { - data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result() +// Dequeue queries given queues in order and pops a task message if there +// is one and returns it. If all queues are empty, ErrNoProcessableTask +// error is returned. +func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { + var keys []string + for _, q := range qnames { + keys = append(keys, base.QueueKey(q)) + } + data, err := r.dequeue(keys...) if err == redis.Nil { - return nil, ErrDequeueTimeout + return nil, ErrNoProcessableTask } if err != nil { return nil, err @@ -70,6 +74,28 @@ func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { return &msg, nil } +func (r *RDB) dequeue(queues ...string) (data string, err error) { + var args []interface{} + for _, qkey := range queues { + args = append(args, qkey) + } + script := redis.NewScript(` + local res + for _, qkey in ipairs(ARGV) do + res = redis.call("RPOPLPUSH", qkey, KEYS[1]) + if res then + return res + end + end + return res + `) + res, err := script.Run(r.client, []string{base.InProgressQueue}, args...).Result() + if err != nil { + return "", err + } + return cast.ToStringE(res) +} + // Done removes the task from in-progress queue to mark the task as done. func (r *RDB) Done(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 57a5680..bb29aef 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -59,37 +59,111 @@ func TestEnqueue(t *testing.T) { func TestDequeue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) + t2 := h.NewTaskMessage("export_csv", nil) + t3 := h.NewTaskMessage("reindex", nil) + tests := []struct { - enqueued []*base.TaskMessage + enqueued map[string][]*base.TaskMessage + args []string // list of queues to query want *base.TaskMessage err error + wantEnqueued map[string][]*base.TaskMessage wantInProgress []*base.TaskMessage }{ { - enqueued: []*base.TaskMessage{t1}, - want: t1, - err: nil, + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + args: []string{"default"}, + want: t1, + err: nil, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, wantInProgress: []*base.TaskMessage{t1}, }, { - enqueued: []*base.TaskMessage{}, - want: nil, - err: ErrDequeueTimeout, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + args: []string{"default"}, + want: nil, + err: ErrNoProcessableTask, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantInProgress: []*base.TaskMessage{}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {t2}, + "low": {t3}, + }, + args: []string{"critical", "default", "low"}, + want: t2, + err: nil, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {}, + "low": {t3}, + }, + wantInProgress: []*base.TaskMessage{t2}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {}, + "low": {t2, t3}, + }, + args: []string{"critical", "default", "low"}, + want: t1, + err: nil, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {t2, t3}, + }, + wantInProgress: []*base.TaskMessage{t1}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + args: []string{"critical", "default", "low"}, + want: nil, + err: ErrNoProcessableTask, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, wantInProgress: []*base.TaskMessage{}, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDefaultQueue(t, r.client, tc.enqueued) + for queue, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, queue) + } - got, err := r.Dequeue(time.Second) + got, err := r.Dequeue(tc.args...) if !cmp.Equal(got, tc.want) || err != tc.err { - t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", - got, err, tc.want, tc.err) + t.Errorf("(*RDB).Dequeue(%v) = %v, %v; want %v, %v", + tc.args, got, err, tc.want, tc.err) continue } + for queue, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) + } + } + gotInProgress := h.GetInProgressMessages(t, r.client) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) @@ -178,7 +252,7 @@ func TestRequeue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDefaultQueue(t, r.client, tc.enqueued) + h.SeedEnqueuedQueue(t, r.client, tc.enqueued) h.SeedInProgressQueue(t, r.client, tc.inProgress) err := r.Requeue(tc.target) @@ -468,7 +542,7 @@ func TestRestoreUnfinished(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedDefaultQueue(t, r.client, tc.enqueued) + h.SeedEnqueuedQueue(t, r.client, tc.enqueued) got, err := r.RestoreUnfinished() if got != tc.want || err != nil { diff --git a/processor.go b/processor.go index 06930db..55e93f5 100644 --- a/processor.go +++ b/processor.go @@ -109,9 +109,17 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - msg, err := p.rdb.Dequeue(p.dequeueTimeout) - if err == rdb.ErrDequeueTimeout { - // timed out, this is a normal behavior. + // TODO(hibiken): Randomize the order to avoid starving low priority queues + var qnames []string + for q := range p.queueConfig { + qnames = append(qnames, q) + } + + msg, err := p.rdb.Dequeue(qnames...) + if err == rdb.ErrNoProcessableTask { + // queues are empty, this is a normal behavior. + // sleep to avoid slamming redis and let scheduler move tasks into queues. + time.Sleep(time.Second) return } if err != nil { diff --git a/processor_test.go b/processor_test.go index 915bb72..943ebd7 100644 --- a/processor_test.go +++ b/processor_test.go @@ -52,7 +52,7 @@ func TestProcessorSuccess(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. + h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. // instantiate a new processor var mu sync.Mutex @@ -138,7 +138,7 @@ func TestProcessorRetry(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. + h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. // instantiate a new processor delayFunc := func(n int, e error, t *Task) time.Duration { diff --git a/scheduler_test.go b/scheduler_test.go index 387466d..6eadf0d 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -67,7 +67,7 @@ func TestScheduler(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue - h.SeedDefaultQueue(t, r, tc.initQueue) // initialize default queue + h.SeedEnqueuedQueue(t, r, tc.initQueue) // initialize default queue s.start() time.Sleep(tc.wait)