diff --git a/client_test.go b/client_test.go index b02dbf0..366fbb8 100644 --- a/client_test.go +++ b/client_test.go @@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) { opts []Option wantRes *Result wantEnqueued map[string][]*base.TaskMessage - wantScheduled []base.Z + wantScheduled map[string][]base.Z }{ { desc: "Process task immediately", @@ -61,7 +61,9 @@ func TestClientEnqueueAt(t *testing.T) { }, }, }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + wantScheduled: map[string][]base.Z{ + "default": {}, + }, }, { desc: "Schedule task to be processed in the future", @@ -74,18 +76,22 @@ func TestClientEnqueueAt(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil - wantScheduled: []base.Z{ - { - Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantScheduled: map[string][]base.Z{ + "default": { + { + Message: &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + Score: oneHourLater.Unix(), }, - Score: oneHourLater.Unix(), }, }, }, @@ -110,10 +116,11 @@ func TestClientEnqueueAt(t *testing.T) { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) } } - - gotScheduled := h.GetScheduledEntries(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff) + } } } } @@ -376,7 +383,7 @@ func TestClientEnqueueIn(t *testing.T) { opts []Option wantRes *Result wantEnqueued map[string][]*base.TaskMessage - wantScheduled []base.Z + wantScheduled map[string][]base.Z }{ { desc: "schedule a task to be enqueued in one hour", @@ -389,18 +396,22 @@ func TestClientEnqueueIn(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil - wantScheduled: []base.Z{ - { - Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantScheduled: map[string][]base.Z{ + "default": { + { + Message: &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + Score: time.Now().Add(time.Hour).Unix(), }, - Score: time.Now().Add(time.Hour).Unix(), }, }, }, @@ -427,7 +438,9 @@ func TestClientEnqueueIn(t *testing.T) { }, }, }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + wantScheduled: map[string][]base.Z{ + "default": {}, + }, }, } @@ -450,10 +463,11 @@ func TestClientEnqueueIn(t *testing.T) { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) } } - - gotScheduled := h.GetScheduledEntries(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff) + } } } } @@ -587,7 +601,7 @@ func TestEnqueueUnique(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) continue @@ -634,7 +648,7 @@ func TestEnqueueInUnique(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) @@ -682,7 +696,7 @@ func TestEnqueueAtUnique(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) diff --git a/inspector_test.go b/inspector_test.go index bdb504c..ee9f79d 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -63,12 +63,12 @@ func TestInspectorCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, - retry: []base.Z{ + retry: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, }, - dead: []base.Z{ + dead: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, @@ -108,11 +108,11 @@ func TestInspectorCurrentStats(t *testing.T) { asynqtest.SeedAllDeadQueues(t, r, tc.dead) for qname, n := range tc.processed { processedKey := base.ProcessedKey(qname, now) - r.client.Set(processedKey, n, 0) + r.Set(processedKey, n, 0) } for qname, n := range tc.failed { failedKey := base.FailedKey(qname, now) - r.client.Set(failedKey, n, 0) + r.Set(failedKey, n, 0) } got, err := inspector.CurrentStats(tc.qname) @@ -337,7 +337,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -383,7 +383,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { asynqtest.FlushDB(t, r) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - got, err := inspector.ListScheduledTasks() + got, err := inspector.ListScheduledTasks(tc.qname) if err != nil { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue @@ -461,7 +461,7 @@ func TestInspectorListRetryTasks(t *testing.T) { asynqtest.FlushDB(t, r) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - got, err := inspector.ListRetryTasks() + got, err := inspector.ListRetryTasks(tc.qname) if err != nil { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue @@ -536,7 +536,7 @@ func TestInspectorListDeadTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.retry) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) got, err := inspector.ListDeadTasks(tc.qname) if err != nil { @@ -559,7 +559,7 @@ func TestInspectorListPagination(t *testing.T) { asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) } r := setup(t) - asynqtest.SeedEnqueuedQueue(t, r, msgs) + asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName) inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -841,26 +841,56 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { }, }, { - scheduled: []base.Z{z1, z2}, - dead: []base.Z{z3}, - want: 2, - wantDead: []base.Z{ - z3, - base.Z{Message: m1, Score: now.Unix()}, - base.Z{Message: m2, Score: now.Unix()}, + scheduled: map[string][]base.Z{ + "default": {z1, z2}, + }, + dead: map[string][]base.Z{ + "default": {z3}, + }, + qname: "default", + want: 2, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + z3, + base.Z{Message: m1, Score: now.Unix()}, + base.Z{Message: m2, Score: now.Unix()}, + }, }, }, { - scheduled: []base.Z(nil), - dead: []base.Z(nil), - want: 0, - wantDead: []base.Z(nil), + scheduled: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": {}, + }, }, { - scheduled: []base.Z(nil), - dead: []base.Z{z1, z2}, - want: 0, - wantDead: []base.Z{z1, z2}, + scheduled: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {z1, z2}, + }, + qname: "default", + want: 0, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": {z1, z2}, + }, }, } @@ -976,7 +1006,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantdead: map[string][]base.Z{ + wantDead: map[string][]base.Z{ "default": {z1, z2}, }, }, @@ -1506,7 +1536,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDAlleadQueues(t, r, tc.dead) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) @@ -1758,6 +1788,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { scheduled map[string][]base.Z dead map[string][]base.Z qname string + key string want string wantScheduled map[string][]base.Z wantDead map[string][]base.Z @@ -1773,7 +1804,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { }, qname: "custom", key: createScheduledTask(z2).Key(), - scheduled: map[string][]base.Z{ + wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, }, @@ -1849,7 +1880,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { "default": {z1}, "custom": {z3}, }, - wantDead: []base.Z{ + wantDead: map[string][]base.Z{ "default": {}, "custom": {{m2, now.Unix()}}, }, diff --git a/processor.go b/processor.go index 1614d41..fc27e82 100644 --- a/processor.go +++ b/processor.go @@ -241,7 +241,7 @@ func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { - errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err) + errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressKey(msg.Queue), err) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") @@ -274,7 +274,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { retryAt := time.Now().Add(d) err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { - errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.RetryKey(msg.Queue)) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") @@ -293,7 +293,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) { err := p.broker.Kill(msg, e.Error()) if err != nil { - errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.DeadKey(msg.Queue)) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") diff --git a/processor_test.go b/processor_test.go index e045d9b..ee0433c 100644 --- a/processor_test.go +++ b/processor_test.go @@ -74,8 +74,8 @@ func TestProcessorSuccess(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. // instantiate a new processor var mu sync.Mutex @@ -118,8 +118,8 @@ func TestProcessorSuccess(t *testing.T) { } } time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed. - if l := r.LLen(base.InProgressQueue).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) + if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } p.terminate() @@ -150,8 +150,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. var mu sync.Mutex var processed []*Task @@ -191,8 +191,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p.start(&sync.WaitGroup{}) time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed. - if l := r.LLen(base.InProgressQueue).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) + if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } p.terminate() @@ -246,8 +246,8 @@ func TestProcessorRetry(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. // instantiate a new processor delayFunc := func(n int, e error, t *Task) time.Duration { @@ -295,18 +295,18 @@ func TestProcessorRetry(t *testing.T) { p.terminate() cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score - gotRetry := h.GetRetryEntries(t, r) + gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff) } - gotDead := h.GetDeadMessages(t, r) + gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName) if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadQueue, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff) } - if l := r.LLen(base.InProgressQueue).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) + if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } if n != tc.wantErrCount { @@ -455,8 +455,8 @@ func TestProcessorWithStrictPriority(t *testing.T) { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } - if l := r.LLen(base.InProgressQueue).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) + if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } } } diff --git a/recoverer_test.go b/recoverer_test.go index c43f5f0..057dd61 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -139,7 +139,10 @@ func TestRecoverer(t *testing.T) { "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, "critical": {}, }, - wantDead: []*base.TaskMessage{}, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, }, { desc: "with multiple expired tasks in-progress", diff --git a/scheduler_test.go b/scheduler_test.go index bd56752..cf3391c 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -129,7 +129,7 @@ func TestScheduler(t *testing.T) { for qname, want := range tc.wantEnqueued { gotEnqueued := h.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultKey(qname), diff) + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff) } } } diff --git a/syncer_test.go b/syncer_test.go index 1be2d18..bed924e 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -23,7 +23,7 @@ func TestSyncer(t *testing.T) { } r := setup(t) rdbClient := rdb.NewRDB(r) - h.SeedInProgressQueue(t, r, inProgress) + h.SeedInProgressQueue(t, r, inProgress, base.DefaultQueueName) const interval = time.Second syncRequestCh := make(chan *syncRequest) @@ -48,9 +48,9 @@ func TestSyncer(t *testing.T) { time.Sleep(2 * interval) // ensure that syncer runs at least once - gotInProgress := h.GetInProgressMessages(t, r) + gotInProgress := h.GetInProgressMessages(t, r, base.DefaultQueueName) if l := len(gotInProgress); l != 0 { - t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) + t.Errorf("%q has length %d; want 0", base.InProgressKey(base.DefaultQueueName), l) } }