From 239ef27a6e063468848ddbc8af18656b9636507b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 13 Apr 2020 08:14:55 -0700 Subject: [PATCH] Update doc comments --- README.md | 4 +++- internal/rdb/inspect.go | 2 +- server.go | 36 +++++++++++++++++++++++++----------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 6682310..0f902c2 100644 --- a/README.md +++ b/README.md @@ -197,7 +197,9 @@ func main() { mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask) // ...register other handlers... - srv.Run(mux) + if err := srv.Run(mux); err != nil { + log.Fatalf("could not run server: %v", err) + } } ``` diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index dd81cae..79c1c3e 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -772,7 +772,7 @@ end redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) return res`) -// ListServers returns the list of process statuses. +// ListServers returns the list of server info. func (r *RDB) ListServers() ([]*base.ServerInfo, error) { res, err := listServersCmd.Run(r.client, []string{base.AllServers}, time.Now().UTC().Unix()).Result() diff --git a/server.go b/server.go index 8943aca..3a44e5c 100644 --- a/server.go +++ b/server.go @@ -21,12 +21,13 @@ import ( // Server is responsible for managing the background-task processing. // -// Server pulls tasks off queues and process them. +// Server pulls tasks off queues and processes them. // If the processing of a task is unsuccessful, server will -// schedule it for a retry until either the task gets processed successfully -// or it exhausts its max retry count. +// schedule it for a retry. +// A task will be retried until either the task gets processed successfully +// or until it reaches its max retry count. // -// Once a task exhausts its retries, it will be moved to the "dead" queue and +// If a task exhausts its retries, it will be moved to the "dead" queue and // will be kept in the queue for some time until a certain condition is met // (e.g., queue size reaches a certain limit, or the task has been in the // queue for a certain amount of time). @@ -233,8 +234,11 @@ var ErrServerStopped = errors.New("asynq: the server has been stopped") // Run starts the background-task processing and blocks until // an os signal to exit the program is received. Once it receives -// a signal, it gracefully shuts down all pending workers and other +// a signal, it gracefully shuts down all active workers and other // goroutines to process the tasks. +// +// Run returns any error encountered during server boot time. +// If the server has already been stopped, ErrServerStopped is returned. func (srv *Server) Run(handler Handler) error { if err := srv.Start(handler); err != nil { return err @@ -244,15 +248,22 @@ func (srv *Server) Run(handler Handler) error { return nil } -// Starts the background-task processing. -// TODO: doc +// Start starts the worker server. Once the server has started, +// it pulls tasks off queues and starts a worker goroutine for each task. +// Tasks are processed concurrently by the workers up to the number of +// concurrency specified at the initialization time. +// +// Start returns any error encountered during server boot time. +// If the server has already been stopped, ErrServerStopped is returned. func (srv *Server) Start(handler Handler) error { + // TODO: Retrun error if handler is nil switch srv.ss.Status() { case base.StatusRunning: return fmt.Errorf("asynq: the server is already running") case base.StatusStopped: return ErrServerStopped } + // TODO: Return error if cannot connect to Redis srv.ss.SetStatus(base.StatusRunning) srv.processor.handler = handler @@ -273,8 +284,11 @@ func (srv *Server) Start(handler Handler) error { return nil } -// Stops the background-task processing. -// TODO: do we need to return error? +// Stop stops the worker server. +// It gracefully closes all active workers. The server will wait for +// active workers to finish processing task for 8 seconds(TODO: Add ShutdownTimeout to Config). +// If worker didn't finish processing a task during the timeout, the +// task will be pushed back to Redis. func (srv *Server) Stop() { switch srv.ss.Status() { case base.StatusIdle, base.StatusStopped: @@ -301,8 +315,8 @@ func (srv *Server) Stop() { srv.logger.Info("Bye!") } -// Quiet signals server to stop processing new tasks. -// TODO: doc +// Quiet signals the server to stop pulling new tasks off queues. +// Quiet should be used before stopping the server. func (srv *Server) Quiet() { srv.processor.stop() srv.ss.SetStatus(base.StatusQuiet)