2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Close redis client after each test run

This commit is contained in:
Ken Hibino 2020-09-08 06:51:01 -07:00
parent 450a9aa1e2
commit 69d7ec725a
13 changed files with 99 additions and 0 deletions

View File

@ -34,6 +34,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
client.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count) wg.Add(count)
@ -80,6 +81,7 @@ func BenchmarkEndToEnd(b *testing.B) {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
client.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count * 2) wg.Add(count * 2)
@ -151,6 +153,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
client.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(highCount + defaultCount + lowCount) wg.Add(highCount + defaultCount + lowCount)
@ -221,6 +224,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.StopTimer() // begin teardown b.StopTimer() // begin teardown
srv.Stop() srv.Stop()
client.Close()
b.StartTimer() // end teardown b.StartTimer() // end teardown
} }
} }

View File

@ -18,6 +18,7 @@ import (
func TestClientEnqueueWithProcessAtOption(t *testing.T) { func TestClientEnqueueWithProcessAtOption(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
@ -132,6 +133,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
func TestClientEnqueue(t *testing.T) { func TestClientEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
now := time.Now() now := time.Now()
@ -384,6 +386,7 @@ func TestClientEnqueue(t *testing.T) {
func TestClientEnqueueWithProcessInOption(t *testing.T) { func TestClientEnqueueWithProcessInOption(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
now := time.Now() now := time.Now()
@ -494,6 +497,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
func TestClientEnqueueError(t *testing.T) { func TestClientEnqueueError(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
@ -606,6 +610,7 @@ func TestClientDefaultOptions(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) h.FlushDB(t, r)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close()
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
gotRes, err := c.Enqueue(tc.task, tc.opts...) gotRes, err := c.Enqueue(tc.task, tc.opts...)
if err != nil { if err != nil {
@ -636,6 +641,7 @@ func TestClientDefaultOptions(t *testing.T) {
func TestClientEnqueueUnique(t *testing.T) { func TestClientEnqueueUnique(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close()
tests := []struct { tests := []struct {
task *Task task *Task
@ -678,6 +684,7 @@ func TestClientEnqueueUnique(t *testing.T) {
func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close()
tests := []struct { tests := []struct {
task *Task task *Task
@ -723,6 +730,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close()
tests := []struct { tests := []struct {
task *Task task *Task

View File

@ -15,6 +15,7 @@ import (
func TestHealthChecker(t *testing.T) { func TestHealthChecker(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
var ( var (
@ -62,6 +63,7 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) {
} }
}() }()
r := rdb.NewRDB(setup(t)) r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
var ( var (
// mu guards called and e variables. // mu guards called and e variables.

View File

@ -19,6 +19,7 @@ import (
func TestHeartbeater(t *testing.T) { func TestHeartbeater(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
tests := []struct { tests := []struct {
@ -128,6 +129,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
} }
}() }()
r := rdb.NewRDB(setup(t)) r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
hb := newHeartbeater(heartbeaterParams{ hb := newHeartbeater(heartbeaterParams{
logger: testLogger, logger: testLogger,

View File

@ -19,6 +19,7 @@ import (
func TestInspectorQueues(t *testing.T) { func TestInspectorQueues(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
tests := []struct { tests := []struct {
@ -51,6 +52,7 @@ func TestInspectorQueues(t *testing.T) {
func TestInspectorCurrentStats(t *testing.T) { func TestInspectorCurrentStats(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -162,6 +164,7 @@ func TestInspectorCurrentStats(t *testing.T) {
func TestInspectorHistory(t *testing.T) { func TestInspectorHistory(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now().UTC() now := time.Now().UTC()
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
@ -225,6 +228,7 @@ func createPendingTask(msg *base.TaskMessage) *PendingTask {
func TestInspectorListPendingTasks(t *testing.T) { func TestInspectorListPendingTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -293,6 +297,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
func TestInspectorListActiveTasks(t *testing.T) { func TestInspectorListActiveTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -358,6 +363,7 @@ func createScheduledTask(z base.Z) *ScheduledTask {
func TestInspectorListScheduledTasks(t *testing.T) { func TestInspectorListScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -433,6 +439,7 @@ func createRetryTask(z base.Z) *RetryTask {
func TestInspectorListRetryTasks(t *testing.T) { func TestInspectorListRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -509,6 +516,7 @@ func createDeadTask(z base.Z) *DeadTask {
func TestInspectorListDeadTasks(t *testing.T) { func TestInspectorListDeadTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -576,6 +584,7 @@ func TestInspectorListPagination(t *testing.T) {
asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil))
} }
r := setup(t) r := setup(t)
defer r.Close()
asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName)
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
@ -630,6 +639,7 @@ func TestInspectorListPagination(t *testing.T) {
func TestInspectorDeleteAllScheduledTasks(t *testing.T) { func TestInspectorDeleteAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -695,6 +705,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) {
func TestInspectorDeleteAllRetryTasks(t *testing.T) { func TestInspectorDeleteAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -760,6 +771,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) {
func TestInspectorDeleteAllDeadTasks(t *testing.T) { func TestInspectorDeleteAllDeadTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -825,6 +837,7 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) {
func TestInspectorKillAllScheduledTasks(t *testing.T) { func TestInspectorKillAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -957,6 +970,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
func TestInspectorKillAllRetryTasks(t *testing.T) { func TestInspectorKillAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
@ -1070,6 +1084,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
func TestInspectorRunAllScheduledTasks(t *testing.T) { func TestInspectorRunAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
@ -1186,6 +1201,7 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) {
func TestInspectorRunAllRetryTasks(t *testing.T) { func TestInspectorRunAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
@ -1302,6 +1318,7 @@ func TestInspectorRunAllRetryTasks(t *testing.T) {
func TestInspectorRunAllDeadTasks(t *testing.T) { func TestInspectorRunAllDeadTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
@ -1415,6 +1432,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) {
func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1464,6 +1482,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1513,6 +1532,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1562,6 +1582,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) {
func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil) m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1631,6 +1652,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1699,6 +1721,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
@ -1771,6 +1794,7 @@ func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) {
func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1841,6 +1865,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil) m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")

View File

@ -18,6 +18,7 @@ import (
func TestAllQueues(t *testing.T) { func TestAllQueues(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
tests := []struct { tests := []struct {
queues []string queues []string
@ -48,6 +49,7 @@ func TestAllQueues(t *testing.T) {
func TestCurrentStats(t *testing.T) { func TestCurrentStats(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
@ -216,6 +218,7 @@ func TestCurrentStats(t *testing.T) {
func TestCurrentStatsWithNonExistentQueue(t *testing.T) { func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
qname := "non-existent" qname := "non-existent"
got, err := r.CurrentStats(qname) got, err := r.CurrentStats(qname)
@ -226,6 +229,7 @@ func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
func TestHistoricalStats(t *testing.T) { func TestHistoricalStats(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now().UTC() now := time.Now().UTC()
tests := []struct { tests := []struct {
@ -281,6 +285,7 @@ func TestHistoricalStats(t *testing.T) {
func TestRedisInfo(t *testing.T) { func TestRedisInfo(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
info, err := r.RedisInfo() info, err := r.RedisInfo()
if err != nil { if err != nil {
@ -305,6 +310,7 @@ func TestRedisInfo(t *testing.T) {
func TestListPending(t *testing.T) { func TestListPending(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
@ -369,6 +375,7 @@ func TestListPending(t *testing.T) {
func TestListPendingPagination(t *testing.T) { func TestListPendingPagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
@ -435,6 +442,7 @@ func TestListPendingPagination(t *testing.T) {
func TestListActive(t *testing.T) { func TestListActive(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
@ -483,6 +491,7 @@ func TestListActive(t *testing.T) {
func TestListActivePagination(t *testing.T) { func TestListActivePagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
@ -539,6 +548,7 @@ func TestListActivePagination(t *testing.T) {
func TestListScheduled(t *testing.T) { func TestListScheduled(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("task3", nil) m3 := h.NewTaskMessage("task3", nil)
@ -616,6 +626,7 @@ func TestListScheduled(t *testing.T) {
func TestListScheduledPagination(t *testing.T) { func TestListScheduledPagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
// create 100 tasks with an increasing number of wait time. // create 100 tasks with an increasing number of wait time.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
@ -673,6 +684,7 @@ func TestListScheduledPagination(t *testing.T) {
func TestListRetry(t *testing.T) { func TestListRetry(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "task1", Type: "task1",
@ -769,6 +781,7 @@ func TestListRetry(t *testing.T) {
func TestListRetryPagination(t *testing.T) { func TestListRetryPagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
// create 100 tasks with an increasing number of wait time. // create 100 tasks with an increasing number of wait time.
now := time.Now() now := time.Now()
var seed []base.Z var seed []base.Z
@ -830,6 +843,7 @@ func TestListRetryPagination(t *testing.T) {
func TestListDead(t *testing.T) { func TestListDead(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "task1", Type: "task1",
@ -920,6 +934,7 @@ func TestListDead(t *testing.T) {
func TestListDeadPagination(t *testing.T) { func TestListDeadPagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var entries []base.Z var entries []base.Z
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
@ -983,6 +998,7 @@ var (
func TestRunDeadTask(t *testing.T) { func TestRunDeadTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessageWithQueue("send_notification", nil, "critical") t3 := h.NewTaskMessageWithQueue("send_notification", nil, "critical")
@ -1087,6 +1103,7 @@ func TestRunDeadTask(t *testing.T) {
func TestRunRetryTask(t *testing.T) { func TestRunRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
@ -1191,6 +1208,7 @@ func TestRunRetryTask(t *testing.T) {
func TestRunScheduledTask(t *testing.T) { func TestRunScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessageWithQueue("send_notification", nil, "notifications") t3 := h.NewTaskMessageWithQueue("send_notification", nil, "notifications")
@ -1295,6 +1313,7 @@ func TestRunScheduledTask(t *testing.T) {
func TestRunAllScheduledTasks(t *testing.T) { func TestRunAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil) t3 := h.NewTaskMessage("reindex", nil)
@ -1400,6 +1419,7 @@ func TestRunAllScheduledTasks(t *testing.T) {
func TestRunAllRetryTasks(t *testing.T) { func TestRunAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil) t3 := h.NewTaskMessage("reindex", nil)
@ -1505,6 +1525,7 @@ func TestRunAllRetryTasks(t *testing.T) {
func TestRunAllDeadTasks(t *testing.T) { func TestRunAllDeadTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil) t3 := h.NewTaskMessage("reindex", nil)
@ -1610,6 +1631,7 @@ func TestRunAllDeadTasks(t *testing.T) {
func TestKillRetryTask(t *testing.T) { func TestKillRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1735,6 +1757,7 @@ func TestKillRetryTask(t *testing.T) {
func TestKillScheduledTask(t *testing.T) { func TestKillScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -1860,6 +1883,7 @@ func TestKillScheduledTask(t *testing.T) {
func TestKillAllRetryTasks(t *testing.T) { func TestKillAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2006,6 +2030,7 @@ func TestKillAllRetryTasks(t *testing.T) {
func TestKillAllScheduledTasks(t *testing.T) { func TestKillAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2152,6 +2177,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
func TestDeleteDeadTask(t *testing.T) { func TestDeleteDeadTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2251,6 +2277,7 @@ func TestDeleteDeadTask(t *testing.T) {
func TestDeleteRetryTask(t *testing.T) { func TestDeleteRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2335,6 +2362,7 @@ func TestDeleteRetryTask(t *testing.T) {
func TestDeleteScheduledTask(t *testing.T) { func TestDeleteScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2419,6 +2447,7 @@ func TestDeleteScheduledTask(t *testing.T) {
func TestDeleteAllDeadTasks(t *testing.T) { func TestDeleteAllDeadTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2480,6 +2509,7 @@ func TestDeleteAllDeadTasks(t *testing.T) {
func TestDeleteAllRetryTasks(t *testing.T) { func TestDeleteAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2541,6 +2571,7 @@ func TestDeleteAllRetryTasks(t *testing.T) {
func TestDeleteAllScheduledTasks(t *testing.T) { func TestDeleteAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2602,6 +2633,7 @@ func TestDeleteAllScheduledTasks(t *testing.T) {
func TestRemoveQueue(t *testing.T) { func TestRemoveQueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2702,6 +2734,7 @@ func TestRemoveQueue(t *testing.T) {
func TestRemoveQueueError(t *testing.T) { func TestRemoveQueueError(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
@ -2845,6 +2878,7 @@ func TestRemoveQueueError(t *testing.T) {
func TestListServers(t *testing.T) { func TestListServers(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
started1 := time.Now().Add(-time.Hour) started1 := time.Now().Add(-time.Hour)
info1 := &base.ServerInfo{ info1 := &base.ServerInfo{
@ -2906,6 +2940,7 @@ func TestListServers(t *testing.T) {
func TestListWorkers(t *testing.T) { func TestListWorkers(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var ( var (
host = "127.0.0.1" host = "127.0.0.1"

View File

@ -60,6 +60,7 @@ func setup(t *testing.T) (r *RDB) {
func TestEnqueue(t *testing.T) { func TestEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})
t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low") t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
@ -96,6 +97,7 @@ func TestEnqueue(t *testing.T) {
func TestEnqueueUnique(t *testing.T) { func TestEnqueueUnique(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
@ -140,6 +142,7 @@ func TestEnqueueUnique(t *testing.T) {
func TestDequeue(t *testing.T) { func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now() now := time.Now()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
@ -336,6 +339,7 @@ func TestDequeue(t *testing.T) {
func TestDequeueIgnoresPausedQueues(t *testing.T) { func TestDequeueIgnoresPausedQueues(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
@ -448,6 +452,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
func TestDone(t *testing.T) { func TestDone(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now() now := time.Now()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
@ -600,6 +605,7 @@ func TestDone(t *testing.T) {
func TestRequeue(t *testing.T) { func TestRequeue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now() now := time.Now()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
@ -748,6 +754,7 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
tests := []struct { tests := []struct {
msg *base.TaskMessage msg *base.TaskMessage
@ -785,6 +792,7 @@ func TestSchedule(t *testing.T) {
func TestScheduleUnique(t *testing.T) { func TestScheduleUnique(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
@ -841,6 +849,7 @@ func TestScheduleUnique(t *testing.T) {
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now() now := time.Now()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
@ -1001,6 +1010,7 @@ func TestRetry(t *testing.T) {
func TestKill(t *testing.T) { func TestKill(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
now := time.Now() now := time.Now()
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
@ -1203,6 +1213,7 @@ func TestKill(t *testing.T) {
func TestCheckAndEnqueue(t *testing.T) { func TestCheckAndEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("generate_csv", nil) t2 := h.NewTaskMessage("generate_csv", nil)
t3 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil)
@ -1414,6 +1425,7 @@ func TestListDeadlineExceeded(t *testing.T) {
} }
r := setup(t) r := setup(t)
defer r.Close()
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
@ -1433,6 +1445,7 @@ func TestListDeadlineExceeded(t *testing.T) {
func TestWriteServerState(t *testing.T) { func TestWriteServerState(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var ( var (
host = "localhost" host = "localhost"
@ -1499,6 +1512,7 @@ func TestWriteServerState(t *testing.T) {
func TestWriteServerStateWithWorkers(t *testing.T) { func TestWriteServerStateWithWorkers(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var ( var (
host = "127.0.0.1" host = "127.0.0.1"
@ -1607,6 +1621,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
func TestClearServerState(t *testing.T) { func TestClearServerState(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
var ( var (
host = "127.0.0.1" host = "127.0.0.1"
@ -1707,6 +1722,7 @@ func TestClearServerState(t *testing.T) {
func TestCancelationPubSub(t *testing.T) { func TestCancelationPubSub(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
pubsub, err := r.CancelationPubSub() pubsub, err := r.CancelationPubSub()
if err != nil { if err != nil {

View File

@ -484,6 +484,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
t6 = NewTask(m6.Type, m6.Payload) t6 = NewTask(m6.Type, m6.Payload)
t7 = NewTask(m7.Type, m7.Payload) t7 = NewTask(m7.Type, m7.Payload)
) )
defer r.Close()
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage // initial queues state pending map[string][]*base.TaskMessage // initial queues state

View File

@ -17,6 +17,7 @@ import (
func TestRecoverer(t *testing.T) { func TestRecoverer(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
t1 := h.NewTaskMessageWithQueue("task1", nil, "default") t1 := h.NewTaskMessageWithQueue("task1", nil, "default")

View File

@ -17,6 +17,7 @@ import (
func TestScheduler(t *testing.T) { func TestScheduler(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second const pollInterval = time.Second
s := newScheduler(schedulerParams{ s := newScheduler(schedulerParams{

View File

@ -23,6 +23,7 @@ func TestServer(t *testing.T) {
redisConnOpt := getRedisConnOpt(t) redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt) c := NewClient(redisConnOpt)
defer c.Close()
srv := NewServer(redisConnOpt, Config{ srv := NewServer(redisConnOpt, Config{
Concurrency: 10, Concurrency: 10,
LogLevel: testLogLevel, LogLevel: testLogLevel,

View File

@ -16,6 +16,7 @@ import (
func TestSubscriber(t *testing.T) { func TestSubscriber(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
tests := []struct { tests := []struct {
@ -76,6 +77,7 @@ func TestSubscriberWithRedisDown(t *testing.T) {
} }
}() }()
r := rdb.NewRDB(setup(t)) r := rdb.NewRDB(setup(t))
defer r.Close()
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()

View File

@ -22,6 +22,7 @@ func TestSyncer(t *testing.T) {
h.NewTaskMessage("gen_thumbnail", nil), h.NewTaskMessage("gen_thumbnail", nil),
} }
r := setup(t) r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
h.SeedActiveQueue(t, r, inProgress, base.DefaultQueueName) h.SeedActiveQueue(t, r, inProgress, base.DefaultQueueName)