From 69d7ec725a565b7932073e4f7513640842a2f5f9 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 8 Sep 2020 06:51:01 -0700 Subject: [PATCH] Close redis client after each test run --- benchmark_test.go | 4 ++++ client_test.go | 8 ++++++++ healthcheck_test.go | 2 ++ heartbeat_test.go | 2 ++ inspector_test.go | 25 +++++++++++++++++++++++++ internal/rdb/inspect_test.go | 35 +++++++++++++++++++++++++++++++++++ internal/rdb/rdb_test.go | 16 ++++++++++++++++ processor_test.go | 1 + recoverer_test.go | 1 + scheduler_test.go | 1 + server_test.go | 1 + subscriber_test.go | 2 ++ syncer_test.go | 1 + 13 files changed, 99 insertions(+) diff --git a/benchmark_test.go b/benchmark_test.go index 6a68496..a4d7db2 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -34,6 +34,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { b.Fatalf("could not enqueue a task: %v", err) } } + client.Close() var wg sync.WaitGroup wg.Add(count) @@ -80,6 +81,7 @@ func BenchmarkEndToEnd(b *testing.B) { b.Fatalf("could not enqueue a task: %v", err) } } + client.Close() var wg sync.WaitGroup wg.Add(count * 2) @@ -151,6 +153,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { b.Fatalf("could not enqueue a task: %v", err) } } + client.Close() var wg sync.WaitGroup wg.Add(highCount + defaultCount + lowCount) @@ -221,6 +224,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.StopTimer() // begin teardown srv.Stop() + client.Close() b.StartTimer() // end teardown } } diff --git a/client_test.go b/client_test.go index 1ad9466..fc9429e 100644 --- a/client_test.go +++ b/client_test.go @@ -18,6 +18,7 @@ import ( func TestClientEnqueueWithProcessAtOption(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) + defer client.Close() task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) @@ -132,6 +133,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { func TestClientEnqueue(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) + defer client.Close() task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) now := time.Now() @@ -384,6 +386,7 @@ func TestClientEnqueue(t *testing.T) { func TestClientEnqueueWithProcessInOption(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) + defer client.Close() task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) now := time.Now() @@ -494,6 +497,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { func TestClientEnqueueError(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) + defer client.Close() task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) @@ -606,6 +610,7 @@ func TestClientDefaultOptions(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) + defer c.Close() c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) gotRes, err := c.Enqueue(tc.task, tc.opts...) if err != nil { @@ -636,6 +641,7 @@ func TestClientDefaultOptions(t *testing.T) { func TestClientEnqueueUnique(t *testing.T) { r := setup(t) c := NewClient(getRedisConnOpt(t)) + defer c.Close() tests := []struct { task *Task @@ -678,6 +684,7 @@ func TestClientEnqueueUnique(t *testing.T) { func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { r := setup(t) c := NewClient(getRedisConnOpt(t)) + defer c.Close() tests := []struct { task *Task @@ -723,6 +730,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { r := setup(t) c := NewClient(getRedisConnOpt(t)) + defer c.Close() tests := []struct { task *Task diff --git a/healthcheck_test.go b/healthcheck_test.go index c077271..4b4c15e 100644 --- a/healthcheck_test.go +++ b/healthcheck_test.go @@ -15,6 +15,7 @@ import ( func TestHealthChecker(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) var ( @@ -62,6 +63,7 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) { } }() r := rdb.NewRDB(setup(t)) + defer r.Close() testBroker := testbroker.NewTestBroker(r) var ( // mu guards called and e variables. diff --git a/heartbeat_test.go b/heartbeat_test.go index 6f9054b..16cc9bf 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -19,6 +19,7 @@ import ( func TestHeartbeater(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) tests := []struct { @@ -128,6 +129,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) { } }() r := rdb.NewRDB(setup(t)) + defer r.Close() testBroker := testbroker.NewTestBroker(r) hb := newHeartbeater(heartbeaterParams{ logger: testLogger, diff --git a/inspector_test.go b/inspector_test.go index 1d4ad12..a7cb008 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -19,6 +19,7 @@ import ( func TestInspectorQueues(t *testing.T) { r := setup(t) + defer r.Close() inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { @@ -51,6 +52,7 @@ func TestInspectorQueues(t *testing.T) { func TestInspectorCurrentStats(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -162,6 +164,7 @@ func TestInspectorCurrentStats(t *testing.T) { func TestInspectorHistory(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now().UTC() inspector := NewInspector(getRedisConnOpt(t)) @@ -225,6 +228,7 @@ func createPendingTask(msg *base.TaskMessage) *PendingTask { func TestInspectorListPendingTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -293,6 +297,7 @@ func TestInspectorListPendingTasks(t *testing.T) { func TestInspectorListActiveTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -358,6 +363,7 @@ func createScheduledTask(z base.Z) *ScheduledTask { func TestInspectorListScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -433,6 +439,7 @@ func createRetryTask(z base.Z) *RetryTask { func TestInspectorListRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -509,6 +516,7 @@ func createDeadTask(z base.Z) *DeadTask { func TestInspectorListDeadTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -576,6 +584,7 @@ func TestInspectorListPagination(t *testing.T) { asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) } r := setup(t) + defer r.Close() asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) inspector := NewInspector(getRedisConnOpt(t)) @@ -630,6 +639,7 @@ func TestInspectorListPagination(t *testing.T) { func TestInspectorDeleteAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -695,6 +705,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { func TestInspectorDeleteAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -760,6 +771,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { func TestInspectorDeleteAllDeadTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -825,6 +837,7 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { func TestInspectorKillAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -957,6 +970,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { func TestInspectorKillAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) @@ -1070,6 +1084,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { func TestInspectorRunAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") @@ -1186,6 +1201,7 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) { func TestInspectorRunAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") @@ -1302,6 +1318,7 @@ func TestInspectorRunAllRetryTasks(t *testing.T) { func TestInspectorRunAllDeadTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") @@ -1415,6 +1432,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1464,6 +1482,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1513,6 +1532,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1562,6 +1582,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1631,6 +1652,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1699,6 +1721,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") @@ -1771,6 +1794,7 @@ func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1841,6 +1865,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 3bd5a1c..c08845d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -18,6 +18,7 @@ import ( func TestAllQueues(t *testing.T) { r := setup(t) + defer r.Close() tests := []struct { queues []string @@ -48,6 +49,7 @@ func TestAllQueues(t *testing.T) { func TestCurrentStats(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) @@ -216,6 +218,7 @@ func TestCurrentStats(t *testing.T) { func TestCurrentStatsWithNonExistentQueue(t *testing.T) { r := setup(t) + defer r.Close() qname := "non-existent" got, err := r.CurrentStats(qname) @@ -226,6 +229,7 @@ func TestCurrentStatsWithNonExistentQueue(t *testing.T) { func TestHistoricalStats(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now().UTC() tests := []struct { @@ -281,6 +285,7 @@ func TestHistoricalStats(t *testing.T) { func TestRedisInfo(t *testing.T) { r := setup(t) + defer r.Close() info, err := r.RedisInfo() if err != nil { @@ -305,6 +310,7 @@ func TestRedisInfo(t *testing.T) { func TestListPending(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m2 := h.NewTaskMessage("reindex", nil) @@ -369,6 +375,7 @@ func TestListPending(t *testing.T) { func TestListPendingPagination(t *testing.T) { r := setup(t) + defer r.Close() var msgs []*base.TaskMessage for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) @@ -435,6 +442,7 @@ func TestListPendingPagination(t *testing.T) { func TestListActive(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -483,6 +491,7 @@ func TestListActive(t *testing.T) { func TestListActivePagination(t *testing.T) { r := setup(t) + defer r.Close() var msgs []*base.TaskMessage for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) @@ -539,6 +548,7 @@ func TestListActivePagination(t *testing.T) { func TestListScheduled(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessage("task3", nil) @@ -616,6 +626,7 @@ func TestListScheduled(t *testing.T) { func TestListScheduledPagination(t *testing.T) { r := setup(t) + defer r.Close() // create 100 tasks with an increasing number of wait time. for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) @@ -673,6 +684,7 @@ func TestListScheduledPagination(t *testing.T) { func TestListRetry(t *testing.T) { r := setup(t) + defer r.Close() m1 := &base.TaskMessage{ ID: uuid.New(), Type: "task1", @@ -769,6 +781,7 @@ func TestListRetry(t *testing.T) { func TestListRetryPagination(t *testing.T) { r := setup(t) + defer r.Close() // create 100 tasks with an increasing number of wait time. now := time.Now() var seed []base.Z @@ -830,6 +843,7 @@ func TestListRetryPagination(t *testing.T) { func TestListDead(t *testing.T) { r := setup(t) + defer r.Close() m1 := &base.TaskMessage{ ID: uuid.New(), Type: "task1", @@ -920,6 +934,7 @@ func TestListDead(t *testing.T) { func TestListDeadPagination(t *testing.T) { r := setup(t) + defer r.Close() var entries []base.Z for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) @@ -983,6 +998,7 @@ var ( func TestRunDeadTask(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessageWithQueue("send_notification", nil, "critical") @@ -1087,6 +1103,7 @@ func TestRunDeadTask(t *testing.T) { func TestRunRetryTask(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1191,6 +1208,7 @@ func TestRunRetryTask(t *testing.T) { func TestRunScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessageWithQueue("send_notification", nil, "notifications") @@ -1295,6 +1313,7 @@ func TestRunScheduledTask(t *testing.T) { func TestRunAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("reindex", nil) @@ -1400,6 +1419,7 @@ func TestRunAllScheduledTasks(t *testing.T) { func TestRunAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("reindex", nil) @@ -1505,6 +1525,7 @@ func TestRunAllRetryTasks(t *testing.T) { func TestRunAllDeadTasks(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("reindex", nil) @@ -1610,6 +1631,7 @@ func TestRunAllDeadTasks(t *testing.T) { func TestKillRetryTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1735,6 +1757,7 @@ func TestKillRetryTask(t *testing.T) { func TestKillScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -1860,6 +1883,7 @@ func TestKillScheduledTask(t *testing.T) { func TestKillAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2006,6 +2030,7 @@ func TestKillAllRetryTasks(t *testing.T) { func TestKillAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2152,6 +2177,7 @@ func TestKillAllScheduledTasks(t *testing.T) { func TestDeleteDeadTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2251,6 +2277,7 @@ func TestDeleteDeadTask(t *testing.T) { func TestDeleteRetryTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2335,6 +2362,7 @@ func TestDeleteRetryTask(t *testing.T) { func TestDeleteScheduledTask(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2419,6 +2447,7 @@ func TestDeleteScheduledTask(t *testing.T) { func TestDeleteAllDeadTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2480,6 +2509,7 @@ func TestDeleteAllDeadTasks(t *testing.T) { func TestDeleteAllRetryTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2541,6 +2571,7 @@ func TestDeleteAllRetryTasks(t *testing.T) { func TestDeleteAllScheduledTasks(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2602,6 +2633,7 @@ func TestDeleteAllScheduledTasks(t *testing.T) { func TestRemoveQueue(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2702,6 +2734,7 @@ func TestRemoveQueue(t *testing.T) { func TestRemoveQueueError(t *testing.T) { r := setup(t) + defer r.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") @@ -2845,6 +2878,7 @@ func TestRemoveQueueError(t *testing.T) { func TestListServers(t *testing.T) { r := setup(t) + defer r.Close() started1 := time.Now().Add(-time.Hour) info1 := &base.ServerInfo{ @@ -2906,6 +2940,7 @@ func TestListServers(t *testing.T) { func TestListWorkers(t *testing.T) { r := setup(t) + defer r.Close() var ( host = "127.0.0.1" diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5202f79..db777d8 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -60,6 +60,7 @@ func setup(t *testing.T) (r *RDB) { func TestEnqueue(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") t3 := h.NewTaskMessageWithQueue("sync", nil, "low") @@ -96,6 +97,7 @@ func TestEnqueue(t *testing.T) { func TestEnqueueUnique(t *testing.T) { r := setup(t) + defer r.Close() m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", @@ -140,6 +142,7 @@ func TestEnqueueUnique(t *testing.T) { func TestDequeue(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now() t1 := &base.TaskMessage{ ID: uuid.New(), @@ -336,6 +339,7 @@ func TestDequeue(t *testing.T) { func TestDequeueIgnoresPausedQueues(t *testing.T) { r := setup(t) + defer r.Close() t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", @@ -448,6 +452,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { func TestDone(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now() t1 := &base.TaskMessage{ ID: uuid.New(), @@ -600,6 +605,7 @@ func TestDone(t *testing.T) { func TestRequeue(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now() t1 := &base.TaskMessage{ ID: uuid.New(), @@ -748,6 +754,7 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) tests := []struct { msg *base.TaskMessage @@ -785,6 +792,7 @@ func TestSchedule(t *testing.T) { func TestScheduleUnique(t *testing.T) { r := setup(t) + defer r.Close() m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", @@ -841,6 +849,7 @@ func TestScheduleUnique(t *testing.T) { func TestRetry(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now() t1 := &base.TaskMessage{ ID: uuid.New(), @@ -1001,6 +1010,7 @@ func TestRetry(t *testing.T) { func TestKill(t *testing.T) { r := setup(t) + defer r.Close() now := time.Now() t1 := &base.TaskMessage{ ID: uuid.New(), @@ -1203,6 +1213,7 @@ func TestKill(t *testing.T) { func TestCheckAndEnqueue(t *testing.T) { r := setup(t) + defer r.Close() t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("generate_csv", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1414,6 +1425,7 @@ func TestListDeadlineExceeded(t *testing.T) { } r := setup(t) + defer r.Close() for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllDeadlines(t, r.client, tc.deadlines) @@ -1433,6 +1445,7 @@ func TestListDeadlineExceeded(t *testing.T) { func TestWriteServerState(t *testing.T) { r := setup(t) + defer r.Close() var ( host = "localhost" @@ -1499,6 +1512,7 @@ func TestWriteServerState(t *testing.T) { func TestWriteServerStateWithWorkers(t *testing.T) { r := setup(t) + defer r.Close() var ( host = "127.0.0.1" @@ -1607,6 +1621,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { func TestClearServerState(t *testing.T) { r := setup(t) + defer r.Close() var ( host = "127.0.0.1" @@ -1707,6 +1722,7 @@ func TestClearServerState(t *testing.T) { func TestCancelationPubSub(t *testing.T) { r := setup(t) + defer r.Close() pubsub, err := r.CancelationPubSub() if err != nil { diff --git a/processor_test.go b/processor_test.go index 268a62b..1c90b40 100644 --- a/processor_test.go +++ b/processor_test.go @@ -484,6 +484,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { t6 = NewTask(m6.Type, m6.Payload) t7 = NewTask(m7.Type, m7.Payload) ) + defer r.Close() tests := []struct { pending map[string][]*base.TaskMessage // initial queues state diff --git a/recoverer_test.go b/recoverer_test.go index c9773be..02aa5ad 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -17,6 +17,7 @@ import ( func TestRecoverer(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) t1 := h.NewTaskMessageWithQueue("task1", nil, "default") diff --git a/scheduler_test.go b/scheduler_test.go index f637da4..965d24c 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -17,6 +17,7 @@ import ( func TestScheduler(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) const pollInterval = time.Second s := newScheduler(schedulerParams{ diff --git a/server_test.go b/server_test.go index 4242c27..1b72570 100644 --- a/server_test.go +++ b/server_test.go @@ -23,6 +23,7 @@ func TestServer(t *testing.T) { redisConnOpt := getRedisConnOpt(t) c := NewClient(redisConnOpt) + defer c.Close() srv := NewServer(redisConnOpt, Config{ Concurrency: 10, LogLevel: testLogLevel, diff --git a/subscriber_test.go b/subscriber_test.go index 6c84dd7..709aca9 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -16,6 +16,7 @@ import ( func TestSubscriber(t *testing.T) { r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) tests := []struct { @@ -76,6 +77,7 @@ func TestSubscriberWithRedisDown(t *testing.T) { } }() r := rdb.NewRDB(setup(t)) + defer r.Close() testBroker := testbroker.NewTestBroker(r) cancelations := base.NewCancelations() diff --git a/syncer_test.go b/syncer_test.go index 6c67d12..f46f2db 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -22,6 +22,7 @@ func TestSyncer(t *testing.T) { h.NewTaskMessage("gen_thumbnail", nil), } r := setup(t) + defer r.Close() rdbClient := rdb.NewRDB(r) h.SeedActiveQueue(t, r, inProgress, base.DefaultQueueName)