mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Fix data race in subscriber test
This commit is contained in:
parent
39f237899b
commit
906f231e6c
@ -229,10 +229,13 @@ func (bg *Background) stop() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: The order of termination is important.
|
||||||
|
// Sender goroutines should be terminated before the receiver goroutines.
|
||||||
|
//
|
||||||
|
// processor -> syncer (via syncRequestCh)
|
||||||
|
// processor -> heartbeater (via workerCh)
|
||||||
bg.scheduler.terminate()
|
bg.scheduler.terminate()
|
||||||
bg.processor.terminate()
|
bg.processor.terminate()
|
||||||
// Note: processor and all worker goroutines need to be exited
|
|
||||||
// before shutting down syncer to avoid goroutine leak.
|
|
||||||
bg.syncer.terminate()
|
bg.syncer.terminate()
|
||||||
bg.subscriber.terminate()
|
bg.subscriber.terminate()
|
||||||
bg.heartbeater.terminate()
|
bg.heartbeater.terminate()
|
||||||
@ -240,7 +243,6 @@ func (bg *Background) stop() {
|
|||||||
bg.wg.Wait()
|
bg.wg.Wait()
|
||||||
|
|
||||||
bg.rdb.Close()
|
bg.rdb.Close()
|
||||||
bg.processor.handler = nil
|
|
||||||
bg.running = false
|
bg.running = false
|
||||||
|
|
||||||
logger.info("Bye!")
|
logger.info("Bye!")
|
||||||
|
@ -27,8 +27,11 @@ func TestSubscriber(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
var mu sync.Mutex
|
||||||
called := false
|
called := false
|
||||||
fakeCancelFunc := func() {
|
fakeCancelFunc := func() {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
called = true
|
called = true
|
||||||
}
|
}
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
@ -46,6 +49,7 @@ func TestSubscriber(t *testing.T) {
|
|||||||
// allow for redis to publish message
|
// allow for redis to publish message
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
if called != tc.wantCalled {
|
if called != tc.wantCalled {
|
||||||
if tc.wantCalled {
|
if tc.wantCalled {
|
||||||
t.Errorf("fakeCancelFunc was not called, want the function to be called")
|
t.Errorf("fakeCancelFunc was not called, want the function to be called")
|
||||||
@ -53,6 +57,7 @@ func TestSubscriber(t *testing.T) {
|
|||||||
t.Errorf("fakeCancelFunc was called, want the function to not be called")
|
t.Errorf("fakeCancelFunc was called, want the function to not be called")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
subscriber.terminate()
|
subscriber.terminate()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user