diff --git a/processor_test.go b/processor_test.go index cedb427..9aababd 100644 --- a/processor_test.go +++ b/processor_test.go @@ -44,6 +44,7 @@ func fakeSyncer(syncCh <-chan *syncRequest, done <-chan struct{}) { func TestProcessorSuccessWithSingleQueue(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) m1 := h.NewTaskMessage("task1", nil) @@ -146,6 +147,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { t3 = NewTask(m3.Type, m3.Payload) t4 = NewTask(m4.Type, m4.Payload) ) + defer r.Close() tests := []struct { pending map[string][]*base.TaskMessage @@ -226,6 +228,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { // https://github.com/hibiken/asynq/issues/166 func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) @@ -302,6 +305,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { func TestProcessorRetry(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) m1 := h.NewTaskMessage("send_email", nil) @@ -312,66 +316,55 @@ func TestProcessorRetry(t *testing.T) { errMsg := "something went wrong" wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry) - now := time.Now() tests := []struct { desc string // test description pending []*base.TaskMessage // initial default queue state - incoming []*base.TaskMessage // tasks to be enqueued during run delay time.Duration // retry delay duration handler Handler // task handler wait time.Duration // wait duration between starting and stopping processor for this test case - wantRetry []base.Z // tasks in retry queue at the end + wantErrMsg string // error message the task should record + wantRetry []*base.TaskMessage // tasks in retry queue at the end wantArchived []*base.TaskMessage // tasks in archived queue at the end wantErrCount int // number of times error handler should be called }{ { - desc: "Should automatically retry errored tasks", - pending: []*base.TaskMessage{m1, m2}, - incoming: []*base.TaskMessage{m3, m4}, - delay: time.Minute, + desc: "Should automatically retry errored tasks", + pending: []*base.TaskMessage{m1, m2, m3, m4}, + delay: time.Minute, handler: HandlerFunc(func(ctx context.Context, task *Task) error { return fmt.Errorf(errMsg) }), - wait: 2 * time.Second, - wantRetry: []base.Z{ - {Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()}, - {Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()}, - {Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()}, - }, - wantArchived: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)}, + wait: 2 * time.Second, + wantErrMsg: errMsg, + wantRetry: []*base.TaskMessage{m2, m3, m4}, + wantArchived: []*base.TaskMessage{m1}, wantErrCount: 4, }, { - desc: "Should skip retry errored tasks", - pending: []*base.TaskMessage{m1, m2}, - incoming: []*base.TaskMessage{}, - delay: time.Minute, + desc: "Should skip retry errored tasks", + pending: []*base.TaskMessage{m1, m2}, + delay: time.Minute, handler: HandlerFunc(func(ctx context.Context, task *Task) error { return SkipRetry // return SkipRetry without wrapping }), - wait: 2 * time.Second, - wantRetry: []base.Z{}, - wantArchived: []*base.TaskMessage{ - h.TaskMessageWithError(*m1, SkipRetry.Error()), - h.TaskMessageWithError(*m2, SkipRetry.Error()), - }, + wait: 2 * time.Second, + wantErrMsg: SkipRetry.Error(), + wantRetry: []*base.TaskMessage{}, + wantArchived: []*base.TaskMessage{m1, m2}, wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error }, { - desc: "Should skip retry errored tasks (with error wrapping)", - pending: []*base.TaskMessage{m1, m2}, - incoming: []*base.TaskMessage{}, - delay: time.Minute, + desc: "Should skip retry errored tasks (with error wrapping)", + pending: []*base.TaskMessage{m1, m2}, + delay: time.Minute, handler: HandlerFunc(func(ctx context.Context, task *Task) error { return wrappedSkipRetry }), - wait: 2 * time.Second, - wantRetry: []base.Z{}, - wantArchived: []*base.TaskMessage{ - h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()), - h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()), - }, + wait: 2 * time.Second, + wantErrMsg: wrappedSkipRetry.Error(), + wantRetry: []*base.TaskMessage{}, + wantArchived: []*base.TaskMessage{m1, m2}, wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error }, } @@ -415,24 +408,34 @@ func TestProcessorRetry(t *testing.T) { p.handler = tc.handler p.start(&sync.WaitGroup{}) - for _, msg := range tc.incoming { - err := rdbClient.Enqueue(msg) - if err != nil { - p.shutdown() - t.Fatal(err) - } - } + now := time.Now() // time when processor is running time.Sleep(tc.wait) // FIXME: This makes test flaky. p.shutdown() - cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score + cmpOpt := h.EquateInt64Approx(int64(tc.wait.Seconds())) // allow up to a wait-second difference in zset score gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { + var wantRetry []base.Z // Note: construct wantRetry here since `LastFailedAt` and ZSCORE is relative to each test run. + for _, msg := range tc.wantRetry { + wantRetry = append(wantRetry, + base.Z{ + Message: h.TaskMessageAfterRetry(*msg, tc.wantErrMsg), + Score: now.Add(tc.delay).Unix(), + }) + } + if diff := cmp.Diff(wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff) } - gotDead := h.GetArchivedMessages(t, r, base.DefaultQueueName) - if diff := cmp.Diff(tc.wantArchived, gotDead, h.SortMsgOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r, base.DefaultQueueName) + var wantArchived []base.Z // Note: construct wantArchived here since `LastFailedAt` and ZSCORE is relative to each test run. + for _, msg := range tc.wantArchived { + wantArchived = append(wantArchived, + base.Z{ + Message: h.TaskMessageWithError(*msg, tc.wantErrMsg), + Score: now.Unix(), + }) + } + if diff := cmp.Diff(wantArchived, gotArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.ArchivedKey(base.DefaultQueueName), diff) }