2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update doc comments

This commit is contained in:
Ken Hibino 2020-04-13 08:14:55 -07:00
parent 24da281aa7
commit 239ef27a6e
3 changed files with 29 additions and 13 deletions

View File

@ -197,7 +197,9 @@ func main() {
mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask) mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask)
// ...register other handlers... // ...register other handlers...
srv.Run(mux) if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
} }
``` ```

View File

@ -772,7 +772,7 @@ end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res`) return res`)
// ListServers returns the list of process statuses. // ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) { func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
res, err := listServersCmd.Run(r.client, res, err := listServersCmd.Run(r.client,
[]string{base.AllServers}, time.Now().UTC().Unix()).Result() []string{base.AllServers}, time.Now().UTC().Unix()).Result()

View File

@ -21,12 +21,13 @@ import (
// Server is responsible for managing the background-task processing. // Server is responsible for managing the background-task processing.
// //
// Server pulls tasks off queues and process them. // Server pulls tasks off queues and processes them.
// If the processing of a task is unsuccessful, server will // If the processing of a task is unsuccessful, server will
// schedule it for a retry until either the task gets processed successfully // schedule it for a retry.
// or it exhausts its max retry count. // A task will be retried until either the task gets processed successfully
// or until it reaches its max retry count.
// //
// Once a task exhausts its retries, it will be moved to the "dead" queue and // If a task exhausts its retries, it will be moved to the "dead" queue and
// will be kept in the queue for some time until a certain condition is met // will be kept in the queue for some time until a certain condition is met
// (e.g., queue size reaches a certain limit, or the task has been in the // (e.g., queue size reaches a certain limit, or the task has been in the
// queue for a certain amount of time). // queue for a certain amount of time).
@ -233,8 +234,11 @@ var ErrServerStopped = errors.New("asynq: the server has been stopped")
// Run starts the background-task processing and blocks until // Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives // an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all active workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
//
// Run returns any error encountered during server boot time.
// If the server has already been stopped, ErrServerStopped is returned.
func (srv *Server) Run(handler Handler) error { func (srv *Server) Run(handler Handler) error {
if err := srv.Start(handler); err != nil { if err := srv.Start(handler); err != nil {
return err return err
@ -244,15 +248,22 @@ func (srv *Server) Run(handler Handler) error {
return nil return nil
} }
// Starts the background-task processing. // Start starts the worker server. Once the server has started,
// TODO: doc // it pulls tasks off queues and starts a worker goroutine for each task.
// Tasks are processed concurrently by the workers up to the number of
// concurrency specified at the initialization time.
//
// Start returns any error encountered during server boot time.
// If the server has already been stopped, ErrServerStopped is returned.
func (srv *Server) Start(handler Handler) error { func (srv *Server) Start(handler Handler) error {
// TODO: Retrun error if handler is nil
switch srv.ss.Status() { switch srv.ss.Status() {
case base.StatusRunning: case base.StatusRunning:
return fmt.Errorf("asynq: the server is already running") return fmt.Errorf("asynq: the server is already running")
case base.StatusStopped: case base.StatusStopped:
return ErrServerStopped return ErrServerStopped
} }
// TODO: Return error if cannot connect to Redis
srv.ss.SetStatus(base.StatusRunning) srv.ss.SetStatus(base.StatusRunning)
srv.processor.handler = handler srv.processor.handler = handler
@ -273,8 +284,11 @@ func (srv *Server) Start(handler Handler) error {
return nil return nil
} }
// Stops the background-task processing. // Stop stops the worker server.
// TODO: do we need to return error? // It gracefully closes all active workers. The server will wait for
// active workers to finish processing task for 8 seconds(TODO: Add ShutdownTimeout to Config).
// If worker didn't finish processing a task during the timeout, the
// task will be pushed back to Redis.
func (srv *Server) Stop() { func (srv *Server) Stop() {
switch srv.ss.Status() { switch srv.ss.Status() {
case base.StatusIdle, base.StatusStopped: case base.StatusIdle, base.StatusStopped:
@ -301,8 +315,8 @@ func (srv *Server) Stop() {
srv.logger.Info("Bye!") srv.logger.Info("Bye!")
} }
// Quiet signals server to stop processing new tasks. // Quiet signals the server to stop pulling new tasks off queues.
// TODO: doc // Quiet should be used before stopping the server.
func (srv *Server) Quiet() { func (srv *Server) Quiet() {
srv.processor.stop() srv.processor.stop()
srv.ss.SetStatus(base.StatusQuiet) srv.ss.SetStatus(base.StatusQuiet)