diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index aed147d..ec9593d 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -103,19 +103,14 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { } // Dequeue queries given queues in order and pops a task message if there is one and returns it. +// Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { - var data string - var err error - if len(qnames) == 1 { - data, err = r.dequeueSingle(base.QueueKey(qnames[0])) - } else { - var keys []string - for _, q := range qnames { - keys = append(keys, base.QueueKey(q)) - } - data, err = r.dequeue(keys...) + var keys []string + for _, q := range qnames { + keys = append(keys, base.QueueKey(q)) } + data, err := r.dequeue(keys...) if err == redis.Nil { return nil, ErrNoProcessableTask } @@ -130,29 +125,30 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { return &msg, nil } -func (r *RDB) dequeueSingle(queue string) (data string, err error) { - // timeout needed to avoid blocking forever - return r.client.BRPopLPush(queue, base.InProgressQueue, time.Second).Result() -} - // KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:paused // ARGV -> List of queues to query in order +// +// dequeueCmd checks whether a queue is paused first, before +// calling RPOPLPUSH to pop a task from the queue. var dequeueCmd = redis.NewScript(` -local res for _, qkey in ipairs(ARGV) do - res = redis.call("RPOPLPUSH", qkey, KEYS[1]) - if res then - return res + if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then + local res = redis.call("RPOPLPUSH", qkey, KEYS[1]) + if res then + return res + end end end -return res`) +return nil`) func (r *RDB) dequeue(queues ...string) (data string, err error) { var args []interface{} for _, qkey := range queues { args = append(args, qkey) } - res, err := dequeueCmd.Run(r.client, []string{base.InProgressQueue}, args...).Result() + res, err := dequeueCmd.Run(r.client, + []string{base.InProgressQueue, base.PausedQueues}, args...).Result() if err != nil { return "", err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 2172214..27aebe3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -227,6 +227,97 @@ func TestDequeue(t *testing.T) { } } +func TestDequeueIgnoresPausedQueues(t *testing.T) { + r := setup(t) + t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) + t2 := h.NewTaskMessage("export_csv", nil) + + tests := []struct { + paused []string // list of paused queues + enqueued map[string][]*base.TaskMessage + args []string // list of queues to query + want *base.TaskMessage + err error + wantEnqueued map[string][]*base.TaskMessage + wantInProgress []*base.TaskMessage + }{ + { + paused: []string{"default"}, + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {t2}, + }, + args: []string{"default", "critical"}, + want: t2, + err: nil, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {}, + }, + wantInProgress: []*base.TaskMessage{t2}, + }, + { + paused: []string{"default"}, + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + args: []string{"default"}, + want: nil, + err: ErrNoProcessableTask, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + wantInProgress: []*base.TaskMessage{}, + }, + { + paused: []string{"critical", "default"}, + enqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {t2}, + }, + args: []string{"default", "critical"}, + want: nil, + err: ErrNoProcessableTask, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {t2}, + }, + wantInProgress: []*base.TaskMessage{}, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + for _, qname := range tc.paused { + if err := r.Pause(qname); err != nil { + t.Fatal(err) + } + } + for queue, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, queue) + } + + got, err := r.Dequeue(tc.args...) + if !cmp.Equal(got, tc.want) || err != tc.err { + t.Errorf("Dequeue(%v) = %v, %v; want %v, %v", + tc.args, got, err, tc.want, tc.err) + continue + } + + for queue, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) + } + } + + gotInProgress := h.GetInProgressMessages(t, r.client) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) + } + } +} + func TestDone(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) diff --git a/processor.go b/processor.go index 6b080dc..8b02551 100644 --- a/processor.go +++ b/processor.go @@ -166,14 +166,12 @@ func (p *processor) exec() { msg, err := p.broker.Dequeue(qnames...) switch { case err == rdb.ErrNoProcessableTask: - // queues are empty, this is a normal behavior. - if len(qnames) > 1 { - // sleep to avoid slamming redis and let scheduler move tasks into queues. - // Note: With multiple queues, we are not using blocking pop operation and - // polling queues instead. This adds significant load to redis. - time.Sleep(time.Second) - } p.logger.Debug("All queues are empty") + // Queues are empty, this is a normal behavior. + // Sleep to avoid slamming redis and let scheduler move tasks into queues. + // Note: We are not using blocking pop operation and polling queues instead. + // This adds significant load to redis. + time.Sleep(time.Second) return case err != nil: if p.errLogLimiter.Allow() {