diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 423f1ff..bb4b92e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -226,9 +226,9 @@ func (r *RDB) Done(msg *base.TaskMessage) error { encoded, expireAt.Unix(), msg.ID.String()).Err() } -// KEYS[1] -> asynq:in_progress -// KEYS[2] -> asynq:deadlines -// KEYS[3] -> asynq:queues: +// KEYS[1] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{} // ARGV[1] -> base.TaskMessage value // Note: Use RPUSH to push to the head of the queue. var requeueCmd = redis.NewScript(` @@ -248,7 +248,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { return err } return requeueCmd.Run(r.client, - []string{base.InProgressQueue, base.KeyDeadlines, base.QueueKey(msg.Queue)}, + []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)}, encoded).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 709adca..f2aed5e 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -583,75 +583,96 @@ func TestRequeue(t *testing.T) { tests := []struct { enqueued map[string][]*base.TaskMessage // initial state of queues - inProgress []*base.TaskMessage // initial state of the in-progress list - deadlines []base.Z // initial state of the deadlines set + inProgress map[string][]*base.TaskMessage // initial state of the in-progress list + deadlines map[string][]base.Z // initial state of the deadlines set target *base.TaskMessage // task to requeue wantEnqueued map[string][]*base.TaskMessage // final state of queues - wantInProgress []*base.TaskMessage // final state of the in-progress list - wantDeadlines []base.Z // final state of the deadlines set + wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list + wantDeadlines map[string][]base.Z // final state of the deadlines set }{ { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {}, + "default": {}, + }, + inProgress: map[string][]*base.TaskMessage{ + "default": {t1, t2}, }, - inProgress: []*base.TaskMessage{t1, t2}, deadlines: []base.Z{ - {Message: t1, Score: t1Deadline}, - {Message: t2, Score: t2Deadline}, + "default": { + {Message: t1, Score: t1Deadline}, + {Message: t2, Score: t2Deadline}, + }, }, target: t1, wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, + "default": {t1}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2}, }, - wantInProgress: []*base.TaskMessage{t2}, wantDeadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, + "defult": { + {Message: t2, Score: t2Deadline}, + }, }, }, { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, + "default": {t1}, }, - inProgress: []*base.TaskMessage{t2}, - deadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, + inProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + }, + deadlines: map[string][]base.Z{ + "default": { + {Message: t2, Score: t2Deadline}, + }, }, target: t2, wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1, t2}, + "default": {t1, t2}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, }, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, }, { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, - "critical": {}, + "default": {t1}, + "critical": {}, }, - inProgress: []*base.TaskMessage{t2, t3}, - deadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, - {Message: t3, Score: t3Deadline}, + inProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + "critical": {t3}, + }, + deadlines: map[string][]base.Z{ + "defualt": {{Message: t2, Score: t2Deadline}}, + "critial": {{Message: t3, Score: t3Deadline}}, }, target: t3, wantEnqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {t1}, - "critical": {t3}, + "default": {t1}, + "critical": {t3}, }, - wantInProgress: []*base.TaskMessage{t2}, - wantDeadlines: []base.Z{ - {Message: t2, Score: t2Deadline}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + "critical": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t2, Score: t2Deadline}}, + "critical": {}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, qname) - } - h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedDeadlines(t, r.client, tc.deadlines) + h.SeedAllEnqueuedQueues(t, r.client, msgs, tc.enqueued) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllDeadlines(t, r.client, tc.deadlines) err := r.Requeue(tc.target) if err != nil { @@ -665,14 +686,17 @@ func TestRequeue(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) + for qname, want := range tc.wantInProgress { + gotInProgress := h.GetInProgressMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressKey(qname), diff) + } } - gotDeadlines := h.GetDeadlinesEntries(t, r.client) - if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.KeyDeadlines, diff) + for qname, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r.client, qname) + if diff := cmp.Diff(wnt, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DeadlinesKey(qname), diff) + } } } }