mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-22 22:06:12 +08:00 
			
		
		
		
	Use sync.WaitGroup for shutdown
This commit is contained in:
		| @@ -37,6 +37,9 @@ type Background struct { | ||||
| 	// channel to send state updates. | ||||
| 	stateCh chan<- string | ||||
|  | ||||
| 	// wait group to wait for all goroutines to finish. | ||||
| 	wg sync.WaitGroup | ||||
|  | ||||
| 	rdb         *rdb.RDB | ||||
| 	scheduler   *scheduler | ||||
| 	processor   *processor | ||||
| @@ -211,11 +214,11 @@ func (bg *Background) start(handler Handler) { | ||||
| 	bg.running = true | ||||
| 	bg.processor.handler = handler | ||||
|  | ||||
| 	bg.heartbeater.start() | ||||
| 	bg.subscriber.start() | ||||
| 	bg.syncer.start() | ||||
| 	bg.scheduler.start() | ||||
| 	bg.processor.start() | ||||
| 	bg.heartbeater.start(&bg.wg) | ||||
| 	bg.subscriber.start(&bg.wg) | ||||
| 	bg.syncer.start(&bg.wg) | ||||
| 	bg.scheduler.start(&bg.wg) | ||||
| 	bg.processor.start(&bg.wg) | ||||
| } | ||||
|  | ||||
| // stops the background-task processing. | ||||
| @@ -234,6 +237,8 @@ func (bg *Background) stop() { | ||||
| 	bg.subscriber.terminate() | ||||
| 	bg.heartbeater.terminate() | ||||
|  | ||||
| 	bg.wg.Wait() | ||||
|  | ||||
| 	bg.rdb.Close() | ||||
| 	bg.processor.handler = nil | ||||
| 	bg.running = false | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| @@ -49,10 +50,12 @@ func (h *heartbeater) terminate() { | ||||
| 	h.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (h *heartbeater) start() { | ||||
| func (h *heartbeater) start(wg *sync.WaitGroup) { | ||||
| 	h.pinfo.Started = time.Now() | ||||
| 	h.pinfo.State = "running" | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		h.beat() | ||||
| 		timer := time.NewTimer(h.interval) | ||||
| 		for { | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| @@ -15,6 +16,7 @@ import ( | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| ) | ||||
|  | ||||
| // FIXME: Make this test better. | ||||
| func TestHeartbeater(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	rdbClient := rdb.NewRDB(r) | ||||
| @@ -46,7 +48,8 @@ func TestHeartbeater(t *testing.T) { | ||||
| 			Started:     time.Now(), | ||||
| 			State:       "running", | ||||
| 		} | ||||
| 		hb.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		hb.start(&wg) | ||||
|  | ||||
| 		// allow for heartbeater to write to redis | ||||
| 		time.Sleep(tc.interval * 2) | ||||
|   | ||||
| @@ -119,11 +119,13 @@ func (p *processor) terminate() { | ||||
| 	p.restore() // move any unfinished tasks back to the queue. | ||||
| } | ||||
|  | ||||
| func (p *processor) start() { | ||||
| func (p *processor) start(wg *sync.WaitGroup) { | ||||
| 	// NOTE: The call to "restore" needs to complete before starting | ||||
| 	// the processor goroutine. | ||||
| 	p.restore() | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-p.done: | ||||
|   | ||||
| @@ -72,7 +72,8 @@ func TestProcessorSuccess(t *testing.T) { | ||||
| 		p := newProcessor(rdbClient, defaultQueueConfig, false, 10, defaultDelayFunc, nil, workerCh, cancelations) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		p.start(&wg) | ||||
| 		for _, msg := range tc.incoming { | ||||
| 			err := rdbClient.Enqueue(msg) | ||||
| 			if err != nil { | ||||
| @@ -159,7 +160,8 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 		p := newProcessor(rdbClient, defaultQueueConfig, false, 10, delayFunc, nil, workerCh, cancelations) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		p.start(&wg) | ||||
| 		for _, msg := range tc.incoming { | ||||
| 			err := rdbClient.Enqueue(msg) | ||||
| 			if err != nil { | ||||
| @@ -290,7 +292,8 @@ func TestProcessorWithStrictPriority(t *testing.T) { | ||||
| 			defaultDelayFunc, nil, workerCh, cancelations) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		p.start(&wg) | ||||
| 		time.Sleep(tc.wait) | ||||
| 		p.terminate() | ||||
| 		close(workerCh) | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| @@ -43,8 +44,10 @@ func (s *scheduler) terminate() { | ||||
| } | ||||
|  | ||||
| // start starts the "scheduler" goroutine. | ||||
| func (s *scheduler) start() { | ||||
| func (s *scheduler) start(wg *sync.WaitGroup) { | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-s.done: | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| @@ -69,7 +70,8 @@ func TestScheduler(t *testing.T) { | ||||
| 		h.SeedRetryQueue(t, r, tc.initRetry)         // initialize retry queue | ||||
| 		h.SeedEnqueuedQueue(t, r, tc.initQueue)      // initialize default queue | ||||
|  | ||||
| 		s.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		s.start(&wg) | ||||
| 		time.Sleep(tc.wait) | ||||
| 		s.terminate() | ||||
|  | ||||
|   | ||||
| @@ -5,6 +5,8 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| ) | ||||
| @@ -33,14 +35,16 @@ func (s *subscriber) terminate() { | ||||
| 	s.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (s *subscriber) start() { | ||||
| func (s *subscriber) start(wg *sync.WaitGroup) { | ||||
| 	pubsub, err := s.rdb.CancelationPubSub() | ||||
| 	cancelCh := pubsub.Channel() | ||||
| 	if err != nil { | ||||
| 		logger.error("cannot subscribe to cancelation channel: %v", err) | ||||
| 		return | ||||
| 	} | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-s.done: | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| @@ -34,7 +35,8 @@ func TestSubscriber(t *testing.T) { | ||||
| 		cancelations.Add(tc.registeredID, fakeCancelFunc) | ||||
|  | ||||
| 		subscriber := newSubscriber(rdbClient, cancelations) | ||||
| 		subscriber.start() | ||||
| 		var wg sync.WaitGroup | ||||
| 		subscriber.start(&wg) | ||||
|  | ||||
| 		if err := rdbClient.PublishCancelation(tc.publishID); err != nil { | ||||
| 			subscriber.terminate() | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| @@ -39,8 +40,10 @@ func (s *syncer) terminate() { | ||||
| 	s.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (s *syncer) start() { | ||||
| func (s *syncer) start(wg *sync.WaitGroup) { | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
| 		var requests []*syncRequest | ||||
| 		for { | ||||
| 			select { | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| @@ -27,7 +28,8 @@ func TestSyncer(t *testing.T) { | ||||
| 	const interval = time.Second | ||||
| 	syncRequestCh := make(chan *syncRequest) | ||||
| 	syncer := newSyncer(syncRequestCh, interval) | ||||
| 	syncer.start() | ||||
| 	var wg sync.WaitGroup | ||||
| 	syncer.start(&wg) | ||||
| 	defer syncer.terminate() | ||||
|  | ||||
| 	for _, msg := range inProgress { | ||||
| @@ -66,7 +68,8 @@ func TestSyncerRetry(t *testing.T) { | ||||
| 	const interval = time.Second | ||||
| 	syncRequestCh := make(chan *syncRequest) | ||||
| 	syncer := newSyncer(syncRequestCh, interval) | ||||
| 	syncer.start() | ||||
| 	var wg sync.WaitGroup | ||||
| 	syncer.start(&wg) | ||||
| 	defer syncer.terminate() | ||||
|  | ||||
| 	for _, msg := range inProgress { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user