diff --git a/asynq.go b/asynq.go index 696cc4e..aa5ec84 100644 --- a/asynq.go +++ b/asynq.go @@ -5,7 +5,7 @@ import "github.com/go-redis/redis/v7" /* TODOs: - [P0] asynqmon kill , asynqmon killall -- [P0] Test refactor - helpers to initialize queues and read queue contents +- [P0] Assigning int or any number type to Payload will be converted to float64 in handler - [P0] Redis Memory Usage, Connection info in stats - [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go new file mode 100644 index 0000000..5f39bc7 --- /dev/null +++ b/internal/rdb/helpers_test.go @@ -0,0 +1,147 @@ +package rdb + +import ( + "encoding/json" + "math/rand" + "sort" + "testing" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/go-cmp/cmp" + "github.com/rs/xid" +) + +// This file defines test helpers for the rdb package testing. + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// TODO(hibiken): Get Redis address and db number from ENV variables. +func setup(t *testing.T) *RDB { + t.Helper() + r := NewRDB(redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 13, + })) + // Start each test with a clean slate. + flushDB(t, r) + return r +} + +func flushDB(t *testing.T, r *RDB) { + t.Helper() + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } +} + +var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage { + out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out +}) + +func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { + return &TaskMessage{ + ID: xid.New(), + Type: taskType, + Queue: "default", + Retry: 25, + Payload: payload, + } +} + +func mustMarshal(t *testing.T, msg *TaskMessage) string { + t.Helper() + data, err := json.Marshal(msg) + if err != nil { + t.Fatal(err) + } + return string(data) +} + +func mustUnmarshal(t *testing.T, data string) *TaskMessage { + t.Helper() + var msg TaskMessage + err := json.Unmarshal([]byte(data), &msg) + if err != nil { + t.Fatal(err) + } + return &msg +} + +func mustMarshalSlice(t *testing.T, msgs []*TaskMessage) []string { + t.Helper() + var data []string + for _, m := range msgs { + data = append(data, mustMarshal(t, m)) + } + return data +} + +func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage { + t.Helper() + var msgs []*TaskMessage + for _, s := range data { + msgs = append(msgs, mustUnmarshal(t, s)) + } + return msgs +} + +func seedRedisList(t *testing.T, c *redis.Client, key string, msgs []*TaskMessage) { + data := mustMarshalSlice(t, msgs) + for _, s := range data { + if err := c.LPush(key, s).Err(); err != nil { + t.Fatal(err) + } + } +} + +func seedRedisZSet(t *testing.T, c *redis.Client, key string, items []sortedSetEntry) { + for _, item := range items { + z := &redis.Z{Member: mustMarshal(t, item.msg), Score: float64(item.score)} + if err := c.ZAdd(key, z).Err(); err != nil { + t.Fatal(err) + } + } +} + +// scheduledEntry represents an item in redis sorted set (aka ZSET). +type sortedSetEntry struct { + msg *TaskMessage + score int64 +} + +// seedDefaultQueue initializes the default queue with the given messages. +func seedDefaultQueue(t *testing.T, r *RDB, msgs []*TaskMessage) { + t.Helper() + seedRedisList(t, r.client, defaultQ, msgs) +} + +// seedInProgressQueue initializes the in-progress queue with the given messages. +func seedInProgressQueue(t *testing.T, r *RDB, msgs []*TaskMessage) { + t.Helper() + seedRedisList(t, r.client, inProgressQ, msgs) +} + +// seedScheduledQueue initializes the scheduled queue with the given messages. +func seedScheduledQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { + t.Helper() + seedRedisZSet(t, r.client, scheduledQ, entries) +} + +// seedRetryQueue initializes the retry queue with the given messages. +func seedRetryQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { + t.Helper() + seedRedisZSet(t, r.client, retryQ, entries) +} + +// seedDeadQueue initializes the dead queue with the given messages. +func seedDeadQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { + t.Helper() + seedRedisZSet(t, r.client, deadQ, entries) +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 1cdef27..8725374 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/rs/xid" ) @@ -58,17 +57,19 @@ func TestCurrentStats(t *testing.T) { tests := []struct { enqueued []*TaskMessage inProgress []*TaskMessage - scheduled []*TaskMessage - retry []*TaskMessage - dead []*TaskMessage + scheduled []sortedSetEntry + retry []sortedSetEntry + dead []sortedSetEntry want *Stats }{ { enqueued: []*TaskMessage{m1}, inProgress: []*TaskMessage{m2}, - scheduled: []*TaskMessage{m3, m4}, - retry: []*TaskMessage{}, - dead: []*TaskMessage{}, + scheduled: []sortedSetEntry{ + {m3, time.Now().Add(time.Hour).Unix()}, + {m4, time.Now().Unix()}}, + retry: []sortedSetEntry{}, + dead: []sortedSetEntry{}, want: &Stats{ Enqueued: 1, InProgress: 1, @@ -81,9 +82,13 @@ func TestCurrentStats(t *testing.T) { { enqueued: []*TaskMessage{}, inProgress: []*TaskMessage{}, - scheduled: []*TaskMessage{m3, m4}, - retry: []*TaskMessage{m1}, - dead: []*TaskMessage{m2}, + scheduled: []sortedSetEntry{ + {m3, time.Now().Unix()}, + {m4, time.Now().Unix()}}, + retry: []sortedSetEntry{ + {m1, time.Now().Add(time.Minute).Unix()}}, + dead: []sortedSetEntry{ + {m2, time.Now().Add(-time.Hour).Unix()}}, want: &Stats{ Enqueued: 0, InProgress: 0, @@ -96,36 +101,12 @@ func TestCurrentStats(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the queues. - for _, msg := range tc.enqueued { - if err := r.Enqueue(msg); err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.inProgress { - if err := r.client.LPush(inProgressQ, mustMarshal(t, msg)).Err(); err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.scheduled { - if err := r.Schedule(msg, time.Now().Add(time.Hour)); err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.retry { - if err := r.RetryLater(msg, time.Now().Add(time.Hour)); err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.dead { - if err := r.Kill(msg); err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDefaultQueue(t, r, tc.enqueued) + seedInProgressQueue(t, r, tc.inProgress) + seedScheduledQueue(t, r, tc.scheduled) + seedRetryQueue(t, r, tc.retry) + seedDeadQueue(t, r, tc.dead) got, err := r.CurrentStats() if err != nil { @@ -163,16 +144,9 @@ func TestListEnqueued(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the list - for _, msg := range tc.enqueued { - if err := r.Enqueue(msg); err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDefaultQueue(t, r, tc.enqueued) + got, err := r.ListEnqueued() if err != nil { t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want) @@ -200,30 +174,23 @@ func TestListInProgress(t *testing.T) { t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} tests := []struct { - enqueued []*TaskMessage - want []*InProgressTask + inProgress []*TaskMessage + want []*InProgressTask }{ { - enqueued: []*TaskMessage{m1, m2}, - want: []*InProgressTask{t1, t2}, + inProgress: []*TaskMessage{m1, m2}, + want: []*InProgressTask{t1, t2}, }, { - enqueued: []*TaskMessage{}, - want: []*InProgressTask{}, + inProgress: []*TaskMessage{}, + want: []*InProgressTask{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the list - for _, msg := range tc.enqueued { - if err := r.client.LPush(inProgressQ, mustMarshal(t, msg)).Err(); err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedInProgressQueue(t, r, tc.inProgress) + got, err := r.ListInProgress() if err != nil { t.Errorf("r.ListInProgress() = %v, %v, want %v, nil", got, err, tc.want) @@ -252,40 +219,26 @@ func TestListScheduled(t *testing.T) { t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()} t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()} - type scheduledEntry struct { - msg *TaskMessage - processAt time.Time - } - tests := []struct { - scheduled []scheduledEntry + scheduled []sortedSetEntry want []*ScheduledTask }{ { - scheduled: []scheduledEntry{ - {m1, p1}, - {m2, p2}, + scheduled: []sortedSetEntry{ + {m1, p1.Unix()}, + {m2, p2.Unix()}, }, want: []*ScheduledTask{t1, t2}, }, { - scheduled: []scheduledEntry{}, + scheduled: []sortedSetEntry{}, want: []*ScheduledTask{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the scheduled queue - for _, s := range tc.scheduled { - err := r.Schedule(s.msg, s.processAt) - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) got, err := r.ListScheduled() if err != nil { @@ -329,47 +282,47 @@ func TestListRetry(t *testing.T) { } p1 := time.Now().Add(5 * time.Minute) p2 := time.Now().Add(24 * time.Hour) - t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, - ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, - Retry: m1.Retry, Score: p1.Unix()} - t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, - ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, - Retry: m2.Retry, Score: p2.Unix()} - - type retryEntry struct { - msg *TaskMessage - processAt time.Time + t1 := &RetryTask{ + ID: m1.ID, + Type: m1.Type, + Payload: m1.Payload, + ProcessAt: p1, + ErrorMsg: m1.ErrorMsg, + Retried: m1.Retried, + Retry: m1.Retry, + Score: p1.Unix(), + } + t2 := &RetryTask{ + ID: m2.ID, + Type: m2.Type, + Payload: m2.Payload, + ProcessAt: p2, + ErrorMsg: m2.ErrorMsg, + Retried: m2.Retried, + Retry: m2.Retry, + Score: p2.Unix(), } tests := []struct { - dead []retryEntry - want []*RetryTask + retry []sortedSetEntry + want []*RetryTask }{ { - dead: []retryEntry{ - {m1, p1}, - {m2, p2}, + retry: []sortedSetEntry{ + {m1, p1.Unix()}, + {m2, p2.Unix()}, }, want: []*RetryTask{t1, t2}, }, { - dead: []retryEntry{}, - want: []*RetryTask{}, + retry: []sortedSetEntry{}, + want: []*RetryTask{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the scheduled queue - for _, d := range tc.dead { - r.client.ZAdd(retryQ, &redis.Z{ - Member: mustMarshal(t, d.msg), - Score: float64(d.processAt.Unix()), - }) - } + flushDB(t, r) // clean up db before each test case + seedRetryQueue(t, r, tc.retry) got, err := r.ListRetry() if err != nil { @@ -409,45 +362,43 @@ func TestListDead(t *testing.T) { } f1 := time.Now().Add(-5 * time.Minute) f2 := time.Now().Add(-24 * time.Hour) - t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, - LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix()} - t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, - LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix()} - - type deadEntry struct { - msg *TaskMessage - lastFailedAt time.Time + t1 := &DeadTask{ + ID: m1.ID, + Type: m1.Type, + Payload: m1.Payload, + LastFailedAt: f1, + ErrorMsg: m1.ErrorMsg, + Score: f1.Unix(), + } + t2 := &DeadTask{ + ID: m2.ID, + Type: m2.Type, + Payload: m2.Payload, + LastFailedAt: f2, + ErrorMsg: m2.ErrorMsg, + Score: f2.Unix(), } tests := []struct { - dead []deadEntry + dead []sortedSetEntry want []*DeadTask }{ { - dead: []deadEntry{ - {m1, f1}, - {m2, f2}, + dead: []sortedSetEntry{ + {m1, f1.Unix()}, + {m2, f2.Unix()}, }, want: []*DeadTask{t1, t2}, }, { - dead: []deadEntry{}, + dead: []sortedSetEntry{}, want: []*DeadTask{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize the scheduled queue - for _, d := range tc.dead { - r.client.ZAdd(deadQ, &redis.Z{ - Member: mustMarshal(t, d.msg), - Score: float64(d.lastFailedAt.Unix()), - }) - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) got, err := r.ListDead() if err != nil { @@ -473,17 +424,13 @@ var timeCmpOpt = EquateApproxTime(time.Second) func TestEnqueueDeadTask(t *testing.T) { r := setup(t) - t1 := newTaskMessage("send_email", nil) t2 := newTaskMessage("gen_thumbnail", nil) s1 := time.Now().Add(-5 * time.Minute).Unix() s2 := time.Now().Add(-time.Hour).Unix() - type deadEntry struct { - msg *TaskMessage - score int64 - } + tests := []struct { - dead []deadEntry + dead []sortedSetEntry score int64 id xid.ID want error // expected return value from calling EnqueueDeadTask @@ -491,7 +438,7 @@ func TestEnqueueDeadTask(t *testing.T) { wantEnqueued []*TaskMessage }{ { - dead: []deadEntry{ + dead: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -502,7 +449,7 @@ func TestEnqueueDeadTask(t *testing.T) { wantEnqueued: []*TaskMessage{t2}, }, { - dead: []deadEntry{ + dead: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -515,30 +462,19 @@ func TestEnqueueDeadTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize dead queue - for _, d := range tc.dead { - err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) got := r.EnqueueDeadTask(tc.id, tc.score) if got != tc.want { t.Errorf("r.EnqueueDeadTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) } - gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { @@ -554,12 +490,8 @@ func TestEnqueueRetryTask(t *testing.T) { t2 := newTaskMessage("gen_thumbnail", nil) s1 := time.Now().Add(-5 * time.Minute).Unix() s2 := time.Now().Add(-time.Hour).Unix() - type retryEntry struct { - msg *TaskMessage - score int64 - } tests := []struct { - dead []retryEntry + retry []sortedSetEntry score int64 id xid.ID want error // expected return value from calling EnqueueRetryTask @@ -567,7 +499,7 @@ func TestEnqueueRetryTask(t *testing.T) { wantEnqueued []*TaskMessage }{ { - dead: []retryEntry{ + retry: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -578,7 +510,7 @@ func TestEnqueueRetryTask(t *testing.T) { wantEnqueued: []*TaskMessage{t2}, }, { - dead: []retryEntry{ + retry: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -591,30 +523,20 @@ func TestEnqueueRetryTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize retry queue - for _, d := range tc.dead { - err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + + seedRetryQueue(t, r, tc.retry) // initialize retry queue got := r.EnqueueRetryTask(tc.id, tc.score) if got != tc.want { t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) } - gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { @@ -625,17 +547,13 @@ func TestEnqueueRetryTask(t *testing.T) { func TestEnqueueScheduledTask(t *testing.T) { r := setup(t) - t1 := newTaskMessage("send_email", nil) t2 := newTaskMessage("gen_thumbnail", nil) s1 := time.Now().Add(-5 * time.Minute).Unix() s2 := time.Now().Add(-time.Hour).Unix() - type scheduledEntry struct { - msg *TaskMessage - score int64 - } + tests := []struct { - dead []scheduledEntry + scheduled []sortedSetEntry score int64 id xid.ID want error // expected return value from calling EnqueueScheduledTask @@ -643,7 +561,7 @@ func TestEnqueueScheduledTask(t *testing.T) { wantEnqueued []*TaskMessage }{ { - dead: []scheduledEntry{ + scheduled: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -654,7 +572,7 @@ func TestEnqueueScheduledTask(t *testing.T) { wantEnqueued: []*TaskMessage{t2}, }, { - dead: []scheduledEntry{ + scheduled: []sortedSetEntry{ {t1, s1}, {t2, s2}, }, @@ -667,30 +585,19 @@ func TestEnqueueScheduledTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize scheduled queue - for _, d := range tc.dead { - err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) got := r.EnqueueScheduledTask(tc.id, tc.score) if got != tc.want { t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) } - gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { @@ -706,56 +613,49 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { t3 := newTaskMessage("reindex", nil) tests := []struct { - description string - scheduled []*TaskMessage + desc string + scheduled []sortedSetEntry want int64 wantEnqueued []*TaskMessage }{ { - description: "with tasks in scheduled queue", - scheduled: []*TaskMessage{t1, t2, t3}, + desc: "with tasks in scheduled queue", + scheduled: []sortedSetEntry{ + {t1, time.Now().Add(time.Hour).Unix()}, + {t2, time.Now().Add(time.Hour).Unix()}, + {t3, time.Now().Add(time.Hour).Unix()}, + }, want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { - description: "with empty scheduled queue", - scheduled: []*TaskMessage{}, + desc: "with empty scheduled queue", + scheduled: []sortedSetEntry{}, want: 0, wantEnqueued: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize scheduled queue - for _, msg := range tc.scheduled { - err := r.client.ZAdd(scheduledQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Add(time.Hour).Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) got, err := r.EnqueueAllScheduledTasks() if err != nil { t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", - tc.description, got, err, tc.want) + tc.desc, got, err, tc.want) continue } if got != tc.want { t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", - tc.description, got, err, tc.want) + tc.desc, got, err, tc.want) } gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, defaultQ, diff) } } } @@ -768,38 +668,31 @@ func TestEnqueueAllRetryTasks(t *testing.T) { tests := []struct { description string - retry []*TaskMessage + retry []sortedSetEntry want int64 wantEnqueued []*TaskMessage }{ { - description: "with tasks in retry queue", - retry: []*TaskMessage{t1, t2, t3}, + description: "with tasks in retry queue", + retry: []sortedSetEntry{ + {t1, time.Now().Add(time.Hour).Unix()}, + {t2, time.Now().Add(time.Hour).Unix()}, + {t3, time.Now().Add(time.Hour).Unix()}, + }, want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { description: "with empty retry queue", - retry: []*TaskMessage{}, + retry: []sortedSetEntry{}, want: 0, wantEnqueued: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize retry queue - for _, msg := range tc.retry { - err := r.client.ZAdd(retryQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Add(time.Hour).Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedRetryQueue(t, r, tc.retry) got, err := r.EnqueueAllRetryTasks() if err != nil { @@ -828,56 +721,49 @@ func TestEnqueueAllDeadTasks(t *testing.T) { t3 := newTaskMessage("reindex", nil) tests := []struct { - description string - dead []*TaskMessage + desc string + dead []sortedSetEntry want int64 wantEnqueued []*TaskMessage }{ { - description: "with tasks in dead queue", - dead: []*TaskMessage{t1, t2, t3}, + desc: "with tasks in dead queue", + dead: []sortedSetEntry{ + {t1, time.Now().Add(-time.Minute).Unix()}, + {t2, time.Now().Add(-time.Minute).Unix()}, + {t3, time.Now().Add(-time.Minute).Unix()}, + }, want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { - description: "with empty dead queue", - dead: []*TaskMessage{}, + desc: "with empty dead queue", + dead: []sortedSetEntry{}, want: 0, wantEnqueued: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize dead queue - for _, msg := range tc.dead { - err := r.client.ZAdd(deadQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Add(time.Hour).Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) got, err := r.EnqueueAllDeadTasks() if err != nil { t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", - tc.description, got, err, tc.want) + tc.desc, got, err, tc.want) continue } if got != tc.want { t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", - tc.description, got, err, tc.want) + tc.desc, got, err, tc.want) } gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, defaultQ, diff) } } } @@ -889,19 +775,15 @@ func TestDeleteDeadTask(t *testing.T) { t1 := time.Now().Add(-5 * time.Minute) t2 := time.Now().Add(-time.Hour) - type deadEntry struct { - msg *TaskMessage - score int64 - } tests := []struct { - dead []deadEntry + dead []sortedSetEntry id xid.ID score int64 want error wantDead []*TaskMessage }{ { - dead: []deadEntry{ + dead: []sortedSetEntry{ {m1, t1.Unix()}, {m2, t2.Unix()}, }, @@ -911,7 +793,7 @@ func TestDeleteDeadTask(t *testing.T) { wantDead: []*TaskMessage{m2}, }, { - dead: []deadEntry{ + dead: []sortedSetEntry{ {m1, t1.Unix()}, {m2, t2.Unix()}, }, @@ -921,7 +803,7 @@ func TestDeleteDeadTask(t *testing.T) { wantDead: []*TaskMessage{m1, m2}, }, { - dead: []deadEntry{}, + dead: []sortedSetEntry{}, id: m1.ID, score: t1.Unix(), want: ErrTaskNotFound, @@ -930,20 +812,8 @@ func TestDeleteDeadTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize dead queue. - for _, d := range tc.dead { - err := r.client.ZAdd(deadQ, &redis.Z{ - Member: mustMarshal(t, d.msg), - Score: float64(d.score), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) got := r.DeleteDeadTask(tc.id, tc.score) if got != tc.want { @@ -966,19 +836,15 @@ func TestDeleteRetryTask(t *testing.T) { t1 := time.Now().Add(5 * time.Minute) t2 := time.Now().Add(time.Hour) - type retryEntry struct { - msg *TaskMessage - score int64 - } tests := []struct { - retry []retryEntry + retry []sortedSetEntry id xid.ID score int64 want error wantRetry []*TaskMessage }{ { - retry: []retryEntry{ + retry: []sortedSetEntry{ {m1, t1.Unix()}, {m2, t2.Unix()}, }, @@ -988,7 +854,7 @@ func TestDeleteRetryTask(t *testing.T) { wantRetry: []*TaskMessage{m2}, }, { - retry: []retryEntry{ + retry: []sortedSetEntry{ {m1, t1.Unix()}, }, id: m2.ID, @@ -999,20 +865,8 @@ func TestDeleteRetryTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize retry queue. - for _, e := range tc.retry { - err := r.client.ZAdd(retryQ, &redis.Z{ - Member: mustMarshal(t, e.msg), - Score: float64(e.score), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedRetryQueue(t, r, tc.retry) got := r.DeleteRetryTask(tc.id, tc.score) if got != tc.want { @@ -1035,19 +889,15 @@ func TestDeleteScheduledTask(t *testing.T) { t1 := time.Now().Add(5 * time.Minute) t2 := time.Now().Add(time.Hour) - type scheduledEntry struct { - msg *TaskMessage - score int64 - } tests := []struct { - scheduled []scheduledEntry + scheduled []sortedSetEntry id xid.ID score int64 want error wantScheduled []*TaskMessage }{ { - scheduled: []scheduledEntry{ + scheduled: []sortedSetEntry{ {m1, t1.Unix()}, {m2, t2.Unix()}, }, @@ -1057,7 +907,7 @@ func TestDeleteScheduledTask(t *testing.T) { wantScheduled: []*TaskMessage{m2}, }, { - scheduled: []scheduledEntry{ + scheduled: []sortedSetEntry{ {m1, t1.Unix()}, }, id: m2.ID, @@ -1068,20 +918,8 @@ func TestDeleteScheduledTask(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize scheduled queue. - for _, e := range tc.scheduled { - err := r.client.ZAdd(scheduledQ, &redis.Z{ - Member: mustMarshal(t, e.msg), - Score: float64(e.score), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) got := r.DeleteScheduledTask(tc.id, tc.score) if got != tc.want { @@ -1104,30 +942,22 @@ func TestDeleteAllDeadTasks(t *testing.T) { m3 := newTaskMessage("gen_thumbnail", nil) tests := []struct { - initDead []*TaskMessage + dead []sortedSetEntry wantDead []*TaskMessage }{ { - initDead: []*TaskMessage{m1, m2, m3}, + dead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, time.Now().Unix()}, + {m3, time.Now().Unix()}, + }, wantDead: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize dead queue. - for _, msg := range tc.initDead { - err := r.client.ZAdd(deadQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) err := r.DeleteAllDeadTasks() if err != nil { @@ -1149,30 +979,22 @@ func TestDeleteAllRetryTasks(t *testing.T) { m3 := newTaskMessage("gen_thumbnail", nil) tests := []struct { - initRetry []*TaskMessage + retry []sortedSetEntry wantRetry []*TaskMessage }{ { - initRetry: []*TaskMessage{m1, m2, m3}, + retry: []sortedSetEntry{ + {m1, time.Now().Unix()}, + {m2, time.Now().Unix()}, + {m3, time.Now().Unix()}, + }, wantRetry: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize retry queue. - for _, msg := range tc.initRetry { - err := r.client.ZAdd(retryQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedRetryQueue(t, r, tc.retry) err := r.DeleteAllRetryTasks() if err != nil { @@ -1194,30 +1016,22 @@ func TestDeleteAllScheduledTasks(t *testing.T) { m3 := newTaskMessage("gen_thumbnail", nil) tests := []struct { - initScheduled []*TaskMessage + scheduled []sortedSetEntry wantScheduled []*TaskMessage }{ { - initScheduled: []*TaskMessage{m1, m2, m3}, + scheduled: []sortedSetEntry{ + {m1, time.Now().Add(time.Minute).Unix()}, + {m2, time.Now().Add(time.Minute).Unix()}, + {m3, time.Now().Add(time.Minute).Unix()}, + }, wantScheduled: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize scheduled queue. - for _, msg := range tc.initScheduled { - err := r.client.ZAdd(scheduledQ, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) err := r.DeleteAllScheduledTasks() if err != nil { diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f55ab72..88791c2 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1,91 +1,13 @@ package rdb import ( - "encoding/json" "fmt" - "math/rand" - "sort" "testing" "time" - "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" - "github.com/rs/xid" ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - -// TODO(hibiken): Get Redis address and db number from ENV variables. -func setup(t *testing.T) *RDB { - t.Helper() - r := NewRDB(redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - DB: 13, - })) - // Start each test with a clean slate. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - return r -} - -var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage { - out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out -}) - -func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { - return &TaskMessage{ - ID: xid.New(), - Type: taskType, - Queue: "default", - Retry: 25, - Payload: payload, - } -} - -func mustMarshal(t *testing.T, task *TaskMessage) string { - t.Helper() - data, err := json.Marshal(task) - if err != nil { - t.Fatal(err) - } - return string(data) -} - -func mustUnmarshal(t *testing.T, data string) *TaskMessage { - t.Helper() - var task TaskMessage - err := json.Unmarshal([]byte(data), &task) - if err != nil { - t.Fatal(err) - } - return &task -} - -func mustMarshalSlice(t *testing.T, tasks []*TaskMessage) []string { - t.Helper() - var data []string - for _, task := range tasks { - data = append(data, mustMarshal(t, task)) - } - return data -} - -func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage { - t.Helper() - var tasks []*TaskMessage - for _, s := range data { - tasks = append(tasks, mustUnmarshal(t, s)) - } - return tasks -} - func TestEnqueue(t *testing.T) { r := setup(t) tests := []struct { @@ -97,21 +19,19 @@ func TestEnqueue(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } + flushDB(t, r) // clean up db before each test case. + err := r.Enqueue(tc.msg) if err != nil { - t.Error(err) + t.Errorf("(*RDB).Enqueue = %v, want nil", err) continue } res := r.client.LRange(defaultQ, 0, -1).Val() if len(res) != 1 { - t.Errorf("LIST %q has length %d, want 1", defaultQ, len(res)) + t.Errorf("%q has length %d, want 1", defaultQ, len(res)) continue } - if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" { + if diff := cmp.Diff(tc.msg, mustUnmarshal(t, res[0])); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) } } @@ -121,23 +41,19 @@ func TestDequeue(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) tests := []struct { - queued []*TaskMessage + enqueued []*TaskMessage want *TaskMessage err error inProgress int64 // length of "in-progress" tasks after dequeue }{ - {queued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1}, - {queued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0}, + {enqueued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1}, + {enqueued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0}, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, m := range tc.queued { - r.Enqueue(m) - } + flushDB(t, r) // clean up db before each test case + seedDefaultQueue(t, r, tc.enqueued) + got, err := r.Dequeue(time.Second) if !cmp.Equal(got, tc.want) || err != tc.err { t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", @@ -145,7 +61,7 @@ func TestDequeue(t *testing.T) { continue } if l := r.client.LLen(inProgressQ).Val(); l != tc.inProgress { - t.Errorf("LIST %q has length %d, want %d", inProgressQ, l, tc.inProgress) + t.Errorf("%q has length %d, want %d", inProgressQ, l, tc.inProgress) } } } @@ -156,54 +72,41 @@ func TestDone(t *testing.T) { t2 := newTaskMessage("export_csv", nil) tests := []struct { - initial []*TaskMessage // initial state of the in-progress list - target *TaskMessage // task to remove - final []*TaskMessage // final state of the in-progress list + inProgress []*TaskMessage // initial state of the in-progress list + target *TaskMessage // task to remove + wantInProgress []*TaskMessage // final state of the in-progress list }{ { - initial: []*TaskMessage{t1, t2}, - target: t1, - final: []*TaskMessage{t2}, + inProgress: []*TaskMessage{t1, t2}, + target: t1, + wantInProgress: []*TaskMessage{t2}, }, { - initial: []*TaskMessage{t2}, - target: t1, - final: []*TaskMessage{t2}, + inProgress: []*TaskMessage{t2}, + target: t1, + wantInProgress: []*TaskMessage{t2}, }, { - initial: []*TaskMessage{t1}, - target: t1, - final: []*TaskMessage{}, + inProgress: []*TaskMessage{t1}, + target: t1, + wantInProgress: []*TaskMessage{}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // set up initial state - for _, task := range tc.initial { - err := r.client.LPush(inProgressQ, mustMarshal(t, task)).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedInProgressQueue(t, r, tc.inProgress) err := r.Done(tc.target) if err != nil { - t.Error(err) + t.Errorf("(*RDB).Done(task) = %v, want nil", err) continue } - var got []*TaskMessage data := r.client.LRange(inProgressQ, 0, -1).Val() - for _, s := range data { - got = append(got, mustUnmarshal(t, s)) - } - - if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQ, diff) + gotInProgress := mustUnmarshalSlice(t, data) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", inProgressQ, diff) continue } } @@ -215,29 +118,20 @@ func TestKill(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - initial []*TaskMessage // inital state of "dead" set - target *TaskMessage // task to kill - want []*TaskMessage // final state of "dead" set + dead []sortedSetEntry // inital state of dead queue + target *TaskMessage // task to kill + wantDead []*TaskMessage // final state of dead queue }{ { - initial: []*TaskMessage{}, - target: t1, - want: []*TaskMessage{t1}, + dead: []sortedSetEntry{}, + target: t1, + wantDead: []*TaskMessage{t1}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // set up initial state - for _, task := range tc.initial { - err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } + flushDB(t, r) // clean up db before each test case + seedDeadQueue(t, r, tc.dead) err := r.Kill(tc.target) if err != nil { @@ -245,10 +139,10 @@ func TestKill(t *testing.T) { continue } - actual := r.client.ZRange(deadQ, 0, -1).Val() - got := mustUnmarshalSlice(t, actual) - if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", deadQ, diff) + data := r.client.ZRange(deadQ, 0, -1).Val() + gotDead := mustUnmarshalSlice(t, data) + if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff) continue } } @@ -261,59 +155,49 @@ func TestRestoreUnfinished(t *testing.T) { t3 := newTaskMessage("sync_stuff", nil) tests := []struct { - beforeSrc []*TaskMessage - beforeDst []*TaskMessage - afterSrc []*TaskMessage - afterDst []*TaskMessage + inProgress []*TaskMessage + enqueued []*TaskMessage + wantInProgress []*TaskMessage + wantEnqueued []*TaskMessage }{ { - beforeSrc: []*TaskMessage{t1, t2, t3}, - beforeDst: []*TaskMessage{}, - afterSrc: []*TaskMessage{}, - afterDst: []*TaskMessage{t1, t2, t3}, + inProgress: []*TaskMessage{t1, t2, t3}, + enqueued: []*TaskMessage{}, + wantInProgress: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { - beforeSrc: []*TaskMessage{}, - beforeDst: []*TaskMessage{t1, t2, t3}, - afterSrc: []*TaskMessage{}, - afterDst: []*TaskMessage{t1, t2, t3}, + inProgress: []*TaskMessage{}, + enqueued: []*TaskMessage{t1, t2, t3}, + wantInProgress: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { - beforeSrc: []*TaskMessage{t2, t3}, - beforeDst: []*TaskMessage{t1}, - afterSrc: []*TaskMessage{}, - afterDst: []*TaskMessage{t1, t2, t3}, + inProgress: []*TaskMessage{t2, t3}, + enqueued: []*TaskMessage{t1}, + wantInProgress: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, }, } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Error(err) - continue - } - // seed src list. - for _, msg := range tc.beforeSrc { - r.client.LPush(inProgressQ, mustMarshal(t, msg)) - } - // seed dst list. - for _, msg := range tc.beforeDst { - r.client.LPush(defaultQ, mustMarshal(t, msg)) - } + flushDB(t, r) // clean up db before each test case + seedInProgressQueue(t, r, tc.inProgress) + seedDefaultQueue(t, r, tc.enqueued) if err := r.RestoreUnfinished(); err != nil { t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err) continue } - src := r.client.LRange(inProgressQ, 0, -1).Val() - gotSrc := mustUnmarshalSlice(t, src) - if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" { + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgressQ, diff) } - dst := r.client.LRange(defaultQ, 0, -1).Val() - gotDst := mustUnmarshalSlice(t, dst) - if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" { + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQ, diff) } } @@ -328,38 +212,38 @@ func TestCheckAndEnqueue(t *testing.T) { hourFromNow := time.Now().Add(time.Hour) tests := []struct { - initScheduled []*redis.Z // tasks to be processed later - initRetry []*redis.Z // tasks to be retired later - wantQueued []*TaskMessage // queue after calling forward - wantScheduled []*TaskMessage // tasks in scheduled queue after calling the method - wantRetry []*TaskMessage // tasks in retry queue after calling the method + scheduled []sortedSetEntry + retry []sortedSetEntry + wantQueued []*TaskMessage + wantScheduled []*TaskMessage + wantRetry []*TaskMessage }{ { - initScheduled: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())}, - &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - initRetry: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}}, + scheduled: []sortedSetEntry{ + {t1, secondAgo.Unix()}, + {t2, secondAgo.Unix()}}, + retry: []sortedSetEntry{ + {t3, secondAgo.Unix()}}, wantQueued: []*TaskMessage{t1, t2, t3}, wantScheduled: []*TaskMessage{}, wantRetry: []*TaskMessage{}, }, { - initScheduled: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, - &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - initRetry: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}}, + scheduled: []sortedSetEntry{ + {t1, hourFromNow.Unix()}, + {t2, secondAgo.Unix()}}, + retry: []sortedSetEntry{ + {t3, secondAgo.Unix()}}, wantQueued: []*TaskMessage{t2, t3}, wantScheduled: []*TaskMessage{t1}, wantRetry: []*TaskMessage{}, }, { - initScheduled: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, - &redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}}, - initRetry: []*redis.Z{ - &redis.Z{Member: mustMarshal(t, t3), Score: float64(hourFromNow.Unix())}}, + scheduled: []sortedSetEntry{ + {t1, hourFromNow.Unix()}, + {t2, hourFromNow.Unix()}}, + retry: []sortedSetEntry{ + {t3, hourFromNow.Unix()}}, wantQueued: []*TaskMessage{}, wantScheduled: []*TaskMessage{t1, t2}, wantRetry: []*TaskMessage{t3}, @@ -367,18 +251,9 @@ func TestCheckAndEnqueue(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - if err := r.client.ZAdd(scheduledQ, tc.initScheduled...).Err(); err != nil { - t.Error(err) - continue - } - if err := r.client.ZAdd(retryQ, tc.initRetry...).Err(); err != nil { - t.Error(err) - continue - } + flushDB(t, r) // clean up db before each test case + seedScheduledQueue(t, r, tc.scheduled) + seedRetryQueue(t, r, tc.retry) err := r.CheckAndEnqueue() if err != nil { @@ -388,12 +263,17 @@ func TestCheckAndEnqueue(t *testing.T) { queued := r.client.LRange(defaultQ, 0, -1).Val() gotQueued := mustUnmarshalSlice(t, queued) if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" { - t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQ, len(gotQueued), len(tc.wantQueued), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) } - scheduled := r.client.ZRangeByScore(scheduledQ, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() + scheduled := r.client.ZRange(scheduledQ, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, scheduled) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + } + retry := r.client.ZRange(retryQ, 0, -1).Val() + gotRetry := mustUnmarshalSlice(t, retry) + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) } } } @@ -411,29 +291,20 @@ func TestSchedule(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - - err := r.Schedule(tc.msg, tc.processAt) - if err != nil { - t.Error(err) - continue - } - - res, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result() - if err != nil { - t.Error(err) - continue - } + flushDB(t, r) // clean up db before each test case desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt) + err := r.Schedule(tc.msg, tc.processAt) + if err != nil { + t.Errorf("%s = %v, want nil", desc, err) + continue + } + + res := r.client.ZRangeWithScores(scheduledQ, 0, -1).Val() if len(res) != 1 { t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), scheduledQ) continue } - if res[0].Score != float64(tc.processAt.Unix()) { t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) continue @@ -454,29 +325,20 @@ func TestRetryLater(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - - err := r.RetryLater(tc.msg, tc.processAt) - if err != nil { - t.Error(err) - continue - } - - res, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result() - if err != nil { - t.Error(err) - continue - } + flushDB(t, r) // clean up db before each test case desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt) + err := r.RetryLater(tc.msg, tc.processAt) + if err != nil { + t.Errorf("%s = %v, want nil", desc, err) + continue + } + + res := r.client.ZRangeWithScores(retryQ, 0, -1).Val() if len(res) != 1 { t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ) continue } - if res[0].Score != float64(tc.processAt.Unix()) { t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) continue