diff --git a/background.go b/background.go index 9649a2a..4571af1 100644 --- a/background.go +++ b/background.go @@ -80,9 +80,8 @@ func (bg *Background) Run(handler Handler) { signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) for { sig := <-sigs - fmt.Printf("[DEBUG] Got %v\n", sig) // TODO: Remove this if sig == syscall.SIGTSTP { - fmt.Println("[DEBUG] Stop processing tasks") + bg.processor.stop() continue } break diff --git a/processor.go b/processor.go index 8653401..d391257 100644 --- a/processor.go +++ b/processor.go @@ -5,6 +5,7 @@ import ( "log" "math" "math/rand" + "sync" "time" "github.com/hibiken/asynq/internal/rdb" @@ -25,7 +26,9 @@ type processor struct { sema chan struct{} // channel to communicate back to the long running "processor" goroutine. + // once is used to send value to the channel only once. done chan struct{} + once sync.Once // quit channel communicates to the in-flight worker goroutines to stop. quit chan struct{} @@ -42,11 +45,20 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { } } +// Note: stops only the "processor" goroutine, does not stop workers. +// It's safe to call this method multiple times. +func (p *processor) stop() { + p.once.Do(func() { + log.Println("[INFO] Processor shutting down...") + // Signal the processor goroutine to stop processing tasks + // from the queue. + p.done <- struct{}{} + }) +} + // NOTE: once terminated, processor cannot be re-started. func (p *processor) terminate() { - log.Println("[INFO] Processor shutting down...") - // Signal the processor goroutine to stop processing tasks from the queue. - p.done <- struct{}{} + p.stop() // TODO(hibiken): Allow user to customize this timeout value. const timeout = 8 * time.Second