diff --git a/background.go b/background.go index 91635bf..e2e27eb 100644 --- a/background.go +++ b/background.go @@ -229,10 +229,13 @@ func (bg *Background) stop() { return } + // Note: The order of termination is important. + // Sender goroutines should be terminated before the receiver goroutines. + // + // processor -> syncer (via syncRequestCh) + // processor -> heartbeater (via workerCh) bg.scheduler.terminate() bg.processor.terminate() - // Note: processor and all worker goroutines need to be exited - // before shutting down syncer to avoid goroutine leak. bg.syncer.terminate() bg.subscriber.terminate() bg.heartbeater.terminate() @@ -240,7 +243,6 @@ func (bg *Background) stop() { bg.wg.Wait() bg.rdb.Close() - bg.processor.handler = nil bg.running = false logger.info("Bye!") diff --git a/subscriber_test.go b/subscriber_test.go index 1a67159..0bc9258 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -27,8 +27,11 @@ func TestSubscriber(t *testing.T) { } for _, tc := range tests { + var mu sync.Mutex called := false fakeCancelFunc := func() { + mu.Lock() + defer mu.Unlock() called = true } cancelations := base.NewCancelations() @@ -46,6 +49,7 @@ func TestSubscriber(t *testing.T) { // allow for redis to publish message time.Sleep(time.Second) + mu.Lock() if called != tc.wantCalled { if tc.wantCalled { t.Errorf("fakeCancelFunc was not called, want the function to be called") @@ -53,6 +57,7 @@ func TestSubscriber(t *testing.T) { t.Errorf("fakeCancelFunc was called, want the function to not be called") } } + mu.Unlock() subscriber.terminate() }