2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Change Server API

This commit is contained in:
Ken Hibino 2021-03-20 17:16:44 -07:00
parent af6d9d3df7
commit 5af86a0903
8 changed files with 46 additions and 46 deletions

View File

@ -70,13 +70,13 @@ func ExampleServer_Quiet() {
for { for {
s := <-sigs s := <-sigs
if s == unix.SIGTSTP { if s == unix.SIGTSTP {
srv.Quiet() // stop processing new tasks srv.Stop() // stop processing new tasks
continue continue
} }
break break
} }
srv.Stop() srv.Shutdown()
} }
func ExampleScheduler() { func ExampleScheduler() {

View File

@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) {
hb.host = tc.host hb.host = tc.host
hb.pid = tc.pid hb.pid = tc.pid
status.Set(base.StatusRunning) status.Set(base.StatusActive)
var wg sync.WaitGroup var wg sync.WaitGroup
hb.start(&wg) hb.start(&wg)
@ -91,7 +91,7 @@ func TestHeartbeater(t *testing.T) {
} }
// status change // status change
status.Set(base.StatusStopped) status.Set(base.StatusClosed)
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
time.Sleep(tc.interval * 2) time.Sleep(tc.interval * 2)
@ -138,7 +138,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
concurrency: 10, concurrency: 10,
queues: map[string]int{"default": 1}, queues: map[string]int{"default": 1},
strictPriority: false, strictPriority: false,
status: base.NewServerStatus(base.StatusRunning), status: base.NewServerStatus(base.StatusActive),
starting: make(chan *workerInfo), starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage), finished: make(chan *base.TaskMessage),
}) })

View File

@ -233,27 +233,27 @@ const (
// StatusIdle indicates the server is in idle state. // StatusIdle indicates the server is in idle state.
StatusIdle ServerStatusValue = iota StatusIdle ServerStatusValue = iota
// StatusRunning indicates the server is up and active. // StatusActive indicates the server is up and active.
StatusRunning StatusActive
// StatusQuiet indicates the server is up but not active. // StatusStopped indicates the server is up but no longer processing new tasks.
StatusQuiet
// StatusStopped indicates the server server has been stopped.
StatusStopped StatusStopped
// StatusClosed indicates the server has been shutdown.
StatusClosed
) )
var statuses = []string{ var statuses = []string{
"idle", "idle",
"running", "active",
"quiet",
"stopped", "stopped",
"closed",
} }
func (s *ServerStatus) String() string { func (s *ServerStatus) String() string {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if StatusIdle <= s.val && s.val <= StatusStopped { if StatusIdle <= s.val && s.val <= StatusClosed {
return statuses[s.val] return statuses[s.val]
} }
return "unknown status" return "unknown status"

View File

@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
status.Set(StatusStopped) status.Set(StatusClosed)
_ = status.String() _ = status.String()
}() }()

View File

@ -183,9 +183,9 @@ func (s *Scheduler) Run() error {
// It returns an error if the scheduler is already running or has been stopped. // It returns an error if the scheduler is already running or has been stopped.
func (s *Scheduler) Start() error { func (s *Scheduler) Start() error {
switch s.status.Get() { switch s.status.Get() {
case base.StatusRunning: case base.StatusActive:
return fmt.Errorf("asynq: the scheduler is already running") return fmt.Errorf("asynq: the scheduler is already running")
case base.StatusStopped: case base.StatusClosed:
return fmt.Errorf("asynq: the scheduler has already been stopped") return fmt.Errorf("asynq: the scheduler has already been stopped")
} }
s.logger.Info("Scheduler starting") s.logger.Info("Scheduler starting")
@ -193,14 +193,14 @@ func (s *Scheduler) Start() error {
s.cron.Start() s.cron.Start()
s.wg.Add(1) s.wg.Add(1)
go s.runHeartbeater() go s.runHeartbeater()
s.status.Set(base.StatusRunning) s.status.Set(base.StatusActive)
return nil return nil
} }
// Stop stops the scheduler. // Stop stops the scheduler.
// It returns an error if the scheduler is not currently running. // It returns an error if the scheduler is not currently running.
func (s *Scheduler) Stop() error { func (s *Scheduler) Stop() error {
if s.status.Get() != base.StatusRunning { if s.status.Get() != base.StatusActive {
return fmt.Errorf("asynq: the scheduler is not running") return fmt.Errorf("asynq: the scheduler is not running")
} }
s.logger.Info("Scheduler shutting down") s.logger.Info("Scheduler shutting down")
@ -212,7 +212,7 @@ func (s *Scheduler) Stop() error {
s.clearHistory() s.clearHistory()
s.client.Close() s.client.Close()
s.rdb.Close() s.rdb.Close()
s.status.Set(base.StatusStopped) s.status.Set(base.StatusClosed)
s.logger.Info("Scheduler stopped") s.logger.Info("Scheduler stopped")
return nil return nil
} }

View File

@ -420,8 +420,8 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
return fn(ctx, task) return fn(ctx, task)
} }
// ErrServerStopped indicates that the operation is now illegal because of the server being stopped. // ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
var ErrServerStopped = errors.New("asynq: the server has been stopped") var ErrServerClosed = errors.New("asynq: server closed")
// Run starts the background-task processing and blocks until // Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives // an os signal to exit the program is received. Once it receives
@ -429,7 +429,7 @@ var ErrServerStopped = errors.New("asynq: the server has been stopped")
// goroutines to process the tasks. // goroutines to process the tasks.
// //
// Run returns any error encountered during server startup time. // Run returns any error encountered during server startup time.
// If the server has already been stopped, ErrServerStopped is returned. // If the server has already been shutdown, ErrServerClosed is returned.
func (srv *Server) Run(handler Handler) error { func (srv *Server) Run(handler Handler) error {
if err := srv.Start(handler); err != nil { if err := srv.Start(handler); err != nil {
return err return err
@ -445,18 +445,18 @@ func (srv *Server) Run(handler Handler) error {
// concurrency specified at the initialization time. // concurrency specified at the initialization time.
// //
// Start returns any error encountered during server startup time. // Start returns any error encountered during server startup time.
// If the server has already been stopped, ErrServerStopped is returned. // If the server has already been stopped, ErrServerClosed is returned.
func (srv *Server) Start(handler Handler) error { func (srv *Server) Start(handler Handler) error {
if handler == nil { if handler == nil {
return fmt.Errorf("asynq: server cannot run with nil handler") return fmt.Errorf("asynq: server cannot run with nil handler")
} }
switch srv.status.Get() { switch srv.status.Get() {
case base.StatusRunning: case base.StatusActive:
return fmt.Errorf("asynq: the server is already running") return fmt.Errorf("asynq: the server is already running")
case base.StatusStopped: case base.StatusClosed:
return ErrServerStopped return ErrServerClosed
} }
srv.status.Set(base.StatusRunning) srv.status.Set(base.StatusActive)
srv.processor.handler = handler srv.processor.handler = handler
srv.logger.Info("Starting processing") srv.logger.Info("Starting processing")
@ -471,13 +471,13 @@ func (srv *Server) Start(handler Handler) error {
return nil return nil
} }
// Stop stops the worker server. // Shutdown gracefully shuts down the server.
// It gracefully closes all active workers. The server will wait for // It gracefully closes all active workers. The server will wait for
// active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
// If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis. // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
func (srv *Server) Stop() { func (srv *Server) Shutdown() {
switch srv.status.Get() { switch srv.status.Get() {
case base.StatusIdle, base.StatusStopped: case base.StatusIdle, base.StatusClosed:
// server is not running, do nothing and return. // server is not running, do nothing and return.
return return
} }
@ -498,16 +498,16 @@ func (srv *Server) Stop() {
srv.wg.Wait() srv.wg.Wait()
srv.broker.Close() srv.broker.Close()
srv.status.Set(base.StatusStopped) srv.status.Set(base.StatusClosed)
srv.logger.Info("Exiting") srv.logger.Info("Exiting")
} }
// Quiet signals the server to stop pulling new tasks off queues. // Stop signals the server to stop pulling new tasks off queues.
// Quiet should be used before stopping the server. // Stop should be used before shutting down the server.
func (srv *Server) Quiet() { func (srv *Server) Stop() {
srv.logger.Info("Stopping processor") srv.logger.Info("Stopping processor")
srv.processor.stop() srv.processor.stop()
srv.status.Set(base.StatusQuiet) srv.status.Set(base.StatusStopped)
srv.logger.Info("Processor stopped") srv.logger.Info("Processor stopped")
} }

View File

@ -50,7 +50,7 @@ func TestServer(t *testing.T) {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
srv.Stop() srv.Shutdown()
} }
func TestServerRun(t *testing.T) { func TestServerRun(t *testing.T) {
@ -82,16 +82,16 @@ func TestServerRun(t *testing.T) {
} }
} }
func TestServerErrServerStopped(t *testing.T) { func TestServerErrServerClosed(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
handler := NewServeMux() handler := NewServeMux()
if err := srv.Start(handler); err != nil { if err := srv.Start(handler); err != nil {
t.Fatal(err) t.Fatal(err)
} }
srv.Stop() srv.Shutdown()
err := srv.Start(handler) err := srv.Start(handler)
if err != ErrServerStopped { if err != ErrServerClosed {
t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerStopped error", err) t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerClosed error", err)
} }
} }
@ -100,7 +100,7 @@ func TestServerErrNilHandler(t *testing.T) {
err := srv.Start(nil) err := srv.Start(nil)
if err == nil { if err == nil {
t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error") t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error")
srv.Stop() srv.Shutdown()
} }
} }
@ -114,7 +114,7 @@ func TestServerErrServerRunning(t *testing.T) {
if err == nil { if err == nil {
t.Error("Calling (*Server).Start(handler) on already running server did not return error") t.Error("Calling (*Server).Start(handler) on already running server did not return error")
} }
srv.Stop() srv.Shutdown()
} }
func TestServerWithRedisDown(t *testing.T) { func TestServerWithRedisDown(t *testing.T) {
@ -146,7 +146,7 @@ func TestServerWithRedisDown(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
srv.Stop() srv.Shutdown()
} }
func TestServerWithFlakyBroker(t *testing.T) { func TestServerWithFlakyBroker(t *testing.T) {
@ -207,7 +207,7 @@ func TestServerWithFlakyBroker(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
srv.Stop() srv.Shutdown()
} }
func TestLogLevel(t *testing.T) { func TestLogLevel(t *testing.T) {

View File

@ -22,7 +22,7 @@ func (srv *Server) waitForSignals() {
for { for {
sig := <-sigs sig := <-sigs
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
srv.Quiet() srv.Stop()
continue continue
} }
break break