diff --git a/background.go b/background.go index 4d439ba..2d9ac6e 100644 --- a/background.go +++ b/background.go @@ -1,6 +1,7 @@ package asynq import ( + "fmt" "os" "os/signal" "sync" @@ -49,6 +50,7 @@ func (bg *Background) Run(handler TaskHandler) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt, os.Kill) <-sigs + fmt.Printf("\nStarting graceful shutdown...\n") } // starts the background-task processing. diff --git a/poller.go b/poller.go index 027124f..9fd0b86 100644 --- a/poller.go +++ b/poller.go @@ -29,6 +29,7 @@ func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller { } func (p *poller) terminate() { + fmt.Print("Poller shutting down...") // Signal the poller goroutine to stop polling. p.done <- struct{}{} } @@ -39,9 +40,7 @@ func (p *poller) start() { for { select { case <-p.done: - fmt.Println("-------------[Poller]---------------") - fmt.Println("Poller shutting down...") - fmt.Println("------------------------------------") + fmt.Println("Done") return default: p.exec() diff --git a/processor.go b/processor.go index bcfc161..f64737a 100644 --- a/processor.go +++ b/processor.go @@ -29,15 +29,16 @@ func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { } func (p *processor) terminate() { + fmt.Print("Processor shutting down...") // Signal the processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} - fmt.Println("--- Waiting for all workers to finish ---") + fmt.Print("Waiting for all workers to finish...") for i := 0; i < cap(p.sema); i++ { // block until all workers have released the token p.sema <- struct{}{} } - fmt.Println("--- All workers have finished! ----") + fmt.Println("Done") } func (p *processor) start() { @@ -48,9 +49,7 @@ func (p *processor) start() { for { select { case <-p.done: - fmt.Println("-------------[Processor]---------------") - fmt.Println("Processor shutting down...") - fmt.Println("-------------------------------------") + fmt.Println("Done") return default: p.exec()