2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 09:56:12 +08:00
Allow a server instance to be gracefully shutdown programmatically
This commit is contained in:
herb
2022-04-22 13:38:46 -07:00
parent 5c723f597e
commit d1e5e4f2f1
4 changed files with 34 additions and 11 deletions

View File

@@ -41,6 +41,7 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of // to avoid using cron.EntryID as the public API of
// the Scheduler. // the Scheduler.
idmap map[string]cron.EntryID idmap map[string]cron.EntryID
sigs chan os.Signal
} }
// NewScheduler returns a new Scheduler instance given the redis connection option. // NewScheduler returns a new Scheduler instance given the redis connection option.
@@ -77,6 +78,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
done: make(chan struct{}), done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler, errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID), idmap: make(map[string]cron.EntryID),
sigs: make(chan os.Signal, 1),
} }
} }

View File

@@ -10,6 +10,7 @@ import (
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
"os"
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
@@ -51,6 +52,7 @@ type Server struct {
healthchecker *healthchecker healthchecker *healthchecker
janitor *janitor janitor *janitor
aggregator *aggregator aggregator *aggregator
sigs chan os.Signal
} }
type serverState struct { type serverState struct {
@@ -547,6 +549,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
healthchecker: healthchecker, healthchecker: healthchecker,
janitor: janitor, janitor: janitor,
aggregator: aggregator, aggregator: aggregator,
sigs: make(chan os.Signal, 1),
} }
} }

View File

@@ -1,3 +1,4 @@
//go:build linux || bsd || darwin
// +build linux bsd darwin // +build linux bsd darwin
package asynq package asynq
@@ -17,8 +18,7 @@ func (srv *Server) waitForSignals() {
srv.logger.Info("Send signal TSTP to stop processing new tasks") srv.logger.Info("Send signal TSTP to stop processing new tasks")
srv.logger.Info("Send signal TERM or INT to terminate the process") srv.logger.Info("Send signal TERM or INT to terminate the process")
sigs := make(chan os.Signal, 1) signal.Notify(srv.sigs, os.Interrupt, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for { for {
sig := <-sigs sig := <-sigs
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
@@ -29,9 +29,18 @@ func (srv *Server) waitForSignals() {
} }
} }
// SignalShutdown stops and shuts down the server.
func (srv *Server) SignalShutdown() {
srv.sigs <- os.Interrupt
}
func (s *Scheduler) waitForSignals() { func (s *Scheduler) waitForSignals() {
s.logger.Info("Send signal TERM or INT to stop the scheduler") s.logger.Info("Send signal TERM or INT to stop the scheduler")
sigs := make(chan os.Signal, 1) signal.Notify(s.sigs, os.Interrupt, unix.SIGTERM, unix.SIGINT)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT) <-s.sigs
<-sigs }
// SignalShutdown stops and shuts down the scheduler.
func (s *Scheduler) SignalShutdown() {
s.sigs <- os.Interrupt
} }

View File

@@ -1,3 +1,4 @@
//go:build windows
// +build windows // +build windows
package asynq package asynq
@@ -16,14 +17,22 @@ import (
// Note: Currently SIGTSTP is not supported for windows build. // Note: Currently SIGTSTP is not supported for windows build.
func (srv *Server) waitForSignals() { func (srv *Server) waitForSignals() {
srv.logger.Info("Send signal TERM or INT to terminate the process") srv.logger.Info("Send signal TERM or INT to terminate the process")
sigs := make(chan os.Signal, 1) signal.Notify(srv.sigs, os.Interrupt, windows.SIGTERM, windows.SIGINT)
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) <-srv.sigs
<-sigs }
// SignalShutdown stops and shuts down the server.
func (srv *Server) SignalShutdown() {
srv.sigs <- os.Interrupt
} }
func (s *Scheduler) waitForSignals() { func (s *Scheduler) waitForSignals() {
s.logger.Info("Send signal TERM or INT to stop the scheduler") s.logger.Info("Send signal TERM or INT to stop the scheduler")
sigs := make(chan os.Signal, 1) signal.Notify(s.sigs, os.Interrupt, windows.SIGTERM, windows.SIGINT)
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) <-s.sigs
<-sigs }
// SignalShutdown stops and shuts down the scheduler.
func (s *Scheduler) SignalShutdown() {
s.sigs <- os.Interrupt
} }