From 5161b9368ae81da57b97b8000426e33b8345a9a4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 1 May 2020 07:22:11 -0700 Subject: [PATCH] Clean up tests --- internal/base/base_test.go | 6 ++++-- processor_test.go | 28 +++++++++------------------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 7d59059..65c223f 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -82,7 +82,8 @@ func TestServerInfoKey(t *testing.T) { for _, tc := range tests { got := ServerInfoKey(tc.hostname, tc.pid, tc.sid) if got != tc.want { - t.Errorf("ServerInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want) + t.Errorf("ServerInfoKey(%q, %d, %q) = %q, want %q", + tc.hostname, tc.pid, tc.sid, got, tc.want) } } } @@ -101,7 +102,8 @@ func TestWorkersKey(t *testing.T) { for _, tc := range tests { got := WorkersKey(tc.hostname, tc.pid, tc.sid) if got != tc.want { - t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want) + t.Errorf("WorkersKey(%q, %d, %q) = %q, want = %q", + tc.hostname, tc.pid, tc.sid, got, tc.want) } } } diff --git a/processor_test.go b/processor_test.go index c379e8b..87ac520 100644 --- a/processor_test.go +++ b/processor_test.go @@ -37,19 +37,16 @@ func TestProcessorSuccess(t *testing.T) { tests := []struct { enqueued []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run - wait time.Duration // wait duration between starting and stopping processor for this test case wantProcessed []*Task // tasks to be processed at the end }{ { enqueued: []*base.TaskMessage{m1}, incoming: []*base.TaskMessage{m2, m3, m4}, - wait: time.Second, wantProcessed: []*Task{t1, t2, t3, t4}, }, { enqueued: []*base.TaskMessage{}, incoming: []*base.TaskMessage{m1}, - wait: time.Second, wantProcessed: []*Task{t1}, }, } @@ -68,21 +65,19 @@ func TestProcessorSuccess(t *testing.T) { return nil } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) - cancelations := base.NewCancelations() p := newProcessor(newProcessorParams{ logger: testLogger, broker: rdbClient, ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, - cancelations: cancelations, + cancelations: base.NewCancelations(), errHandler: nil, shutdownTimeout: defaultShutdownTimeout, }) p.handler = HandlerFunc(handler) - var wg sync.WaitGroup - p.start(&wg) + p.start(&sync.WaitGroup{}) for _, msg := range tc.incoming { err := rdbClient.Enqueue(msg) if err != nil { @@ -90,7 +85,7 @@ func TestProcessorSuccess(t *testing.T) { t.Fatal(err) } } - time.Sleep(tc.wait) + time.Sleep(time.Second) // wait for one second to allow all enqueued tasks to be processed. p.terminate() if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { @@ -175,21 +170,19 @@ func TestProcessorRetry(t *testing.T) { n++ } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) - cancelations := base.NewCancelations() p := newProcessor(newProcessorParams{ logger: testLogger, broker: rdbClient, ss: ss, retryDelayFunc: delayFunc, syncCh: nil, - cancelations: cancelations, + cancelations: base.NewCancelations(), errHandler: ErrorHandlerFunc(errHandler), shutdownTimeout: defaultShutdownTimeout, }) p.handler = tc.handler - var wg sync.WaitGroup - p.start(&wg) + p.start(&sync.WaitGroup{}) for _, msg := range tc.incoming { err := rdbClient.Enqueue(msg) if err != nil { @@ -200,7 +193,7 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score + cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score gotRetry := h.GetRetryEntries(t, r) if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) @@ -249,7 +242,6 @@ func TestProcessorQueues(t *testing.T) { } for _, tc := range tests { - cancelations := base.NewCancelations() ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) p := newProcessor(newProcessorParams{ logger: testLogger, @@ -257,7 +249,7 @@ func TestProcessorQueues(t *testing.T) { ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, - cancelations: cancelations, + cancelations: base.NewCancelations(), errHandler: nil, shutdownTimeout: defaultShutdownTimeout, }) @@ -326,7 +318,6 @@ func TestProcessorWithStrictPriority(t *testing.T) { "low": 1, } // Note: Set concurrency to 1 to make sure tasks are processed one at a time. - cancelations := base.NewCancelations() ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) p := newProcessor(newProcessorParams{ logger: testLogger, @@ -334,14 +325,13 @@ func TestProcessorWithStrictPriority(t *testing.T) { ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, - cancelations: cancelations, + cancelations: base.NewCancelations(), errHandler: nil, shutdownTimeout: defaultShutdownTimeout, }) p.handler = HandlerFunc(handler) - var wg sync.WaitGroup - p.start(&wg) + p.start(&sync.WaitGroup{}) time.Sleep(tc.wait) p.terminate()