diff --git a/benchmark_test.go b/benchmark_test.go index b6da0b2..8f4913c 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) { } b.StartTimer() // end setup - srv.start(HandlerFunc(handler)) + srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - srv.stop() + srv.Stop() b.StartTimer() // end teardown } } @@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) { } b.StartTimer() // end setup - srv.start(HandlerFunc(handler)) + srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - srv.stop() + srv.Stop() b.StartTimer() // end teardown } } @@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { } b.StartTimer() // end setup - srv.start(HandlerFunc(handler)) + srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - srv.stop() + srv.Stop() b.StartTimer() // end teardown } } diff --git a/server.go b/server.go index 50cbc8f..0102eec 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ package asynq import ( "context" + "errors" "fmt" "math" "math/rand" @@ -30,8 +31,8 @@ import ( // (e.g., queue size reaches a certain limit, or the task has been in the // queue for a certain amount of time). type Server struct { - mu sync.Mutex - running bool + mu sync.Mutex + state serverState ps *base.ProcessState @@ -197,6 +198,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) subscriber := newSubscriber(logger, rdb, cancels) return &Server{ + state: stateIdle, logger: logger, rdb: rdb, ps: ps, @@ -230,11 +232,44 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { return fn(ctx, task) } +// ErrServerStopped indicates that the operation is now illegal because of the server being stopped. +var ErrServerStopped = errors.New("asynq: the server has been stopped") + +type serverState int + +const ( + stateIdle serverState = iota + stateRunning + stateStopped +) + // Run starts the background-task processing and blocks until // an os signal to exit the program is received. Once it receives // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. -func (srv *Server) Run(handler Handler) { +func (srv *Server) Run(handler Handler) error { + if err := srv.Start(handler); err != nil { + return err + } + srv.waitForSignals() + srv.Stop() + return nil +} + +// Starts the background-task processing. +// TODO: doc +func (srv *Server) Start(handler Handler) error { + srv.mu.Lock() + defer srv.mu.Unlock() + switch srv.state { + case stateRunning: + return fmt.Errorf("asynq: the server is already running") + case stateStopped: + return ErrServerStopped + } + srv.state = stateRunning + srv.processor.handler = handler + type prefixLogger interface { SetPrefix(prefix string) } @@ -244,40 +279,26 @@ func (srv *Server) Run(handler Handler) { } srv.logger.Info("Starting processing") - srv.start(handler) - defer srv.stop() - - srv.waitForSignals() - fmt.Println() - srv.logger.Info("Starting graceful shutdown") -} - -// starts the background-task processing. -func (srv *Server) start(handler Handler) { - srv.mu.Lock() - defer srv.mu.Unlock() - if srv.running { - return - } - - srv.running = true - srv.processor.handler = handler - srv.heartbeater.start(&srv.wg) srv.subscriber.start(&srv.wg) srv.syncer.start(&srv.wg) srv.scheduler.start(&srv.wg) srv.processor.start(&srv.wg) + return nil } -// stops the background-task processing. -func (srv *Server) stop() { +// Stops the background-task processing. +// TODO: do we need to return error? +func (srv *Server) Stop() { srv.mu.Lock() defer srv.mu.Unlock() - if !srv.running { + if srv.state != stateRunning { + // server is not running, do nothing and return. return } + fmt.Println() // print newline for prettier log. + srv.logger.Info("Starting graceful shutdown") // Note: The order of termination is important. // Sender goroutines should be terminated before the receiver goroutines. // @@ -291,7 +312,14 @@ func (srv *Server) stop() { srv.wg.Wait() srv.rdb.Close() - srv.running = false + srv.state = stateStopped srv.logger.Info("Bye!") } + +// Quiet signals server to stop processing new tasks. +// TODO: doc +func (srv *Server) Quiet() { + srv.processor.stop() + srv.ps.SetStatus(base.StatusStopped) // TODO: rephrase this state, like StatusSilent? +} diff --git a/server_test.go b/server_test.go index ce3ff0c..9c7188e 100644 --- a/server_test.go +++ b/server_test.go @@ -32,9 +32,12 @@ func TestServer(t *testing.T) { return nil } - srv.start(HandlerFunc(h)) + err := srv.Start(HandlerFunc(h)) + if err != nil { + t.Fatal(err) + } - err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -44,7 +47,7 @@ func TestServer(t *testing.T) { t.Errorf("could not enqueue a task: %v", err) } - srv.stop() + srv.Stop() } func TestGCD(t *testing.T) { diff --git a/signals_unix.go b/signals_unix.go index 66cfaba..0a2d93b 100644 --- a/signals_unix.go +++ b/signals_unix.go @@ -7,8 +7,6 @@ import ( "os/signal" "golang.org/x/sys/unix" - - "github.com/hibiken/asynq/internal/base" ) // waitForSignals waits for signals and handles them. @@ -24,8 +22,7 @@ func (srv *Server) waitForSignals() { for { sig := <-sigs if sig == unix.SIGTSTP { - srv.processor.stop() - srv.ps.SetStatus(base.StatusStopped) + srv.Quiet() continue } break