diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 4da6fd4..27bb5e1 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -127,9 +127,6 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti return msg, time.Unix(d, 0), nil } -// KEYS[1] -> asynq:in_progress -// KEYS[2] -> asynq:paused -// KEYS[3] -> asynq:deadlines // ARGV[1] -> current time in Unix time // ARGV[2:] -> List of queues to query in order // @@ -140,8 +137,11 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti var dequeueCmd = redis.NewScript(` for i = 2, table.getn(ARGV) do local qkey = ARGV[i] - if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then - local msg = redis.call("RPOPLPUSH", qkey, KEYS[1]) + local key_paused = qkey .. ":paused" + local key_inprogress = qkey .. ":in_progress" + local key_deadlines = qkey .. ":deadlines" + if redis.call("EXISTS", key_paused) == 0 then + local msg = redis.call("RPOPLPUSH", qkey, key_inprogress) if msg then local decoded = cjson.decode(msg) local timeout = decoded["Timeout"] @@ -156,7 +156,7 @@ for i = 2, table.getn(ARGV) do else return redis.error_reply("asynq internal error: both timeout and deadline are not set") end - redis.call("ZADD", KEYS[3], score, msg) + redis.call("ZADD", key_deadlines, score, msg) return {msg, score} end end @@ -167,8 +167,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err var args []interface{} args = append(args, time.Now().Unix()) args = append(args, qkeys...) - res, err := dequeueCmd.Run(r.client, - []string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result() + res, err := dequeueCmd.Run(r.client, nil, args...).Result() if err != nil { return "", 0, err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 2e5d75d..fd1e6c2 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -146,8 +146,8 @@ func TestDequeue(t *testing.T) { wantDeadline time.Time err error wantEnqueued map[string][]*base.TaskMessage - wantInProgress []*base.TaskMessage - wantDeadlines []base.Z + wantInProgress map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z }{ { enqueued: map[string][]*base.TaskMessage{ @@ -160,12 +160,11 @@ func TestDequeue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, - wantInProgress: []*base.TaskMessage{t1}, - wantDeadlines: []base.Z{ - { - Message: t1, - Score: t1Deadline, - }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: t1Deadline}}, }, }, { @@ -179,8 +178,12 @@ func TestDequeue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + }, }, { enqueued: map[string][]*base.TaskMessage{ @@ -197,12 +200,15 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {t3}, }, - wantInProgress: []*base.TaskMessage{t2}, - wantDeadlines: []base.Z{ - { - Message: t2, - Score: t2Deadline, - }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t2}, + "low": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + "critical": {{Message: t2, Score: t2Deadline}}, + "low": {}, }, }, { @@ -220,12 +226,15 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {t2, t1}, }, - wantInProgress: []*base.TaskMessage{t3}, - wantDeadlines: []base.Z{ - { - Message: t3, - Score: t3Deadline, - }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t3}, + "critical": {}, + "low": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t3, Score: t3Deadline}}, + "critical": {}, + "low": {}, }, }, { @@ -243,16 +252,22 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {}, }, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - for queue, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, queue) - } + h.SeedAllEnqueuedQueues(t, r.client, msgs, queue, tc.enqueued) gotMsg, gotDeadline, err := r.Dequeue(tc.args...) if err != tc.err { @@ -277,21 +292,25 @@ func TestDequeue(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) + for queue, want := range tc.wantInProgress { + gotInProgress := h.GetInProgressMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff) + } } - gotDeadlines := h.GetDeadlinesEntries(t, r.client) - if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff) + for queue, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) + } } } } func TestDequeueIgnoresPausedQueues(t *testing.T) { r := setup(t) - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) - t2 := h.NewTaskMessage("export_csv", nil) + t1 := h.NewTaskMessageWithQueue("send_email", map[string]interface{}{"subject": "hello!"}, "default") + t2 := h.NewTaskMessageWithQueue("export_csv", nil, "critical") tests := []struct { paused []string // list of paused queues @@ -300,7 +319,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { wantMsg *base.TaskMessage err error wantEnqueued map[string][]*base.TaskMessage - wantInProgress []*base.TaskMessage + wantInProgress map[string][]*base.TaskMessage }{ { paused: []string{"default"}, @@ -315,7 +334,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {}, }, - wantInProgress: []*base.TaskMessage{t2}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t2}, + }, }, { paused: []string{"default"}, @@ -328,7 +350,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantInProgress: []*base.TaskMessage{}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + }, }, { paused: []string{"critical", "default"}, @@ -343,7 +367,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {t2}, }, - wantInProgress: []*base.TaskMessage{}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, }, } @@ -354,9 +381,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t.Fatal(err) } } - for queue, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, queue) - } + h.SeedAllEnqueuedQueues(t, r.client, msgs, queue, tc.enqueued) got, _, err := r.Dequeue(tc.args...) if !cmp.Equal(got, tc.wantMsg) || err != tc.err { @@ -371,10 +396,11 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) + for queue, want := range tc.wantInProgress { + gotInProgress := h.GetInProgressMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff) + } } } }