diff --git a/manager.go b/manager.go index bb09062..8339549 100644 --- a/manager.go +++ b/manager.go @@ -18,6 +18,7 @@ type manager struct { // does not exceed the limit sema chan struct{} + // channel to communicate back to the long running "manager" goroutine. done chan struct{} } @@ -31,7 +32,16 @@ func newManager(rdb *redis.Client, numWorkers int, handler TaskHandler) *manager } func (m *manager) terminate() { + // send a signal to the manager goroutine to stop + // processing tasks from the queue. m.done <- struct{}{} + + fmt.Println("--- Waiting for all workers to finish ---") + for i := 0; i < cap(m.sema); i++ { + // block until all workers have released the token + m.sema <- struct{}{} + } + fmt.Println("--- All workers have finished! ----") } func (m *manager) start() { @@ -39,7 +49,10 @@ func (m *manager) start() { for { select { case <-m.done: - m.shutdown() + fmt.Println("-------------[Manager]---------------") + fmt.Println("Manager shutting down...") + fmt.Println("-------------------------------------") + return default: m.processTasks() } @@ -92,10 +105,3 @@ func (m *manager) processTasks() { } }(t) } - -func (m *manager) shutdown() { - // TODO(hibiken): implement this. Gracefully shutdown all active goroutines. - fmt.Println("-------------[Manager]---------------") - fmt.Println("Manager shutting down...") - fmt.Println("------------------------------------") -} diff --git a/poller.go b/poller.go index 723dff5..5f969ee 100644 --- a/poller.go +++ b/poller.go @@ -13,6 +13,7 @@ import ( type poller struct { rdb *redis.Client + // channel to communicate back to the long running "poller" goroutine. done chan struct{} // poll interval on average @@ -23,6 +24,8 @@ type poller struct { } func (p *poller) terminate() { + // send a signal to the manager goroutine to stop + // processing tasks from the queue. p.done <- struct{}{} } @@ -31,7 +34,10 @@ func (p *poller) start() { for { select { case <-p.done: - p.shutdown() + fmt.Println("-------------[Poller]---------------") + fmt.Println("Poller shutting down...") + fmt.Println("------------------------------------") + return default: p.enqueue() time.Sleep(p.avgInterval) @@ -80,10 +86,3 @@ func (p *poller) enqueue() { } } } - -func (p *poller) shutdown() { - // TODO(hibiken): implement this. Gracefully shutdown all active goroutines. - fmt.Println("-------------[Poller]---------------") - fmt.Println("Poller shutting down...") - fmt.Println("------------------------------------") -}