From 207a6d2d1af361e8df840fd5268d8314e151586a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 18 Jan 2020 10:17:39 -0800 Subject: [PATCH] Fix benchmark tests --- asynq_test.go | 10 ++++++++-- background.go | 4 ++-- benchmark_test.go | 20 ++++++++++++++------ syncer.go | 1 + syncer_test.go | 6 +++--- 5 files changed, 28 insertions(+), 13 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 8e16090..1b0e3eb 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -16,11 +16,17 @@ import ( // This file defines test helper functions used by // other test files. +// redis used for package testing. +const ( + redisAddr = "localhost:6379" + redisDB = 14 +) + func setup(tb testing.TB) *redis.Client { tb.Helper() r := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - DB: 14, + Addr: redisAddr, + DB: redisDB, }) // Start each test with a clean slate. h.FlushDB(tb, r) diff --git a/background.go b/background.go index 39fddcd..249c655 100644 --- a/background.go +++ b/background.go @@ -111,9 +111,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { qcfg := normalizeQueueCfg(queues) syncRequestCh := make(chan *syncRequest) - syncer := newSyncer(syncRequestCh, 5*time.Second) - rdb := rdb.NewRDB(createRedisClient(r)) scheduler := newScheduler(rdb, 5*time.Second, qcfg) processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh) @@ -196,6 +194,8 @@ func (bg *Background) stop() { bg.scheduler.terminate() bg.processor.terminate() + // Note: processor and all worker goroutines need to be exited + // before shutting down syncer to avoid goroutine leak. bg.syncer.terminate() bg.rdb.Close() diff --git a/benchmark_test.go b/benchmark_test.go index 27f5899..09617dd 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -18,9 +18,13 @@ func BenchmarkEndToEndSimple(b *testing.B) { const count = 100000 for n := 0; n < b.N; n++ { b.StopTimer() // begin setup - r := setup(b) - client := NewClient(r) - bg := NewBackground(r, &Config{ + setup(b) + redis := &RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + } + client := NewClient(redis) + bg := NewBackground(redis, &Config{ Concurrency: 10, RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second @@ -55,9 +59,13 @@ func BenchmarkEndToEnd(b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() // begin setup rand.Seed(time.Now().UnixNano()) - r := setup(b) - client := NewClient(r) - bg := NewBackground(r, &Config{ + setup(b) + redis := &RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + } + client := NewClient(redis) + bg := NewBackground(redis, &Config{ Concurrency: 10, RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second diff --git a/syncer.go b/syncer.go index 89cba7d..1ea49f4 100644 --- a/syncer.go +++ b/syncer.go @@ -30,6 +30,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { return &syncer{ requestsCh: requestsCh, done: make(chan struct{}), + interval: interval, } } diff --git a/syncer_test.go b/syncer_test.go index 8ce7240..aa333c7 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -39,7 +39,7 @@ func TestSyncer(t *testing.T) { } } - time.Sleep(interval) // ensure that syncer runs at least once + time.Sleep(2 * interval) // ensure that syncer runs at least once gotInProgress := h.GetInProgressMessages(t, r) if l := len(gotInProgress); l != 0 { @@ -78,7 +78,7 @@ func TestSyncerRetry(t *testing.T) { } } - time.Sleep(interval) // ensure that syncer runs at least once + time.Sleep(2 * interval) // ensure that syncer runs at least once // Sanity check to ensure that message was not successfully deleted // from in-progress list. @@ -90,7 +90,7 @@ func TestSyncerRetry(t *testing.T) { // simualate failover. rdbClient = rdb.NewRDB(goodClient) - time.Sleep(interval) // ensure that syncer runs at least once + time.Sleep(2 * interval) // ensure that syncer runs at least once gotInProgress = h.GetInProgressMessages(t, goodClient) if l := len(gotInProgress); l != 0 {