diff --git a/scheduler.go b/scheduler.go index dcb0aa0..8b2ad9c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -41,6 +41,7 @@ type Scheduler struct { // to avoid using cron.EntryID as the public API of // the Scheduler. idmap map[string]cron.EntryID + sigs chan os.Signal } // NewScheduler returns a new Scheduler instance given the redis connection option. @@ -77,6 +78,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { done: make(chan struct{}), errHandler: opts.EnqueueErrorHandler, idmap: make(map[string]cron.EntryID), + sigs: make(chan os.Signal, 1), } } diff --git a/server.go b/server.go index c8bcbc2..743ab6b 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,7 @@ import ( "fmt" "math" "math/rand" + "os" "runtime" "strings" "sync" @@ -51,6 +52,7 @@ type Server struct { healthchecker *healthchecker janitor *janitor aggregator *aggregator + sigs chan os.Signal } type serverState struct { @@ -547,6 +549,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { healthchecker: healthchecker, janitor: janitor, aggregator: aggregator, + sigs: make(chan os.Signal, 1), } } diff --git a/signals_unix.go b/signals_unix.go index e2c767b..0146aa2 100644 --- a/signals_unix.go +++ b/signals_unix.go @@ -1,3 +1,4 @@ +//go:build linux || bsd || darwin // +build linux bsd darwin package asynq @@ -17,8 +18,7 @@ func (srv *Server) waitForSignals() { srv.logger.Info("Send signal TSTP to stop processing new tasks") srv.logger.Info("Send signal TERM or INT to terminate the process") - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) + signal.Notify(srv.sigs, os.Interrupt, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) for { sig := <-sigs if sig == unix.SIGTSTP { @@ -29,9 +29,18 @@ func (srv *Server) waitForSignals() { } } +// SignalShutdown stops and shuts down the server. +func (srv *Server) SignalShutdown() { + srv.sigs <- os.Interrupt +} + func (s *Scheduler) waitForSignals() { s.logger.Info("Send signal TERM or INT to stop the scheduler") - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, unix.SIGTERM, unix.SIGINT) - <-sigs + signal.Notify(s.sigs, os.Interrupt, unix.SIGTERM, unix.SIGINT) + <-s.sigs +} + +// SignalShutdown stops and shuts down the scheduler. +func (s *Scheduler) SignalShutdown() { + s.sigs <- os.Interrupt } diff --git a/signals_windows.go b/signals_windows.go index da22e68..ebb37a3 100644 --- a/signals_windows.go +++ b/signals_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package asynq @@ -16,14 +17,22 @@ import ( // Note: Currently SIGTSTP is not supported for windows build. func (srv *Server) waitForSignals() { srv.logger.Info("Send signal TERM or INT to terminate the process") - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) - <-sigs + signal.Notify(srv.sigs, os.Interrupt, windows.SIGTERM, windows.SIGINT) + <-srv.sigs +} + +// SignalShutdown stops and shuts down the server. +func (srv *Server) SignalShutdown() { + srv.sigs <- os.Interrupt } func (s *Scheduler) waitForSignals() { s.logger.Info("Send signal TERM or INT to stop the scheduler") - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) - <-sigs + signal.Notify(s.sigs, os.Interrupt, windows.SIGTERM, windows.SIGINT) + <-s.sigs +} + +// SignalShutdown stops and shuts down the scheduler. +func (s *Scheduler) SignalShutdown() { + s.sigs <- os.Interrupt }