diff --git a/asynq_test.go b/asynq_test.go index cc56fa9..95c0dda 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -1,32 +1,17 @@ package asynq import ( - "encoding/json" - "math/rand" "sort" "testing" - "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hibiken/asynq/internal/base" - "github.com/rs/xid" + h "github.com/hibiken/asynq/internal/asynqtest" ) // This file defines test helper functions used by // other test files. -func init() { - rand.Seed(time.Now().UnixNano()) -} - -// scheduledEntry represents an item in redis sorted set (aka ZSET). -type sortedSetEntry struct { - msg *base.TaskMessage - score int64 -} - func setup(t *testing.T) *redis.Client { t.Helper() r := redis.NewClient(&redis.Options{ @@ -34,9 +19,7 @@ func setup(t *testing.T) *redis.Client { DB: 14, }) // Start each test with a clean slate. - if err := r.FlushDB().Err(); err != nil { - panic(err) - } + h.FlushDB(t, r) return r } @@ -47,68 +30,3 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { }) return out }) - -var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*base.TaskMessage) []*base.TaskMessage { - out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out -}) - -var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { - out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].msg.ID.String() < out[j].msg.ID.String() - }) - return out -}) - -var ignoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") - -func randomTask(taskType, qname string, payload map[string]interface{}) *base.TaskMessage { - return &base.TaskMessage{ - ID: xid.New(), - Type: taskType, - Queue: qname, - Retry: defaultMaxRetry, - Payload: make(map[string]interface{}), - } -} - -func mustMarshal(t *testing.T, task *base.TaskMessage) string { - t.Helper() - data, err := json.Marshal(task) - if err != nil { - t.Fatal(err) - } - return string(data) -} - -func mustUnmarshal(t *testing.T, data string) *base.TaskMessage { - t.Helper() - var task base.TaskMessage - err := json.Unmarshal([]byte(data), &task) - if err != nil { - t.Fatal(err) - } - return &task -} - -func mustMarshalSlice(t *testing.T, tasks []*base.TaskMessage) []string { - t.Helper() - var data []string - for _, task := range tasks { - data = append(data, mustMarshal(t, task)) - } - return data -} - -func mustUnmarshalSlice(t *testing.T, data []string) []*base.TaskMessage { - t.Helper() - var tasks []*base.TaskMessage - for _, s := range data { - tasks = append(tasks, mustUnmarshal(t, s)) - } - return tasks -} diff --git a/client_test.go b/client_test.go index ca9f27d..1c861f0 100644 --- a/client_test.go +++ b/client_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -21,7 +22,7 @@ func TestClient(t *testing.T) { processAt time.Time opts []Option wantEnqueued []*base.TaskMessage - wantScheduled []sortedSetEntry + wantScheduled []h.ZSetEntry }{ { desc: "Process task immediately", @@ -44,15 +45,15 @@ func TestClient(t *testing.T) { processAt: time.Now().Add(2 * time.Hour), opts: []Option{}, wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil - wantScheduled: []sortedSetEntry{ + wantScheduled: []h.ZSetEntry{ { - msg: &base.TaskMessage{ + Msg: &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: defaultMaxRetry, Queue: "default", }, - score: time.Now().Add(2 * time.Hour).Unix(), + Score: time.Now().Add(2 * time.Hour).Unix(), }, }, }, @@ -111,10 +112,7 @@ func TestClient(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } + h.FlushDB(t, r) // clean up db before each test case. err := client.Process(tc.task, tc.processAt, tc.opts...) if err != nil { @@ -122,23 +120,13 @@ func TestClient(t *testing.T) { continue } - gotEnqueuedRaw := r.LRange(base.DefaultQueue, 0, -1).Val() - gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) - if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" { + gotEnqueued := h.GetEnqueuedMessages(t, r) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.IgnoreIDOpt); diff != "" { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.DefaultQueue, diff) } - gotScheduledRaw := r.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val() - var gotScheduled []sortedSetEntry - for _, z := range gotScheduledRaw { - gotScheduled = append(gotScheduled, sortedSetEntry{ - msg: mustUnmarshal(t, z.Member.(string)), - score: int64(z.Score), - }) - } - - cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff) } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index bec0ccc..ada2fd4 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -8,6 +8,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" ) @@ -36,6 +37,9 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [ return out }) +// IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing. +var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") + // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { return &base.TaskMessage{ diff --git a/processor_test.go b/processor_test.go index 693a719..03c9234 100644 --- a/processor_test.go +++ b/processor_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -15,10 +16,10 @@ func TestProcessorSuccess(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("gen_thumbnail", "default", nil) - m3 := randomTask("reindex", "default", nil) - m4 := randomTask("sync", "default", nil) + m1 := h.NewTaskMessage("send_email", nil) + m2 := h.NewTaskMessage("gen_thumbnail", nil) + m3 := h.NewTaskMessage("reindex", nil) + m4 := h.NewTaskMessage("sync", nil) t1 := &Task{Type: m1.Type, Payload: m1.Payload} t2 := &Task{Type: m2.Type, Payload: m2.Payload} @@ -26,19 +27,19 @@ func TestProcessorSuccess(t *testing.T) { t4 := &Task{Type: m4.Type, Payload: m4.Payload} tests := []struct { - initQueue []*base.TaskMessage // initial default queue state + enqueued []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run wait time.Duration // wait duration between starting and stopping processor for this test case wantProcessed []*Task // tasks to be processed at the end }{ { - initQueue: []*base.TaskMessage{m1}, + enqueued: []*base.TaskMessage{m1}, incoming: []*base.TaskMessage{m2, m3, m4}, wait: time.Second, wantProcessed: []*Task{t1, t2, t3, t4}, }, { - initQueue: []*base.TaskMessage{}, + enqueued: []*base.TaskMessage{}, incoming: []*base.TaskMessage{m1}, wait: time.Second, wantProcessed: []*Task{t1}, @@ -46,32 +47,22 @@ func TestProcessorSuccess(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } + h.FlushDB(t, r) // clean up db before each test case. + h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. + // instantiate a new processor var mu sync.Mutex var processed []*Task - var h HandlerFunc - h = func(task *Task) error { + handler := func(task *Task) error { mu.Lock() defer mu.Unlock() processed = append(processed, task) return nil } - p := newProcessor(rdbClient, 10, h) + p := newProcessor(rdbClient, 10, HandlerFunc(handler)) p.dequeueTimeout = time.Second // short time out for test purpose - // initialize default queue. - for _, msg := range tc.initQueue { - err := rdbClient.Enqueue(msg) - if err != nil { - t.Fatal(err) - } - } p.start() - for _, msg := range tc.incoming { err := rdbClient.Enqueue(msg) if err != nil { @@ -96,11 +87,11 @@ func TestProcessorRetry(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := randomTask("send_email", "default", nil) + m1 := h.NewTaskMessage("send_email", nil) m1.Retried = m1.Retry // m1 has reached its max retry count - m2 := randomTask("gen_thumbnail", "default", nil) - m3 := randomTask("reindex", "default", nil) - m4 := randomTask("sync", "default", nil) + m2 := h.NewTaskMessage("gen_thumbnail", nil) + m3 := h.NewTaskMessage("reindex", nil) + m4 := h.NewTaskMessage("sync", nil) errMsg := "something went wrong" // r* is m* after retry @@ -117,14 +108,14 @@ func TestProcessorRetry(t *testing.T) { r4.Retried = m4.Retried + 1 tests := []struct { - initQueue []*base.TaskMessage // initial default queue state + enqueued []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run wait time.Duration // wait duration between starting and stopping processor for this test case wantRetry []*base.TaskMessage // tasks in retry queue at the end wantDead []*base.TaskMessage // tasks in dead queue at the end }{ { - initQueue: []*base.TaskMessage{m1, m2}, + enqueued: []*base.TaskMessage{m1, m2}, incoming: []*base.TaskMessage{m3, m4}, wait: time.Second, wantRetry: []*base.TaskMessage{&r2, &r3, &r4}, @@ -133,24 +124,15 @@ func TestProcessorRetry(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } + h.FlushDB(t, r) // clean up db before each test case. + h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. + // instantiate a new processor - var h HandlerFunc - h = func(task *Task) error { + handler := func(task *Task) error { return fmt.Errorf(errMsg) } - p := newProcessor(rdbClient, 10, h) + p := newProcessor(rdbClient, 10, HandlerFunc(handler)) p.dequeueTimeout = time.Second // short time out for test purpose - // initialize default queue. - for _, msg := range tc.initQueue { - err := rdbClient.Enqueue(msg) - if err != nil { - t.Fatal(err) - } - } p.start() for _, msg := range tc.incoming { @@ -163,15 +145,13 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotRetryRaw := r.ZRange(base.RetryQueue, 0, -1).Val() - gotRetry := mustUnmarshalSlice(t, gotRetryRaw) - if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { + gotRetry := h.GetRetryMessages(t, r) + if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) } - gotDeadRaw := r.ZRange(base.DeadQueue, 0, -1).Val() - gotDead := mustUnmarshalSlice(t, gotDeadRaw) - if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { + gotDead := h.GetDeadMessages(t, r) + if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadQueue, diff) } diff --git a/scheduler_test.go b/scheduler_test.go index a8de200..aaffeb0 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -4,29 +4,26 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) func TestScheduler(t *testing.T) { - type scheduledTask struct { - msg *base.TaskMessage - processAt time.Time - } r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second s := newScheduler(rdbClient, pollInterval) - t1 := randomTask("gen_thumbnail", "default", nil) - t2 := randomTask("send_email", "default", nil) - t3 := randomTask("reindex", "default", nil) - t4 := randomTask("sync", "default", nil) + t1 := h.NewTaskMessage("gen_thumbnail", nil) + t2 := h.NewTaskMessage("send_email", nil) + t3 := h.NewTaskMessage("reindex", nil) + t4 := h.NewTaskMessage("sync", nil) + now := time.Now() tests := []struct { - initScheduled []scheduledTask // scheduled queue initial state - initRetry []scheduledTask // retry queue initial state + initScheduled []h.ZSetEntry // scheduled queue initial state + initRetry []h.ZSetEntry // retry queue initial state initQueue []*base.TaskMessage // default queue initial state wait time.Duration // wait duration before checking for final state wantScheduled []*base.TaskMessage // schedule queue final state @@ -34,12 +31,12 @@ func TestScheduler(t *testing.T) { wantQueue []*base.TaskMessage // default queue final state }{ { - initScheduled: []scheduledTask{ - {t1, time.Now().Add(time.Hour)}, - {t2, time.Now().Add(-2 * time.Second)}, + initScheduled: []h.ZSetEntry{ + {Msg: t1, Score: now.Add(time.Hour).Unix()}, + {Msg: t2, Score: now.Add(-2 * time.Second).Unix()}, }, - initRetry: []scheduledTask{ - {t3, time.Now().Add(-500 * time.Millisecond)}, + initRetry: []h.ZSetEntry{ + {Msg: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}, }, initQueue: []*base.TaskMessage{t4}, wait: pollInterval * 2, @@ -48,12 +45,12 @@ func TestScheduler(t *testing.T) { wantQueue: []*base.TaskMessage{t2, t3, t4}, }, { - initScheduled: []scheduledTask{ - {t1, time.Now()}, - {t2, time.Now().Add(-2 * time.Second)}, - {t3, time.Now().Add(-500 * time.Millisecond)}, + initScheduled: []h.ZSetEntry{ + {Msg: t1, Score: now.Unix()}, + {Msg: t2, Score: now.Add(-2 * time.Second).Unix()}, + {Msg: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, }, - initRetry: []scheduledTask{}, + initRetry: []h.ZSetEntry{}, initQueue: []*base.TaskMessage{t4}, wait: pollInterval * 2, wantScheduled: []*base.TaskMessage{}, @@ -63,54 +60,27 @@ func TestScheduler(t *testing.T) { } for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } - // initialize scheduled queue - for _, st := range tc.initScheduled { - err := rdbClient.Schedule(st.msg, st.processAt) - if err != nil { - t.Fatal(err) - } - } - // initialize retry queue - for _, st := range tc.initRetry { - err := r.ZAdd(base.RetryQueue, &redis.Z{ - Member: mustMarshal(t, st.msg), - Score: float64(st.processAt.Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } - // initialize default queue - for _, msg := range tc.initQueue { - err := rdbClient.Enqueue(msg) - if err != nil { - t.Fatal(err) - } - } + h.FlushDB(t, r) // clean up db before each test case. + h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue + h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue + h.SeedDefaultQueue(t, r, tc.initQueue) // initialize default queue s.start() time.Sleep(tc.wait) s.terminate() - gotScheduledRaw := r.ZRange(base.ScheduledQueue, 0, -1).Val() - gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { + gotScheduled := h.GetScheduledMessages(t, r) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledQueue, diff) } - gotRetryRaw := r.ZRange(base.RetryQueue, 0, -1).Val() - gotRetry := mustUnmarshalSlice(t, gotRetryRaw) - if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { + gotRetry := h.GetRetryMessages(t, r) + if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryQueue, diff) } - gotQueueRaw := r.LRange(base.DefaultQueue, 0, -1).Val() - gotQueue := mustUnmarshalSlice(t, gotQueueRaw) - if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { + gotEnqueued := h.GetEnqueuedMessages(t, r) + if diff := cmp.Diff(tc.wantQueue, gotEnqueued, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultQueue, diff) } }