From 97316d6766aeb9781bd46e215db3e818fd654132 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 11 Jan 2020 10:02:13 -0800 Subject: [PATCH] Fix flaky tests Some tests were failing due to mismatch in Score in ZSetEntry. Changed ZSetEntry Score to float64 type so that we can use cmpopts.EquateApprox to allow for margin when comparing. --- client_test.go | 2 +- internal/asynqtest/asynqtest.go | 4 +- internal/rdb/inspect_test.go | 232 ++++++++++++++++---------------- internal/rdb/rdb_test.go | 42 +++--- processor_test.go | 10 +- scheduler_test.go | 12 +- 6 files changed, 152 insertions(+), 150 deletions(-) diff --git a/client_test.go b/client_test.go index eb51b67..73c9190 100644 --- a/client_test.go +++ b/client_test.go @@ -58,7 +58,7 @@ func TestClient(t *testing.T) { Retry: defaultMaxRetry, Queue: "default", }, - Score: time.Now().Add(2 * time.Hour).Unix(), + Score: float64(time.Now().Add(2 * time.Hour).Unix()), }, }, }, diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 64f2446..5bbed3a 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -20,7 +20,7 @@ import ( // ZSetEntry is an entry in redis sorted set. type ZSetEntry struct { Msg *base.TaskMessage - Score int64 + Score float64 } // SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages. @@ -245,7 +245,7 @@ func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry { for _, z := range data { entries = append(entries, ZSetEntry{ Msg: MustUnmarshal(tb, z.Member.(string)), - Score: int64(z.Score), + Score: z.Score, }) } return entries diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 53cf84d..278b608 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -47,8 +47,8 @@ func TestCurrentStats(t *testing.T) { }, inProgress: []*base.TaskMessage{m2}, scheduled: []h.ZSetEntry{ - {Msg: m3, Score: now.Add(time.Hour).Unix()}, - {Msg: m4, Score: now.Unix()}}, + {Msg: m3, Score: float64(now.Add(time.Hour).Unix())}, + {Msg: m4, Score: float64(now.Unix())}}, retry: []h.ZSetEntry{}, dead: []h.ZSetEntry{}, processed: 120, @@ -72,12 +72,12 @@ func TestCurrentStats(t *testing.T) { }, inProgress: []*base.TaskMessage{}, scheduled: []h.ZSetEntry{ - {Msg: m3, Score: now.Unix()}, - {Msg: m4, Score: now.Unix()}}, + {Msg: m3, Score: float64(now.Unix())}, + {Msg: m4, Score: float64(now.Unix())}}, retry: []h.ZSetEntry{ - {Msg: m1, Score: now.Add(time.Minute).Unix()}}, + {Msg: m1, Score: float64(now.Add(time.Minute).Unix())}}, dead: []h.ZSetEntry{ - {Msg: m2, Score: now.Add(-time.Hour).Unix()}}, + {Msg: m2, Score: float64(now.Add(-time.Hour).Unix())}}, processed: 90, failed: 10, allQueues: []interface{}{base.DefaultQueue}, @@ -364,8 +364,8 @@ func TestListScheduled(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: p1.Unix()}, - {Msg: m2, Score: p2.Unix()}, + {Msg: m1, Score: float64(p1.Unix())}, + {Msg: m2, Score: float64(p2.Unix())}, }, want: []*ScheduledTask{t1, t2}, }, @@ -449,8 +449,8 @@ func TestListRetry(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: m1, Score: p1.Unix()}, - {Msg: m2, Score: p2.Unix()}, + {Msg: m1, Score: float64(p1.Unix())}, + {Msg: m2, Score: float64(p2.Unix())}, }, want: []*RetryTask{t1, t2}, }, @@ -527,8 +527,8 @@ func TestListDead(t *testing.T) { }{ { dead: []h.ZSetEntry{ - {Msg: m1, Score: f1.Unix()}, - {Msg: m2, Score: f2.Unix()}, + {Msg: m1, Score: float64(f1.Unix())}, + {Msg: m2, Score: float64(f2.Unix())}, }, want: []*DeadTask{t1, t2}, }, @@ -582,8 +582,8 @@ func TestEnqueueDeadTask(t *testing.T) { }{ { dead: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: s2, id: t2.ID, @@ -595,8 +595,8 @@ func TestEnqueueDeadTask(t *testing.T) { }, { dead: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: 123, id: t2.ID, @@ -608,9 +608,9 @@ func TestEnqueueDeadTask(t *testing.T) { }, { dead: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, - {Msg: t3, Score: s1}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, + {Msg: t3, Score: float64(s1)}, }, score: s1, id: t3.ID, @@ -666,8 +666,8 @@ func TestEnqueueRetryTask(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: s2, id: t2.ID, @@ -679,8 +679,8 @@ func TestEnqueueRetryTask(t *testing.T) { }, { retry: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: 123, id: t2.ID, @@ -692,9 +692,9 @@ func TestEnqueueRetryTask(t *testing.T) { }, { retry: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, - {Msg: t3, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, + {Msg: t3, Score: float64(s2)}, }, score: s2, id: t3.ID, @@ -750,8 +750,8 @@ func TestEnqueueScheduledTask(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: s2, id: t2.ID, @@ -763,8 +763,8 @@ func TestEnqueueScheduledTask(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, }, score: 123, id: t2.ID, @@ -776,9 +776,9 @@ func TestEnqueueScheduledTask(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: s1}, - {Msg: t2, Score: s2}, - {Msg: t3, Score: s1}, + {Msg: t1, Score: float64(s1)}, + {Msg: t2, Score: float64(s2)}, + {Msg: t3, Score: float64(s1)}, }, score: s1, id: t3.ID, @@ -834,9 +834,9 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { { desc: "with tasks in scheduled queue", scheduled: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t2, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t3, Score: time.Now().Add(time.Hour).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(time.Hour).Unix())}, }, want: 3, wantEnqueued: map[string][]*base.TaskMessage{ @@ -854,11 +854,11 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { { desc: "with custom queues", scheduled: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t2, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t3, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t4, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t5, Score: time.Now().Add(time.Hour).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t4, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t5, Score: float64(time.Now().Add(time.Hour).Unix())}, }, want: 5, wantEnqueued: map[string][]*base.TaskMessage{ @@ -913,9 +913,9 @@ func TestEnqueueAllRetryTasks(t *testing.T) { { desc: "with tasks in retry queue", retry: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t2, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t3, Score: time.Now().Add(time.Hour).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(time.Hour).Unix())}, }, want: 3, wantEnqueued: map[string][]*base.TaskMessage{ @@ -933,11 +933,11 @@ func TestEnqueueAllRetryTasks(t *testing.T) { { desc: "with custom queues", retry: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t2, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t3, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t4, Score: time.Now().Add(time.Hour).Unix()}, - {Msg: t5, Score: time.Now().Add(time.Hour).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t4, Score: float64(time.Now().Add(time.Hour).Unix())}, + {Msg: t5, Score: float64(time.Now().Add(time.Hour).Unix())}, }, want: 5, wantEnqueued: map[string][]*base.TaskMessage{ @@ -992,9 +992,9 @@ func TestEnqueueAllDeadTasks(t *testing.T) { { desc: "with tasks in dead queue", dead: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t2, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t3, Score: time.Now().Add(-time.Minute).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(-time.Minute).Unix())}, }, want: 3, wantEnqueued: map[string][]*base.TaskMessage{ @@ -1012,11 +1012,11 @@ func TestEnqueueAllDeadTasks(t *testing.T) { { desc: "with custom queues", dead: []h.ZSetEntry{ - {Msg: t1, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t2, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t3, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t4, Score: time.Now().Add(-time.Minute).Unix()}, - {Msg: t5, Score: time.Now().Add(-time.Minute).Unix()}, + {Msg: t1, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t2, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t3, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t4, Score: float64(time.Now().Add(-time.Minute).Unix())}, + {Msg: t5, Score: float64(time.Now().Add(-time.Minute).Unix())}, }, want: 5, wantEnqueued: map[string][]*base.TaskMessage{ @@ -1070,35 +1070,35 @@ func TestKillRetryTask(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, dead: []h.ZSetEntry{}, id: m1.ID, score: t1.Unix(), want: nil, wantRetry: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, }, }, { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, dead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, wantRetry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, wantDead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, } @@ -1147,35 +1147,35 @@ func TestKillScheduledTask(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, dead: []h.ZSetEntry{}, id: m1.ID, score: t1.Unix(), want: nil, wantScheduled: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, }, }, { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, dead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, wantScheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, wantDead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, } @@ -1222,42 +1222,42 @@ func TestKillAllRetryTasks(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, dead: []h.ZSetEntry{}, want: 2, wantRetry: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(time.Now().Unix())}, }, }, { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, dead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, want: 1, wantRetry: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, { retry: []h.ZSetEntry{}, dead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, want: 0, wantRetry: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, } @@ -1304,42 +1304,42 @@ func TestKillAllScheduledTasks(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, dead: []h.ZSetEntry{}, want: 2, wantScheduled: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(time.Now().Unix())}, }, }, { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, dead: []h.ZSetEntry{ - {Msg: m2, Score: t2.Unix()}, + {Msg: m2, Score: float64(t2.Unix())}, }, want: 1, wantScheduled: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, { scheduled: []h.ZSetEntry{}, dead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, want: 0, wantScheduled: []h.ZSetEntry{}, wantDead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, }, } @@ -1386,8 +1386,8 @@ func TestDeleteDeadTask(t *testing.T) { }{ { dead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m1.ID, score: t1.Unix(), @@ -1396,8 +1396,8 @@ func TestDeleteDeadTask(t *testing.T) { }, { dead: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m1.ID, score: t2.Unix(), // id and score mismatch @@ -1446,8 +1446,8 @@ func TestDeleteRetryTask(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m1.ID, score: t1.Unix(), @@ -1456,7 +1456,7 @@ func TestDeleteRetryTask(t *testing.T) { }, { retry: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, id: m2.ID, score: t2.Unix(), @@ -1498,8 +1498,8 @@ func TestDeleteScheduledTask(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, - {Msg: m2, Score: t2.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, + {Msg: m2, Score: float64(t2.Unix())}, }, id: m1.ID, score: t1.Unix(), @@ -1508,7 +1508,7 @@ func TestDeleteScheduledTask(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: t1.Unix()}, + {Msg: m1, Score: float64(t1.Unix())}, }, id: m2.ID, score: t2.Unix(), @@ -1546,9 +1546,9 @@ func TestDeleteAllDeadTasks(t *testing.T) { }{ { dead: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: time.Now().Unix()}, - {Msg: m3, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(time.Now().Unix())}, + {Msg: m3, Score: float64(time.Now().Unix())}, }, wantDead: []*base.TaskMessage{}, }, @@ -1582,9 +1582,9 @@ func TestDeleteAllRetryTasks(t *testing.T) { }{ { retry: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Unix()}, - {Msg: m2, Score: time.Now().Unix()}, - {Msg: m3, Score: time.Now().Unix()}, + {Msg: m1, Score: float64(time.Now().Unix())}, + {Msg: m2, Score: float64(time.Now().Unix())}, + {Msg: m3, Score: float64(time.Now().Unix())}, }, wantRetry: []*base.TaskMessage{}, }, @@ -1618,9 +1618,9 @@ func TestDeleteAllScheduledTasks(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: m1, Score: time.Now().Add(time.Minute).Unix()}, - {Msg: m2, Score: time.Now().Add(time.Minute).Unix()}, - {Msg: m3, Score: time.Now().Add(time.Minute).Unix()}, + {Msg: m1, Score: float64(time.Now().Add(time.Minute).Unix())}, + {Msg: m2, Score: float64(time.Now().Add(time.Minute).Unix())}, + {Msg: m3, Score: float64(time.Now().Add(time.Minute).Unix())}, }, wantScheduled: []*base.TaskMessage{}, }, diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 4e34785..7ed0284 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -308,8 +308,8 @@ func TestSchedule(t *testing.T) { t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) continue } - if gotScheduled[0].Score != tc.processAt.Unix() { - t.Errorf("%s inserted an item with score %d, want %d", desc, gotScheduled[0].Score, tc.processAt.Unix()) + if int64(gotScheduled[0].Score) != tc.processAt.Unix() { + t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) continue } } @@ -347,7 +347,7 @@ func TestRetry(t *testing.T) { retry: []h.ZSetEntry{ { Msg: t3, - Score: now.Add(time.Minute).Unix(), + Score: float64(now.Add(time.Minute).Unix()), }, }, msg: t1, @@ -357,11 +357,11 @@ func TestRetry(t *testing.T) { wantRetry: []h.ZSetEntry{ { Msg: t1AfterRetry, - Score: now.Add(5 * time.Minute).Unix(), + Score: float64(now.Add(5 * time.Minute).Unix()), }, { Msg: t3, - Score: now.Add(time.Minute).Unix(), + Score: float64(now.Add(time.Minute).Unix()), }, }, }, @@ -440,7 +440,7 @@ func TestKill(t *testing.T) { dead: []h.ZSetEntry{ { Msg: t3, - Score: now.Add(-time.Hour).Unix(), + Score: float64(now.Add(-time.Hour).Unix()), }, }, target: t1, @@ -448,11 +448,11 @@ func TestKill(t *testing.T) { wantDead: []h.ZSetEntry{ { Msg: t1AfterKill, - Score: now.Unix(), + Score: float64(now.Unix()), }, { Msg: t3, - Score: now.Add(-time.Hour).Unix(), + Score: float64(now.Add(-time.Hour).Unix()), }, }, }, @@ -464,7 +464,7 @@ func TestKill(t *testing.T) { wantDead: []h.ZSetEntry{ { Msg: t1AfterKill, - Score: now.Unix(), + Score: float64(now.Unix()), }, }, }, @@ -593,11 +593,11 @@ func TestCheckAndEnqueue(t *testing.T) { }{ { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: secondAgo.Unix()}, - {Msg: t2, Score: secondAgo.Unix()}, + {Msg: t1, Score: float64(secondAgo.Unix())}, + {Msg: t2, Score: float64(secondAgo.Unix())}, }, retry: []h.ZSetEntry{ - {Msg: t3, Score: secondAgo.Unix()}}, + {Msg: t3, Score: float64(secondAgo.Unix())}}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, @@ -606,10 +606,10 @@ func TestCheckAndEnqueue(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: hourFromNow.Unix()}, - {Msg: t2, Score: secondAgo.Unix()}}, + {Msg: t1, Score: float64(hourFromNow.Unix())}, + {Msg: t2, Score: float64(secondAgo.Unix())}}, retry: []h.ZSetEntry{ - {Msg: t3, Score: secondAgo.Unix()}}, + {Msg: t3, Score: float64(secondAgo.Unix())}}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, @@ -618,10 +618,10 @@ func TestCheckAndEnqueue(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: hourFromNow.Unix()}, - {Msg: t2, Score: hourFromNow.Unix()}}, + {Msg: t1, Score: float64(hourFromNow.Unix())}, + {Msg: t2, Score: float64(hourFromNow.Unix())}}, retry: []h.ZSetEntry{ - {Msg: t3, Score: hourFromNow.Unix()}}, + {Msg: t3, Score: float64(hourFromNow.Unix())}}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, @@ -630,11 +630,11 @@ func TestCheckAndEnqueue(t *testing.T) { }, { scheduled: []h.ZSetEntry{ - {Msg: t1, Score: secondAgo.Unix()}, - {Msg: t4, Score: secondAgo.Unix()}, + {Msg: t1, Score: float64(secondAgo.Unix())}, + {Msg: t4, Score: float64(secondAgo.Unix())}, }, retry: []h.ZSetEntry{ - {Msg: t5, Score: secondAgo.Unix()}}, + {Msg: t5, Score: float64(secondAgo.Unix())}}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t4}, diff --git a/processor_test.go b/processor_test.go index cca97ea..bcafbc6 100644 --- a/processor_test.go +++ b/processor_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" @@ -128,9 +129,9 @@ func TestProcessorRetry(t *testing.T) { delay: time.Minute, wait: time.Second, wantRetry: []h.ZSetEntry{ - {Msg: &r2, Score: now.Add(time.Minute).Unix()}, - {Msg: &r3, Score: now.Add(time.Minute).Unix()}, - {Msg: &r4, Score: now.Add(time.Minute).Unix()}, + {Msg: &r2, Score: float64(now.Add(time.Minute).Unix())}, + {Msg: &r3, Score: float64(now.Add(time.Minute).Unix())}, + {Msg: &r4, Score: float64(now.Add(time.Minute).Unix())}, }, wantDead: []*base.TaskMessage{&r1}, }, @@ -161,8 +162,9 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() + cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score gotRetry := h.GetRetryEntries(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) } diff --git a/scheduler_test.go b/scheduler_test.go index 6eadf0d..45dd33f 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -36,11 +36,11 @@ func TestScheduler(t *testing.T) { }{ { initScheduled: []h.ZSetEntry{ - {Msg: t1, Score: now.Add(time.Hour).Unix()}, - {Msg: t2, Score: now.Add(-2 * time.Second).Unix()}, + {Msg: t1, Score: float64(now.Add(time.Hour).Unix())}, + {Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())}, }, initRetry: []h.ZSetEntry{ - {Msg: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}, + {Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())}, }, initQueue: []*base.TaskMessage{t4}, wait: pollInterval * 2, @@ -50,9 +50,9 @@ func TestScheduler(t *testing.T) { }, { initScheduled: []h.ZSetEntry{ - {Msg: t1, Score: now.Unix()}, - {Msg: t2, Score: now.Add(-2 * time.Second).Unix()}, - {Msg: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, + {Msg: t1, Score: float64(now.Unix())}, + {Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())}, + {Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())}, }, initRetry: []h.ZSetEntry{}, initQueue: []*base.TaskMessage{t4},