From aa26f3819ede4887846e235733b92f4e685c2af2 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 5 Jan 2022 09:07:42 -0800 Subject: [PATCH] Fix flaky tests --- inspector_test.go | 25 +++---- internal/rdb/inspect.go | 22 +++--- internal/rdb/inspect_test.go | 69 ++++++++++-------- internal/rdb/rdb_test.go | 136 +++++++++++++++-------------------- 4 files changed, 117 insertions(+), 135 deletions(-) diff --git a/inspector_test.go b/inspector_test.go index 89f21b3..f826a6a 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "math" "sort" "testing" "time" @@ -1523,6 +1522,7 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1614,12 +1614,8 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { } } for qname, want := range tc.wantArchived { - // Allow Z.Score to differ by up to 2. - approxOpt := cmp.Comparer(func(a, b int64) bool { - return math.Abs(float64(a-b)) < 2 - }) gotArchived := h.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1640,6 +1636,7 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { scheduled map[string][]base.Z @@ -1747,12 +1744,8 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { } } for qname, want := range tc.wantArchived { - // Allow Z.Score to differ by up to 2. - approxOpt := cmp.Comparer(func(a, b int64) bool { - return math.Abs(float64(a-b)) < 2 - }) gotArchived := h.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1773,6 +1766,7 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { retry map[string][]base.Z @@ -1863,10 +1857,9 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } - cmpOpt := h.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score for qname, want := range tc.wantArchived { wantArchived := h.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" { + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -2814,8 +2807,9 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := NewInspector(getRedisConnOpt(t)) now := time.Now() + inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { pending map[string][]*base.TaskMessage @@ -2910,6 +2904,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { scheduled map[string][]base.Z @@ -2986,6 +2981,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { retry map[string][]base.Z @@ -3060,6 +3056,7 @@ func TestInspectorArchiveTaskError(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(getRedisConnOpt(t)) + inspector.rdb.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { retry map[string][]base.Z diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 51db716..5f70e61 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -130,7 +130,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - now := time.Now() + now := r.clock.Now() res, err := currentStatsCmd.Run(context.Background(), r.client, []string{ base.PendingKey(qname), base.ActiveKey(qname), @@ -316,7 +316,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } const day = 24 * time.Hour - now := time.Now().UTC() + now := r.clock.Now().UTC() var days []time.Time var keys []string for i := 0; i < n; i++ { @@ -433,7 +433,7 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { keys := []string{base.TaskKey(qname, id)} argv := []interface{}{ id, - time.Now().Unix(), + r.clock.Now().Unix(), base.QueueKeyPrefix(qname), } res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -594,7 +594,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ( } var nextProcessAt time.Time if state == base.TaskStatePending { - nextProcessAt = time.Now() + nextProcessAt = r.clock.Now() } infos = append(infos, &base.TaskInfo{ Message: m, @@ -999,7 +999,7 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { base.PendingKey(qname), base.ArchivedKey(qname), } - now := time.Now() + now := r.clock.Now() argv := []interface{}{ now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), @@ -1079,7 +1079,7 @@ func (r *RDB) ArchiveTask(qname, id string) error { base.TaskKey(qname, id), base.ArchivedKey(qname), } - now := time.Now() + now := r.clock.Now() argv := []interface{}{ id, now.Unix(), @@ -1144,7 +1144,7 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { src, dst, } - now := time.Now() + now := r.clock.Now() argv := []interface{}{ now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), @@ -1550,7 +1550,7 @@ return keys`) // ListServers returns the list of server info. func (r *RDB) ListServers() ([]*base.ServerInfo, error) { - now := time.Now() + now := r.clock.Now() res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result() if err != nil { return nil, err @@ -1584,7 +1584,7 @@ return keys`) // ListWorkers returns the list of worker stats. func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { var op errors.Op = "rdb.ListWorkers" - now := time.Now() + now := r.clock.Now() res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result() if err != nil { return nil, errors.E(op, errors.Unknown, err) @@ -1619,7 +1619,7 @@ return keys`) // ListSchedulerEntries returns the list of scheduler entries. func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { - now := time.Now() + now := r.clock.Now() res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result() if err != nil { return nil, err @@ -1670,7 +1670,7 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas // Pause pauses processing of tasks from the given queue. func (r *RDB) Pause(qname string) error { key := base.PausedKey(qname) - ok, err := r.client.SetNX(context.Background(), key, time.Now().Unix(), 0).Result() + ok, err := r.client.SetNX(context.Background(), key, r.clock.Now().Unix(), 0).Result() if err != nil { return err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index ef07abb..ca9e2cf 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2665,8 +2665,11 @@ func TestArchiveAllPendingTasks(t *testing.T) { m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") - t1 := time.Now().Add(1 * time.Minute) - t2 := time.Now().Add(1 * time.Hour) + now := time.Now() + t1 := now.Add(1 * time.Minute) + t2 := now.Add(1 * time.Hour) + + r.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { pending map[string][]*base.TaskMessage @@ -2690,8 +2693,8 @@ func TestArchiveAllPendingTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, + {Message: m2, Score: now.Unix()}, }, }, }, @@ -2709,7 +2712,7 @@ func TestArchiveAllPendingTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, {Message: m2, Score: t2.Unix()}, }, }, @@ -2754,8 +2757,8 @@ func TestArchiveAllPendingTasks(t *testing.T) { wantArchived: map[string][]base.Z{ "default": {}, "custom": { - {Message: m3, Score: time.Now().Unix()}, - {Message: m4, Score: time.Now().Unix()}, + {Message: m3, Score: now.Unix()}, + {Message: m4, Score: now.Unix()}, }, }, }, @@ -2783,7 +2786,7 @@ func TestArchiveAllPendingTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2797,10 +2800,13 @@ func TestArchiveAllRetryTasks(t *testing.T) { m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") - t1 := time.Now().Add(1 * time.Minute) - t2 := time.Now().Add(1 * time.Hour) - t3 := time.Now().Add(2 * time.Hour) - t4 := time.Now().Add(3 * time.Hour) + now := time.Now() + t1 := now.Add(1 * time.Minute) + t2 := now.Add(1 * time.Hour) + t3 := now.Add(2 * time.Hour) + t4 := now.Add(3 * time.Hour) + + r.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { retry map[string][]base.Z @@ -2827,8 +2833,8 @@ func TestArchiveAllRetryTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, + {Message: m2, Score: now.Unix()}, }, }, }, @@ -2846,7 +2852,7 @@ func TestArchiveAllRetryTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, {Message: m2, Score: t2.Unix()}, }, }, @@ -2900,8 +2906,8 @@ func TestArchiveAllRetryTasks(t *testing.T) { wantArchived: map[string][]base.Z{ "default": {}, "custom": { - {Message: m3, Score: time.Now().Unix()}, - {Message: m4, Score: time.Now().Unix()}, + {Message: m3, Score: now.Unix()}, + {Message: m4, Score: now.Unix()}, }, }, }, @@ -2921,7 +2927,7 @@ func TestArchiveAllRetryTasks(t *testing.T) { for qname, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryKey(qname), diff) } @@ -2929,7 +2935,7 @@ func TestArchiveAllRetryTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2944,10 +2950,13 @@ func TestArchiveAllScheduledTasks(t *testing.T) { m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") - t1 := time.Now().Add(time.Minute) - t2 := time.Now().Add(time.Hour) - t3 := time.Now().Add(time.Hour) - t4 := time.Now().Add(time.Hour) + now := time.Now() + t1 := now.Add(time.Minute) + t2 := now.Add(time.Hour) + t3 := now.Add(time.Hour) + t4 := now.Add(time.Hour) + + r.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { scheduled map[string][]base.Z @@ -2974,8 +2983,8 @@ func TestArchiveAllScheduledTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, + {Message: m2, Score: now.Unix()}, }, }, }, @@ -2993,7 +3002,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) { }, wantArchived: map[string][]base.Z{ "default": { - {Message: m1, Score: time.Now().Unix()}, + {Message: m1, Score: now.Unix()}, {Message: m2, Score: t2.Unix()}, }, }, @@ -3047,8 +3056,8 @@ func TestArchiveAllScheduledTasks(t *testing.T) { wantArchived: map[string][]base.Z{ "default": {}, "custom": { - {Message: m3, Score: time.Now().Unix()}, - {Message: m4, Score: time.Now().Unix()}, + {Message: m3, Score: now.Unix()}, + {Message: m4, Score: now.Unix()}, }, }, }, @@ -3068,7 +3077,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledKey(qname), diff) } @@ -3076,7 +3085,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7dd4b99..b08d54e 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -313,6 +313,7 @@ func TestDequeue(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) t1 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -432,7 +433,7 @@ func TestDequeue(t *testing.T) { tc.args, gotMsg, tc.wantMsg) continue } - if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { + if !cmp.Equal(gotDeadline, tc.wantDeadline) { t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, tc.wantDeadline) continue @@ -452,8 +453,7 @@ func TestDequeue(t *testing.T) { } for queue, want := range tc.wantDeadlines { gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) - cmpOpts := []cmp.Option{h.SortZSetEntryOpt, h.EquateInt64Approx(2)} // allow up to 2 second margin in Score - if diff := cmp.Diff(want, gotDeadlines, cmpOpts...); diff != "" { + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) } } @@ -1451,6 +1451,7 @@ func TestRetry(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) t1 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -1494,7 +1495,7 @@ func TestRetry(t *testing.T) { errMsg string wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z - getWantRetry func(failedAt time.Time) map[string][]base.Z + wantRetry map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ @@ -1515,13 +1516,11 @@ func TestRetry(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - getWantRetry: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": { - {Message: h.TaskMessageAfterRetry(*t1, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, - {Message: t3, Score: now.Add(time.Minute).Unix()}, - }, - } + wantRetry: map[string][]base.Z{ + "default": { + {Message: h.TaskMessageAfterRetry(*t1, errMsg, now), Score: now.Add(5 * time.Minute).Unix()}, + {Message: t3, Score: now.Add(time.Minute).Unix()}, + }, }, }, { @@ -1548,13 +1547,11 @@ func TestRetry(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, "custom": {}, }, - getWantRetry: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {}, - "custom": { - {Message: h.TaskMessageAfterRetry(*t4, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, - }, - } + wantRetry: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: h.TaskMessageAfterRetry(*t4, errMsg, now), Score: now.Add(5 * time.Minute).Unix()}, + }, }, }, } @@ -1565,7 +1562,6 @@ func TestRetry(t *testing.T) { h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllRetryQueues(t, r.client, tc.retry) - callTime := time.Now() // time when method was called err := r.Retry(tc.msg, tc.processAt, tc.errMsg, true /*isFailure*/) if err != nil { t.Errorf("(*RDB).Retry = %v, want nil", err) @@ -1584,14 +1580,9 @@ func TestRetry(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) } } - cmpOpts := []cmp.Option{ - h.SortZSetEntryOpt, - cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field - } - wantRetry := tc.getWantRetry(callTime) - for queue, want := range wantRetry { + for queue, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) } } @@ -1634,6 +1625,7 @@ func TestRetryWithNonFailureError(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) t1 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -1677,7 +1669,7 @@ func TestRetryWithNonFailureError(t *testing.T) { errMsg string wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z - getWantRetry func(failedAt time.Time) map[string][]base.Z + wantRetry map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ @@ -1698,14 +1690,12 @@ func TestRetryWithNonFailureError(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - getWantRetry: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": { - // Task message should include the error message but without incrementing the retry count. - {Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, - {Message: t3, Score: now.Add(time.Minute).Unix()}, - }, - } + wantRetry: map[string][]base.Z{ + "default": { + // Task message should include the error message but without incrementing the retry count. + {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Add(5 * time.Minute).Unix()}, + {Message: t3, Score: now.Add(time.Minute).Unix()}, + }, }, }, { @@ -1732,14 +1722,12 @@ func TestRetryWithNonFailureError(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, "custom": {}, }, - getWantRetry: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {}, - "custom": { - // Task message should include the error message but without incrementing the retry count. - {Message: h.TaskMessageWithError(*t4, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, - }, - } + wantRetry: map[string][]base.Z{ + "default": {}, + "custom": { + // Task message should include the error message but without incrementing the retry count. + {Message: h.TaskMessageWithError(*t4, errMsg, now), Score: now.Add(5 * time.Minute).Unix()}, + }, }, }, } @@ -1750,7 +1738,6 @@ func TestRetryWithNonFailureError(t *testing.T) { h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllRetryQueues(t, r.client, tc.retry) - callTime := time.Now() // time when method was called err := r.Retry(tc.msg, tc.processAt, tc.errMsg, false /*isFailure*/) if err != nil { t.Errorf("(*RDB).Retry = %v, want nil", err) @@ -1769,14 +1756,9 @@ func TestRetryWithNonFailureError(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) } } - cmpOpts := []cmp.Option{ - h.SortZSetEntryOpt, - cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field - } - wantRetry := tc.getWantRetry(callTime) - for queue, want := range wantRetry { + for queue, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) } } @@ -1813,6 +1795,7 @@ func TestArchive(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) t1 := &base.TaskMessage{ ID: uuid.NewString(), Type: "send_email", @@ -1857,13 +1840,13 @@ func TestArchive(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - active map[string][]*base.TaskMessage - deadlines map[string][]base.Z - archived map[string][]base.Z - target *base.TaskMessage // task to archive - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - getWantArchived func(failedAt time.Time) map[string][]base.Z + active map[string][]*base.TaskMessage + deadlines map[string][]base.Z + archived map[string][]base.Z + target *base.TaskMessage // task to archive + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantArchived map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ @@ -1887,13 +1870,11 @@ func TestArchive(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - getWantArchived: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": { - {Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, - {Message: t3, Score: now.Add(-time.Hour).Unix()}, - }, - } + wantArchived: map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()}, + {Message: t3, Score: now.Add(-time.Hour).Unix()}, + }, }, }, { @@ -1920,12 +1901,10 @@ func TestArchive(t *testing.T) { {Message: t3, Score: t3Deadline}, }, }, - getWantArchived: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": { - {Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, - }, - } + wantArchived: map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()}, + }, }, }, { @@ -1954,13 +1933,11 @@ func TestArchive(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}}, "custom": {}, }, - getWantArchived: func(failedAt time.Time) map[string][]base.Z { - return map[string][]base.Z{ - "default": {}, - "custom": { - {Message: h.TaskMessageWithError(*t4, errMsg, failedAt), Score: failedAt.Unix()}, - }, - } + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: h.TaskMessageWithError(*t4, errMsg, now), Score: now.Unix()}, + }, }, }, } @@ -1971,7 +1948,6 @@ func TestArchive(t *testing.T) { h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllArchivedQueues(t, r.client, tc.archived) - callTime := time.Now() // record time `Archive` was called err := r.Archive(tc.target, errMsg) if err != nil { t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err) @@ -1990,7 +1966,7 @@ func TestArchive(t *testing.T) { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff) } } - for queue, want := range tc.getWantArchived(callTime) { + for queue, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, queue) if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)