2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Fix flaky tests

This commit is contained in:
Ken Hibino 2022-01-05 09:07:42 -08:00 committed by GitHub
parent d94614bb9b
commit aa26f3819e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 135 deletions

View File

@ -8,7 +8,6 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math"
"sort" "sort"
"testing" "testing"
"time" "time"
@ -1523,6 +1522,7 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) {
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -1614,12 +1614,8 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) {
} }
} }
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
// Allow Z.Score to differ by up to 2.
approxOpt := cmp.Comparer(func(a, b int64) bool {
return math.Abs(float64(a-b)) < 2
})
gotArchived := h.GetArchivedEntries(t, r, qname) gotArchived := h.GetArchivedEntries(t, r, qname)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
@ -1640,6 +1636,7 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) {
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1747,12 +1744,8 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) {
} }
} }
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
// Allow Z.Score to differ by up to 2.
approxOpt := cmp.Comparer(func(a, b int64) bool {
return math.Abs(float64(a-b)) < 2
})
gotArchived := h.GetArchivedEntries(t, r, qname) gotArchived := h.GetArchivedEntries(t, r, qname)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
@ -1773,6 +1766,7 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) {
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -1863,10 +1857,9 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) {
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
cmpOpt := h.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
wantArchived := h.GetArchivedEntries(t, r, qname) wantArchived := h.GetArchivedEntries(t, r, qname)
if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" { if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
@ -2814,8 +2807,9 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") m2 := h.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := NewInspector(getRedisConnOpt(t))
now := time.Now() now := time.Now()
inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -2910,6 +2904,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -2986,6 +2981,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -3060,6 +3056,7 @@ func TestInspectorArchiveTaskError(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z

View File

@ -130,7 +130,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
if !exists { if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
now := time.Now() now := r.clock.Now()
res, err := currentStatsCmd.Run(context.Background(), r.client, []string{ res, err := currentStatsCmd.Run(context.Background(), r.client, []string{
base.PendingKey(qname), base.PendingKey(qname),
base.ActiveKey(qname), base.ActiveKey(qname),
@ -316,7 +316,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
const day = 24 * time.Hour const day = 24 * time.Hour
now := time.Now().UTC() now := r.clock.Now().UTC()
var days []time.Time var days []time.Time
var keys []string var keys []string
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
@ -433,7 +433,7 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
keys := []string{base.TaskKey(qname, id)} keys := []string{base.TaskKey(qname, id)}
argv := []interface{}{ argv := []interface{}{
id, id,
time.Now().Unix(), r.clock.Now().Unix(),
base.QueueKeyPrefix(qname), base.QueueKeyPrefix(qname),
} }
res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result() res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result()
@ -594,7 +594,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
} }
var nextProcessAt time.Time var nextProcessAt time.Time
if state == base.TaskStatePending { if state == base.TaskStatePending {
nextProcessAt = time.Now() nextProcessAt = r.clock.Now()
} }
infos = append(infos, &base.TaskInfo{ infos = append(infos, &base.TaskInfo{
Message: m, Message: m,
@ -999,7 +999,7 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
base.PendingKey(qname), base.PendingKey(qname),
base.ArchivedKey(qname), base.ArchivedKey(qname),
} }
now := time.Now() now := r.clock.Now()
argv := []interface{}{ argv := []interface{}{
now.Unix(), now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
@ -1079,7 +1079,7 @@ func (r *RDB) ArchiveTask(qname, id string) error {
base.TaskKey(qname, id), base.TaskKey(qname, id),
base.ArchivedKey(qname), base.ArchivedKey(qname),
} }
now := time.Now() now := r.clock.Now()
argv := []interface{}{ argv := []interface{}{
id, id,
now.Unix(), now.Unix(),
@ -1144,7 +1144,7 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
src, src,
dst, dst,
} }
now := time.Now() now := r.clock.Now()
argv := []interface{}{ argv := []interface{}{
now.Unix(), now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
@ -1550,7 +1550,7 @@ return keys`)
// ListServers returns the list of server info. // ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) { func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
now := time.Now() now := r.clock.Now()
res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result() res, err := listServerKeysCmd.Run(context.Background(), r.client, []string{base.AllServers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -1584,7 +1584,7 @@ return keys`)
// ListWorkers returns the list of worker stats. // ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
var op errors.Op = "rdb.ListWorkers" var op errors.Op = "rdb.ListWorkers"
now := time.Now() now := r.clock.Now()
res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result() res, err := listWorkersCmd.Run(context.Background(), r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, errors.E(op, errors.Unknown, err) return nil, errors.E(op, errors.Unknown, err)
@ -1619,7 +1619,7 @@ return keys`)
// ListSchedulerEntries returns the list of scheduler entries. // ListSchedulerEntries returns the list of scheduler entries.
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
now := time.Now() now := r.clock.Now()
res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result() res, err := listSchedulerKeysCmd.Run(context.Background(), r.client, []string{base.AllSchedulers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -1670,7 +1670,7 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas
// Pause pauses processing of tasks from the given queue. // Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error { func (r *RDB) Pause(qname string) error {
key := base.PausedKey(qname) key := base.PausedKey(qname)
ok, err := r.client.SetNX(context.Background(), key, time.Now().Unix(), 0).Result() ok, err := r.client.SetNX(context.Background(), key, r.clock.Now().Unix(), 0).Result()
if err != nil { if err != nil {
return err return err
} }

View File

@ -2665,8 +2665,11 @@ func TestArchiveAllPendingTasks(t *testing.T) {
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute) now := time.Now()
t2 := time.Now().Add(1 * time.Hour) t1 := now.Add(1 * time.Minute)
t2 := now.Add(1 * time.Hour)
r.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -2690,8 +2693,8 @@ func TestArchiveAllPendingTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: time.Now().Unix()}, {Message: m2, Score: now.Unix()},
}, },
}, },
}, },
@ -2709,7 +2712,7 @@ func TestArchiveAllPendingTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: t2.Unix()}, {Message: m2, Score: t2.Unix()},
}, },
}, },
@ -2754,8 +2757,8 @@ func TestArchiveAllPendingTasks(t *testing.T) {
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
{Message: m3, Score: time.Now().Unix()}, {Message: m3, Score: now.Unix()},
{Message: m4, Score: time.Now().Unix()}, {Message: m4, Score: now.Unix()},
}, },
}, },
}, },
@ -2783,7 +2786,7 @@ func TestArchiveAllPendingTasks(t *testing.T) {
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
gotArchived := h.GetArchivedEntries(t, r.client, qname) gotArchived := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff) base.ArchivedKey(qname), diff)
} }
@ -2797,10 +2800,13 @@ func TestArchiveAllRetryTasks(t *testing.T) {
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute) now := time.Now()
t2 := time.Now().Add(1 * time.Hour) t1 := now.Add(1 * time.Minute)
t3 := time.Now().Add(2 * time.Hour) t2 := now.Add(1 * time.Hour)
t4 := time.Now().Add(3 * time.Hour) t3 := now.Add(2 * time.Hour)
t4 := now.Add(3 * time.Hour)
r.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -2827,8 +2833,8 @@ func TestArchiveAllRetryTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: time.Now().Unix()}, {Message: m2, Score: now.Unix()},
}, },
}, },
}, },
@ -2846,7 +2852,7 @@ func TestArchiveAllRetryTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: t2.Unix()}, {Message: m2, Score: t2.Unix()},
}, },
}, },
@ -2900,8 +2906,8 @@ func TestArchiveAllRetryTasks(t *testing.T) {
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
{Message: m3, Score: time.Now().Unix()}, {Message: m3, Score: now.Unix()},
{Message: m4, Score: time.Now().Unix()}, {Message: m4, Score: now.Unix()},
}, },
}, },
}, },
@ -2921,7 +2927,7 @@ func TestArchiveAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantRetry { for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, qname) gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryKey(qname), diff) base.RetryKey(qname), diff)
} }
@ -2929,7 +2935,7 @@ func TestArchiveAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
gotArchived := h.GetArchivedEntries(t, r.client, qname) gotArchived := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff) base.ArchivedKey(qname), diff)
} }
@ -2944,10 +2950,13 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(time.Minute) now := time.Now()
t2 := time.Now().Add(time.Hour) t1 := now.Add(time.Minute)
t3 := time.Now().Add(time.Hour) t2 := now.Add(time.Hour)
t4 := time.Now().Add(time.Hour) t3 := now.Add(time.Hour)
t4 := now.Add(time.Hour)
r.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -2974,8 +2983,8 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: time.Now().Unix()}, {Message: m2, Score: now.Unix()},
}, },
}, },
}, },
@ -2993,7 +3002,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": { "default": {
{Message: m1, Score: time.Now().Unix()}, {Message: m1, Score: now.Unix()},
{Message: m2, Score: t2.Unix()}, {Message: m2, Score: t2.Unix()},
}, },
}, },
@ -3047,8 +3056,8 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
{Message: m3, Score: time.Now().Unix()}, {Message: m3, Score: now.Unix()},
{Message: m4, Score: time.Now().Unix()}, {Message: m4, Score: now.Unix()},
}, },
}, },
}, },
@ -3068,7 +3077,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantScheduled { for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname) gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledKey(qname), diff) base.ScheduledKey(qname), diff)
} }
@ -3076,7 +3085,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantArchived { for qname, want := range tc.wantArchived {
gotArchived := h.GetArchivedEntries(t, r.client, qname) gotArchived := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff) base.ArchivedKey(qname), diff)
} }

View File

@ -313,6 +313,7 @@ func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "send_email", Type: "send_email",
@ -432,7 +433,7 @@ func TestDequeue(t *testing.T) {
tc.args, gotMsg, tc.wantMsg) tc.args, gotMsg, tc.wantMsg)
continue continue
} }
if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { if !cmp.Equal(gotDeadline, tc.wantDeadline) {
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v",
tc.args, gotDeadline, tc.wantDeadline) tc.args, gotDeadline, tc.wantDeadline)
continue continue
@ -452,8 +453,7 @@ func TestDequeue(t *testing.T) {
} }
for queue, want := range tc.wantDeadlines { for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
cmpOpts := []cmp.Option{h.SortZSetEntryOpt, h.EquateInt64Approx(2)} // allow up to 2 second margin in Score if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
if diff := cmp.Diff(want, gotDeadlines, cmpOpts...); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff)
} }
} }
@ -1451,6 +1451,7 @@ func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "send_email", Type: "send_email",
@ -1494,7 +1495,7 @@ func TestRetry(t *testing.T) {
errMsg string errMsg string
wantActive map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z wantDeadlines map[string][]base.Z
getWantRetry func(failedAt time.Time) map[string][]base.Z wantRetry map[string][]base.Z
}{ }{
{ {
active: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
@ -1515,13 +1516,11 @@ func TestRetry(t *testing.T) {
wantDeadlines: map[string][]base.Z{ wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}}, "default": {{Message: t2, Score: t2Deadline}},
}, },
getWantRetry: func(failedAt time.Time) map[string][]base.Z { wantRetry: map[string][]base.Z{
return map[string][]base.Z{
"default": { "default": {
{Message: h.TaskMessageAfterRetry(*t1, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, {Message: h.TaskMessageAfterRetry(*t1, errMsg, now), Score: now.Add(5 * time.Minute).Unix()},
{Message: t3, Score: now.Add(time.Minute).Unix()}, {Message: t3, Score: now.Add(time.Minute).Unix()},
}, },
}
}, },
}, },
{ {
@ -1548,13 +1547,11 @@ func TestRetry(t *testing.T) {
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
"custom": {}, "custom": {},
}, },
getWantRetry: func(failedAt time.Time) map[string][]base.Z { wantRetry: map[string][]base.Z{
return map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
{Message: h.TaskMessageAfterRetry(*t4, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, {Message: h.TaskMessageAfterRetry(*t4, errMsg, now), Score: now.Add(5 * time.Minute).Unix()},
}, },
}
}, },
}, },
} }
@ -1565,7 +1562,6 @@ func TestRetry(t *testing.T) {
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
callTime := time.Now() // time when method was called
err := r.Retry(tc.msg, tc.processAt, tc.errMsg, true /*isFailure*/) err := r.Retry(tc.msg, tc.processAt, tc.errMsg, true /*isFailure*/)
if err != nil { if err != nil {
t.Errorf("(*RDB).Retry = %v, want nil", err) t.Errorf("(*RDB).Retry = %v, want nil", err)
@ -1584,14 +1580,9 @@ func TestRetry(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
} }
} }
cmpOpts := []cmp.Option{ for queue, want := range tc.wantRetry {
h.SortZSetEntryOpt,
cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field
}
wantRetry := tc.getWantRetry(callTime)
for queue, want := range wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, queue) gotRetry := h.GetRetryEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff)
} }
} }
@ -1634,6 +1625,7 @@ func TestRetryWithNonFailureError(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "send_email", Type: "send_email",
@ -1677,7 +1669,7 @@ func TestRetryWithNonFailureError(t *testing.T) {
errMsg string errMsg string
wantActive map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z wantDeadlines map[string][]base.Z
getWantRetry func(failedAt time.Time) map[string][]base.Z wantRetry map[string][]base.Z
}{ }{
{ {
active: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
@ -1698,14 +1690,12 @@ func TestRetryWithNonFailureError(t *testing.T) {
wantDeadlines: map[string][]base.Z{ wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}}, "default": {{Message: t2, Score: t2Deadline}},
}, },
getWantRetry: func(failedAt time.Time) map[string][]base.Z { wantRetry: map[string][]base.Z{
return map[string][]base.Z{
"default": { "default": {
// Task message should include the error message but without incrementing the retry count. // Task message should include the error message but without incrementing the retry count.
{Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Add(5 * time.Minute).Unix()},
{Message: t3, Score: now.Add(time.Minute).Unix()}, {Message: t3, Score: now.Add(time.Minute).Unix()},
}, },
}
}, },
}, },
{ {
@ -1732,14 +1722,12 @@ func TestRetryWithNonFailureError(t *testing.T) {
"default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}}, "default": {{Message: t1, Score: t1Deadline}, {Message: t2, Score: t2Deadline}},
"custom": {}, "custom": {},
}, },
getWantRetry: func(failedAt time.Time) map[string][]base.Z { wantRetry: map[string][]base.Z{
return map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
// Task message should include the error message but without incrementing the retry count. // Task message should include the error message but without incrementing the retry count.
{Message: h.TaskMessageWithError(*t4, errMsg, failedAt), Score: now.Add(5 * time.Minute).Unix()}, {Message: h.TaskMessageWithError(*t4, errMsg, now), Score: now.Add(5 * time.Minute).Unix()},
}, },
}
}, },
}, },
} }
@ -1750,7 +1738,6 @@ func TestRetryWithNonFailureError(t *testing.T) {
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
callTime := time.Now() // time when method was called
err := r.Retry(tc.msg, tc.processAt, tc.errMsg, false /*isFailure*/) err := r.Retry(tc.msg, tc.processAt, tc.errMsg, false /*isFailure*/)
if err != nil { if err != nil {
t.Errorf("(*RDB).Retry = %v, want nil", err) t.Errorf("(*RDB).Retry = %v, want nil", err)
@ -1769,14 +1756,9 @@ func TestRetryWithNonFailureError(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
} }
} }
cmpOpts := []cmp.Option{ for queue, want := range tc.wantRetry {
h.SortZSetEntryOpt,
cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field
}
wantRetry := tc.getWantRetry(callTime)
for queue, want := range wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, queue) gotRetry := h.GetRetryEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff)
} }
} }
@ -1813,6 +1795,7 @@ func TestArchive(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "send_email", Type: "send_email",
@ -1863,7 +1846,7 @@ func TestArchive(t *testing.T) {
target *base.TaskMessage // task to archive target *base.TaskMessage // task to archive
wantActive map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z wantDeadlines map[string][]base.Z
getWantArchived func(failedAt time.Time) map[string][]base.Z wantArchived map[string][]base.Z
}{ }{
{ {
active: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
@ -1887,13 +1870,11 @@ func TestArchive(t *testing.T) {
wantDeadlines: map[string][]base.Z{ wantDeadlines: map[string][]base.Z{
"default": {{Message: t2, Score: t2Deadline}}, "default": {{Message: t2, Score: t2Deadline}},
}, },
getWantArchived: func(failedAt time.Time) map[string][]base.Z { wantArchived: map[string][]base.Z{
return map[string][]base.Z{
"default": { "default": {
{Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
{Message: t3, Score: now.Add(-time.Hour).Unix()}, {Message: t3, Score: now.Add(-time.Hour).Unix()},
}, },
}
}, },
}, },
{ {
@ -1920,12 +1901,10 @@ func TestArchive(t *testing.T) {
{Message: t3, Score: t3Deadline}, {Message: t3, Score: t3Deadline},
}, },
}, },
getWantArchived: func(failedAt time.Time) map[string][]base.Z { wantArchived: map[string][]base.Z{
return map[string][]base.Z{
"default": { "default": {
{Message: h.TaskMessageWithError(*t1, errMsg, failedAt), Score: failedAt.Unix()}, {Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
}, },
}
}, },
}, },
{ {
@ -1954,13 +1933,11 @@ func TestArchive(t *testing.T) {
"default": {{Message: t1, Score: t1Deadline}}, "default": {{Message: t1, Score: t1Deadline}},
"custom": {}, "custom": {},
}, },
getWantArchived: func(failedAt time.Time) map[string][]base.Z { wantArchived: map[string][]base.Z{
return map[string][]base.Z{
"default": {}, "default": {},
"custom": { "custom": {
{Message: h.TaskMessageWithError(*t4, errMsg, failedAt), Score: failedAt.Unix()}, {Message: h.TaskMessageWithError(*t4, errMsg, now), Score: now.Unix()},
}, },
}
}, },
}, },
} }
@ -1971,7 +1948,6 @@ func TestArchive(t *testing.T) {
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllArchivedQueues(t, r.client, tc.archived) h.SeedAllArchivedQueues(t, r.client, tc.archived)
callTime := time.Now() // record time `Archive` was called
err := r.Archive(tc.target, errMsg) err := r.Archive(tc.target, errMsg)
if err != nil { if err != nil {
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err) t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err)
@ -1990,7 +1966,7 @@ func TestArchive(t *testing.T) {
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff) t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
} }
} }
for queue, want := range tc.getWantArchived(callTime) { for queue, want := range tc.wantArchived {
gotArchived := h.GetArchivedEntries(t, r.client, queue) gotArchived := h.GetArchivedEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff) t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)