From 166497748be98c53c8926321d2825f4c5eea22e1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 26 Jan 2020 13:41:06 -0800 Subject: [PATCH] (fix): Requeue to select correct queue --- internal/rdb/rdb.go | 2 +- internal/rdb/rdb_test.go | 60 ++++++++++++++++++++++++++++------------ 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 80c16eb..31e3bda 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -156,7 +156,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { return redis.status_reply("OK") `) return script.Run(r.client, - []string{base.InProgressQueue, base.DefaultQueue}, + []string{base.InProgressQueue, base.QueueKey(msg.Queue)}, string(bytes)).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 332f86c..19c49b6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -236,33 +236,57 @@ func TestRequeue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("export_csv", nil) + t3 := h.NewTaskMessageWithQueue("send_email", nil, "critical") tests := []struct { - enqueued []*base.TaskMessage // initial state of the default queue - inProgress []*base.TaskMessage // initial state of the in-progress list - target *base.TaskMessage // task to requeue - wantEnqueued []*base.TaskMessage // final state of the default queue - wantInProgress []*base.TaskMessage // final state of the in-progress list + enqueued map[string][]*base.TaskMessage // initial state of queues + inProgress []*base.TaskMessage // initial state of the in-progress list + target *base.TaskMessage // task to requeue + wantEnqueued map[string][]*base.TaskMessage // final state of queues + wantInProgress []*base.TaskMessage // final state of the in-progress list }{ { - enqueued: []*base.TaskMessage{}, - inProgress: []*base.TaskMessage{t1, t2}, - target: t1, - wantEnqueued: []*base.TaskMessage{t1}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {}, + }, + inProgress: []*base.TaskMessage{t1, t2}, + target: t1, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + }, wantInProgress: []*base.TaskMessage{t2}, }, { - enqueued: []*base.TaskMessage{t1}, - inProgress: []*base.TaskMessage{t2}, - target: t2, - wantEnqueued: []*base.TaskMessage{t1, t2}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + }, + inProgress: []*base.TaskMessage{t2}, + target: t2, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2}, + }, wantInProgress: []*base.TaskMessage{}, }, + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + "critical": {}, + }, + inProgress: []*base.TaskMessage{t2, t3}, + target: t3, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + "critical": {t3}, + }, + wantInProgress: []*base.TaskMessage{t2}, + }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedEnqueuedQueue(t, r.client, tc.enqueued) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } h.SeedInProgressQueue(t, r.client, tc.inProgress) err := r.Requeue(tc.target) @@ -271,9 +295,11 @@ func TestRequeue(t *testing.T) { continue } - gotEnqueued := h.GetEnqueuedMessages(t, r.client) - if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff) + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + } } gotInProgress := h.GetInProgressMessages(t, r.client)