diff --git a/example_test.go b/example_test.go index 1620d3f..9932679 100644 --- a/example_test.go +++ b/example_test.go @@ -70,13 +70,13 @@ func ExampleServer_Quiet() { for { s := <-sigs if s == unix.SIGTSTP { - srv.Quiet() // stop processing new tasks + srv.Stop() // stop processing new tasks continue } break } - srv.Stop() + srv.Shutdown() } func ExampleScheduler() { diff --git a/heartbeat_test.go b/heartbeat_test.go index c517519..058ae1c 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) { hb.host = tc.host hb.pid = tc.pid - status.Set(base.StatusRunning) + status.Set(base.StatusActive) var wg sync.WaitGroup hb.start(&wg) @@ -91,7 +91,7 @@ func TestHeartbeater(t *testing.T) { } // status change - status.Set(base.StatusStopped) + status.Set(base.StatusClosed) // allow for heartbeater to write to redis time.Sleep(tc.interval * 2) @@ -138,7 +138,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) { concurrency: 10, queues: map[string]int{"default": 1}, strictPriority: false, - status: base.NewServerStatus(base.StatusRunning), + status: base.NewServerStatus(base.StatusActive), starting: make(chan *workerInfo), finished: make(chan *base.TaskMessage), }) diff --git a/internal/base/base.go b/internal/base/base.go index 9ea18e4..4daed77 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -233,27 +233,27 @@ const ( // StatusIdle indicates the server is in idle state. StatusIdle ServerStatusValue = iota - // StatusRunning indicates the server is up and active. - StatusRunning + // StatusActive indicates the server is up and active. + StatusActive - // StatusQuiet indicates the server is up but not active. - StatusQuiet - - // StatusStopped indicates the server server has been stopped. + // StatusStopped indicates the server is up but no longer processing new tasks. StatusStopped + + // StatusClosed indicates the server has been shutdown. + StatusClosed ) var statuses = []string{ "idle", - "running", - "quiet", + "active", "stopped", + "closed", } func (s *ServerStatus) String() string { s.mu.Lock() defer s.mu.Unlock() - if StatusIdle <= s.val && s.val <= StatusStopped { + if StatusIdle <= s.val && s.val <= StatusClosed { return statuses[s.val] } return "unknown status" diff --git a/internal/base/base_test.go b/internal/base/base_test.go index cd8baea..35b9c6d 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - status.Set(StatusStopped) + status.Set(StatusClosed) _ = status.String() }() diff --git a/scheduler.go b/scheduler.go index 21280d6..c3f912c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -183,9 +183,9 @@ func (s *Scheduler) Run() error { // It returns an error if the scheduler is already running or has been stopped. func (s *Scheduler) Start() error { switch s.status.Get() { - case base.StatusRunning: + case base.StatusActive: return fmt.Errorf("asynq: the scheduler is already running") - case base.StatusStopped: + case base.StatusClosed: return fmt.Errorf("asynq: the scheduler has already been stopped") } s.logger.Info("Scheduler starting") @@ -193,14 +193,14 @@ func (s *Scheduler) Start() error { s.cron.Start() s.wg.Add(1) go s.runHeartbeater() - s.status.Set(base.StatusRunning) + s.status.Set(base.StatusActive) return nil } // Stop stops the scheduler. // It returns an error if the scheduler is not currently running. func (s *Scheduler) Stop() error { - if s.status.Get() != base.StatusRunning { + if s.status.Get() != base.StatusActive { return fmt.Errorf("asynq: the scheduler is not running") } s.logger.Info("Scheduler shutting down") @@ -212,7 +212,7 @@ func (s *Scheduler) Stop() error { s.clearHistory() s.client.Close() s.rdb.Close() - s.status.Set(base.StatusStopped) + s.status.Set(base.StatusClosed) s.logger.Info("Scheduler stopped") return nil } diff --git a/server.go b/server.go index 68ff381..535636a 100644 --- a/server.go +++ b/server.go @@ -420,8 +420,8 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { return fn(ctx, task) } -// ErrServerStopped indicates that the operation is now illegal because of the server being stopped. -var ErrServerStopped = errors.New("asynq: the server has been stopped") +// ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown. +var ErrServerClosed = errors.New("asynq: server closed") // Run starts the background-task processing and blocks until // an os signal to exit the program is received. Once it receives @@ -429,7 +429,7 @@ var ErrServerStopped = errors.New("asynq: the server has been stopped") // goroutines to process the tasks. // // Run returns any error encountered during server startup time. -// If the server has already been stopped, ErrServerStopped is returned. +// If the server has already been shutdown, ErrServerClosed is returned. func (srv *Server) Run(handler Handler) error { if err := srv.Start(handler); err != nil { return err @@ -445,18 +445,18 @@ func (srv *Server) Run(handler Handler) error { // concurrency specified at the initialization time. // // Start returns any error encountered during server startup time. -// If the server has already been stopped, ErrServerStopped is returned. +// If the server has already been stopped, ErrServerClosed is returned. func (srv *Server) Start(handler Handler) error { if handler == nil { return fmt.Errorf("asynq: server cannot run with nil handler") } switch srv.status.Get() { - case base.StatusRunning: + case base.StatusActive: return fmt.Errorf("asynq: the server is already running") - case base.StatusStopped: - return ErrServerStopped + case base.StatusClosed: + return ErrServerClosed } - srv.status.Set(base.StatusRunning) + srv.status.Set(base.StatusActive) srv.processor.handler = handler srv.logger.Info("Starting processing") @@ -471,13 +471,13 @@ func (srv *Server) Start(handler Handler) error { return nil } -// Stop stops the worker server. +// Shutdown gracefully shuts down the server. // It gracefully closes all active workers. The server will wait for // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis. -func (srv *Server) Stop() { +func (srv *Server) Shutdown() { switch srv.status.Get() { - case base.StatusIdle, base.StatusStopped: + case base.StatusIdle, base.StatusClosed: // server is not running, do nothing and return. return } @@ -498,16 +498,16 @@ func (srv *Server) Stop() { srv.wg.Wait() srv.broker.Close() - srv.status.Set(base.StatusStopped) + srv.status.Set(base.StatusClosed) srv.logger.Info("Exiting") } -// Quiet signals the server to stop pulling new tasks off queues. -// Quiet should be used before stopping the server. -func (srv *Server) Quiet() { +// Stop signals the server to stop pulling new tasks off queues. +// Stop should be used before shutting down the server. +func (srv *Server) Stop() { srv.logger.Info("Stopping processor") srv.processor.stop() - srv.status.Set(base.StatusQuiet) + srv.status.Set(base.StatusStopped) srv.logger.Info("Processor stopped") } diff --git a/server_test.go b/server_test.go index 6d29dd6..2d4265d 100644 --- a/server_test.go +++ b/server_test.go @@ -50,7 +50,7 @@ func TestServer(t *testing.T) { t.Errorf("could not enqueue a task: %v", err) } - srv.Stop() + srv.Shutdown() } func TestServerRun(t *testing.T) { @@ -82,16 +82,16 @@ func TestServerRun(t *testing.T) { } } -func TestServerErrServerStopped(t *testing.T) { +func TestServerErrServerClosed(t *testing.T) { srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) handler := NewServeMux() if err := srv.Start(handler); err != nil { t.Fatal(err) } - srv.Stop() + srv.Shutdown() err := srv.Start(handler) - if err != ErrServerStopped { - t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerStopped error", err) + if err != ErrServerClosed { + t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerClosed error", err) } } @@ -100,7 +100,7 @@ func TestServerErrNilHandler(t *testing.T) { err := srv.Start(nil) if err == nil { t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error") - srv.Stop() + srv.Shutdown() } } @@ -114,7 +114,7 @@ func TestServerErrServerRunning(t *testing.T) { if err == nil { t.Error("Calling (*Server).Start(handler) on already running server did not return error") } - srv.Stop() + srv.Shutdown() } func TestServerWithRedisDown(t *testing.T) { @@ -146,7 +146,7 @@ func TestServerWithRedisDown(t *testing.T) { time.Sleep(3 * time.Second) - srv.Stop() + srv.Shutdown() } func TestServerWithFlakyBroker(t *testing.T) { @@ -207,7 +207,7 @@ func TestServerWithFlakyBroker(t *testing.T) { time.Sleep(3 * time.Second) - srv.Stop() + srv.Shutdown() } func TestLogLevel(t *testing.T) { diff --git a/signals_unix.go b/signals_unix.go index 41f878b..e2c767b 100644 --- a/signals_unix.go +++ b/signals_unix.go @@ -22,7 +22,7 @@ func (srv *Server) waitForSignals() { for { sig := <-sigs if sig == unix.SIGTSTP { - srv.Quiet() + srv.Stop() continue } break