diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 9dab962..9fc527e 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -123,18 +123,18 @@ func JSON(kv map[string]interface{}) []byte { } // TaskMessageAfterRetry returns an updated copy of t after retry. -// It increments retry count and sets the error message. -func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage { +// It increments retry count and sets the error message and last_failed_at time. +func TaskMessageAfterRetry(t base.TaskMessage, errMsg string, failedAt time.Time) *base.TaskMessage { t.Retried = t.Retried + 1 t.ErrorMsg = errMsg - t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff + t.LastFailedAt = failedAt.Unix() return &t } // TaskMessageWithError returns an updated copy of t with the given error message. -func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage { +func TaskMessageWithError(t base.TaskMessage, errMsg string, failedAt time.Time) *base.TaskMessage { t.ErrorMsg = errMsg - t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff + t.LastFailedAt = failedAt.Unix() return &t } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7936bf5..fc6508a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1078,7 +1078,7 @@ func TestRetry(t *testing.T) { errMsg string wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z - wantRetry map[string][]base.Z + getWantRetry func(failedAt time.Time) map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ @@ -1099,11 +1099,13 @@ func TestRetry(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - wantRetry: map[string][]base.Z{ - "default": { - {Message: h.TaskMessageAfterRetry(*t1, errMsg), Score: now.Add(5 * time.Minute).Unix()}, - {Message: t3, Score: now.Add(time.Minute).Unix()}, - }, + getWantRetry: func(failedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": { + {Message: h.TaskMessageAfterRetry(*t1, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, + {Message: t3, Score: now.Add(time.Minute).Unix()}, + }, + } }, }, { @@ -1130,11 +1132,13 @@ func TestRetry(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, "custom": {}, }, - wantRetry: map[string][]base.Z{ - "default": {}, - "custom": { - {Message: h.TaskMessageAfterRetry(*t4, errMsg), Score: now.Add(5 * time.Minute).Unix()}, - }, + getWantRetry: func(failedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": {}, + "custom": { + {Message: h.TaskMessageAfterRetry(*t4, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, + }, + } }, }, } @@ -1145,6 +1149,7 @@ func TestRetry(t *testing.T) { h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllRetryQueues(t, r.client, tc.retry) + callTime := time.Now() // time when method was called err := r.Retry(tc.msg, tc.processAt, tc.errMsg) if err != nil { t.Errorf("(*RDB).Retry = %v, want nil", err) @@ -1167,7 +1172,8 @@ func TestRetry(t *testing.T) { h.SortZSetEntryOpt, cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field } - for queue, want := range tc.wantRetry { + wantRetry := tc.getWantRetry(callTime) + for queue, want := range wantRetry { gotRetry := h.GetRetryEntries(t, r.client, queue) if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) @@ -1244,13 +1250,13 @@ func TestArchive(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - active map[string][]*base.TaskMessage - deadlines map[string][]base.Z - archived map[string][]base.Z - target *base.TaskMessage // task to archive - wantActive map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantArchived map[string][]base.Z + active map[string][]*base.TaskMessage + deadlines map[string][]base.Z + archived map[string][]base.Z + target *base.TaskMessage // task to archive + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + getWantArchived func(failedAt time.Time) map[string][]base.Z }{ { active: map[string][]*base.TaskMessage{ @@ -1274,11 +1280,13 @@ func TestArchive(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - wantArchived: map[string][]base.Z{ - "default": { - {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, - {Message: t3, Score: now.Add(-time.Hour).Unix()}, - }, + getWantArchived: func(failedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, + {Message: t3, Score: now.Add(-time.Hour).Unix()}, + }, + } }, }, { @@ -1305,10 +1313,12 @@ func TestArchive(t *testing.T) { {Message: t3, Score: t3Deadline}, }, }, - wantArchived: map[string][]base.Z{ - "default": { - {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, - }, + getWantArchived: func(failedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": { + {Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, + }, + } }, }, { @@ -1337,11 +1347,13 @@ func TestArchive(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}}, "custom": {}, }, - wantArchived: map[string][]base.Z{ - "default": {}, - "custom": { - {Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()}, - }, + getWantArchived: func(failedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": {}, + "custom": { + {Message: h.TaskMessageWithError(*t4, errMsg, failedAt), Score: failedAt.Unix()}, + }, + } }, }, } @@ -1352,6 +1364,7 @@ func TestArchive(t *testing.T) { h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllArchivedQueues(t, r.client, tc.archived) + callTime := time.Now() // record time `Archive` was called err := r.Archive(tc.target, errMsg) if err != nil { t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err) @@ -1370,7 +1383,7 @@ func TestArchive(t *testing.T) { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff) } } - for queue, want := range tc.wantArchived { + for queue, want := range tc.getWantArchived(callTime) { gotArchived := h.GetArchivedEntries(t, r.client, queue) if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff) diff --git a/processor_test.go b/processor_test.go index 9aababd..008eecf 100644 --- a/processor_test.go +++ b/processor_test.go @@ -408,8 +408,8 @@ func TestProcessorRetry(t *testing.T) { p.handler = tc.handler p.start(&sync.WaitGroup{}) - now := time.Now() // time when processor is running - time.Sleep(tc.wait) // FIXME: This makes test flaky. + runTime := time.Now() // time when processor is running + time.Sleep(tc.wait) // FIXME: This makes test flaky. p.shutdown() cmpOpt := h.EquateInt64Approx(int64(tc.wait.Seconds())) // allow up to a wait-second difference in zset score @@ -418,8 +418,8 @@ func TestProcessorRetry(t *testing.T) { for _, msg := range tc.wantRetry { wantRetry = append(wantRetry, base.Z{ - Message: h.TaskMessageAfterRetry(*msg, tc.wantErrMsg), - Score: now.Add(tc.delay).Unix(), + Message: h.TaskMessageAfterRetry(*msg, tc.wantErrMsg, runTime), + Score: runTime.Add(tc.delay).Unix(), }) } if diff := cmp.Diff(wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { @@ -431,8 +431,8 @@ func TestProcessorRetry(t *testing.T) { for _, msg := range tc.wantArchived { wantArchived = append(wantArchived, base.Z{ - Message: h.TaskMessageWithError(*msg, tc.wantErrMsg), - Score: now.Unix(), + Message: h.TaskMessageWithError(*msg, tc.wantErrMsg, runTime), + Score: runTime.Unix(), }) } if diff := cmp.Diff(wantArchived, gotArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" { diff --git a/recoverer_test.go b/recoverer_test.go index 91afde4..0a9fdb6 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -64,7 +64,7 @@ func TestRecoverer(t *testing.T) { "default": {}, }, wantRetry: map[string][]*base.TaskMessage{ - "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, + "default": {t1}, }, wantArchived: map[string][]*base.TaskMessage{ "default": {}, @@ -101,7 +101,7 @@ func TestRecoverer(t *testing.T) { "critical": {}, }, wantArchived: map[string][]*base.TaskMessage{ - "default": {h.TaskMessageWithError(*t4, "deadline exceeded")}, + "default": {t4}, "critical": {}, }, }, @@ -137,7 +137,7 @@ func TestRecoverer(t *testing.T) { "critical": {{Message: t3, Score: oneHourFromNow.Unix()}}, }, wantRetry: map[string][]*base.TaskMessage{ - "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, + "default": {t1}, "critical": {}, }, wantArchived: map[string][]*base.TaskMessage{ @@ -176,8 +176,8 @@ func TestRecoverer(t *testing.T) { "default": {{Message: t2, Score: oneHourFromNow.Unix()}}, }, wantRetry: map[string][]*base.TaskMessage{ - "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, - "critical": {h.TaskMessageAfterRetry(*t3, "deadline exceeded")}, + "default": {t1}, + "critical": {t3}, }, wantArchived: map[string][]*base.TaskMessage{ "default": {}, @@ -238,6 +238,7 @@ func TestRecoverer(t *testing.T) { var wg sync.WaitGroup recoverer.start(&wg) + runTime := time.Now() // time when recoverer is running time.Sleep(2 * time.Second) recoverer.shutdown() @@ -253,15 +254,24 @@ func TestRecoverer(t *testing.T) { t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.DeadlinesKey(qname), diff) } } - for qname, want := range tc.wantRetry { + cmpOpt := h.EquateInt64Approx(2) // allow up to two-second difference in `LastFailedAt` + for qname, msgs := range tc.wantRetry { gotRetry := h.GetRetryMessages(t, r, qname) - if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + var wantRetry []*base.TaskMessage // Note: construct message here since `LastFailedAt` is relative to each test run + for _, msg := range msgs { + wantRetry = append(wantRetry, h.TaskMessageAfterRetry(*msg, "deadline exceeded", runTime)) + } + if diff := cmp.Diff(wantRetry, gotRetry, h.SortMsgOpt, cmpOpt); diff != "" { t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff) } } - for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedMessages(t, r, qname) - if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + for qname, msgs := range tc.wantArchived { + gotArchived := h.GetArchivedMessages(t, r, qname) + var wantArchived []*base.TaskMessage + for _, msg := range msgs { + wantArchived = append(wantArchived, h.TaskMessageWithError(*msg, "deadline exceeded", runTime)) + } + if diff := cmp.Diff(wantArchived, gotArchived, h.SortMsgOpt, cmpOpt); diff != "" { t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff) } }