From 105084d6fb079e3a133e17af49850aa287558e3d Mon Sep 17 00:00:00 2001
From: Ken Hibino
Date: Sun, 21 Mar 2021 06:33:01 -0700
Subject: [PATCH] Rename ServerStatus to ServerState internally

---
 heartbeat.go                 | 10 ++++----
 heartbeat_test.go            | 16 ++++++------
 internal/base/base.go        | 49 +++++++++++++++++++-----------------
 internal/base/base_test.go   |  6 ++---
 internal/rdb/inspect_test.go |  2 +-
 internal/rdb/rdb_test.go     |  8 +++---
 scheduler.go                 | 16 ++++++------
 server.go                    | 42 ++++++++++++++++---------------
 tools/asynq/cmd/server.go    |  6 ++---
 9 files changed, 81 insertions(+), 74 deletions(-)

diff --git a/heartbeat.go b/heartbeat.go
index 348d834..8117190 100644
--- a/heartbeat.go
+++ b/heartbeat.go
@@ -40,8 +40,8 @@ type heartbeater struct {
     started time.Time
     workers map[string]*workerInfo

-    // status is shared with other goroutine but is concurrency safe.
-    status *base.ServerStatus
+    // state is shared with other goroutines but is concurrency safe.
+    state *base.ServerState

     // channels to receive updates on active workers.
     starting <-chan *workerInfo
@@ -55,7 +55,7 @@ type heartbeaterParams struct {
     concurrency    int
     queues         map[string]int
     strictPriority bool
-    status         *base.ServerStatus
+    state          *base.ServerState
     starting       <-chan *workerInfo
     finished       <-chan *base.TaskMessage
 }
@@ -79,7 +79,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
         queues:         params.queues,
         strictPriority: params.strictPriority,

-        status:   params.status,
+        state:    params.state,
         workers:  make(map[string]*workerInfo),
         starting: params.starting,
         finished: params.finished,
@@ -142,7 +142,7 @@ func (h *heartbeater) beat() {
         Concurrency:       h.concurrency,
         Queues:            h.queues,
         StrictPriority:    h.strictPriority,
-        Status:            h.status.String(),
+        Status:            h.state.String(),
         Started:           h.started,
         ActiveWorkerCount: len(h.workers),
     }
diff --git a/heartbeat_test.go b/heartbeat_test.go
index 058ae1c..609d38e 100644
--- a/heartbeat_test.go
+++ b/heartbeat_test.go
@@ -38,7 +38,7 @@ func TestHeartbeater(t *testing.T) {
     for _, tc := range tests {
         h.FlushDB(t, r)

-        status := base.NewServerStatus(base.StatusIdle)
+        state := base.NewServerState()
         hb := newHeartbeater(heartbeaterParams{
             logger: testLogger,
             broker: rdbClient,
@@ -46,7 +46,7 @@ func TestHeartbeater(t *testing.T) {
             concurrency:    tc.concurrency,
             queues:         tc.queues,
             strictPriority: false,
-            status:         status,
+            state:          state,
             starting:       make(chan *workerInfo),
             finished:       make(chan *base.TaskMessage),
         })
@@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) {
         hb.host = tc.host
         hb.pid = tc.pid

-        status.Set(base.StatusActive)
+        state.Set(base.StateActive)
         var wg sync.WaitGroup
         hb.start(&wg)

@@ -65,7 +65,7 @@ func TestHeartbeater(t *testing.T) {
             Queues:      tc.queues,
             Concurrency: tc.concurrency,
             Started:     time.Now(),
-            Status:      "running",
+            Status:      "active",
         }

         // allow for heartbeater to write to redis
@@ -91,12 +91,12 @@ func TestHeartbeater(t *testing.T) {
         }

         // status change
-        status.Set(base.StatusClosed)
+        state.Set(base.StateClosed)

         // allow for heartbeater to write to redis
         time.Sleep(tc.interval * 2)

-        want.Status = "stopped"
+        want.Status = "closed"
         ss, err = rdbClient.ListServers()
         if err != nil {
             t.Errorf("could not read process status from redis: %v", err)
@@ -131,6 +131,8 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
     r := rdb.NewRDB(setup(t))
     defer r.Close()
     testBroker := testbroker.NewTestBroker(r)
+    state := base.NewServerState()
+    state.Set(base.StateActive)
     hb := newHeartbeater(heartbeaterParams{
         logger: testLogger,
         broker: testBroker,
@@ -138,7 +140,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
        concurrency:    10,
        queues:         map[string]int{"default": 1},
        strictPriority: false,
-        status:         base.NewServerStatus(base.StatusActive),
+        state:          state,
        starting:       make(chan *workerInfo),
        finished:       make(chan *base.TaskMessage),
    })
diff --git a/internal/base/base.go b/internal/base/base.go
index 4daed77..554a981 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -215,52 +215,55 @@ type Z struct {
     Score int64
 }

-// ServerStatus represents status of a server.
-// ServerStatus methods are concurrency safe.
-type ServerStatus struct {
+// ServerState represents state of a server.
+// ServerState methods are concurrency safe.
+type ServerState struct {
     mu  sync.Mutex
-    val ServerStatusValue
+    val ServerStateValue
 }

-// NewServerStatus returns a new status instance given an initial value.
-func NewServerStatus(v ServerStatusValue) *ServerStatus {
-    return &ServerStatus{val: v}
+// NewServerState returns a new state instance.
+// Initial state is set to StateNew.
+func NewServerState() *ServerState {
+    return &ServerState{val: StateNew}
 }

-type ServerStatusValue int
+type ServerStateValue int

 const (
-    // StatusIdle indicates the server is in idle state.
-    StatusIdle ServerStatusValue = iota
+    // StateNew represents a new server. A server begins in
+    // this state and then transitions to StateActive when
+    // Start or Run is called.
+    StateNew ServerStateValue = iota

-    // StatusActive indicates the server is up and active.
-    StatusActive
+    // StateActive indicates the server is up and active.
+    StateActive

-    // StatusStopped indicates the server is up but no longer processing new tasks.
-    StatusStopped
+    // StateStopped indicates the server is up but no longer processing new tasks.
+    StateStopped

-    // StatusClosed indicates the server has been shutdown.
-    StatusClosed
+    // StateClosed indicates the server has been shutdown.
+    StateClosed
 )

-var statuses = []string{
-    "idle",
+var serverStates = []string{
+    "new",
     "active",
     "stopped",
     "closed",
 }

-func (s *ServerStatus) String() string {
+func (s *ServerState) String() string {
     s.mu.Lock()
     defer s.mu.Unlock()
-    if StatusIdle <= s.val && s.val <= StatusClosed {
-        return statuses[s.val]
+    if StateNew <= s.val && s.val <= StateClosed {
+        return serverStates[s.val]
     }
     return "unknown status"
 }

 // Get returns the status value.
-func (s *ServerStatus) Get() ServerStatusValue {
+func (s *ServerState) Get() ServerStateValue {
     s.mu.Lock()
     v := s.val
     s.mu.Unlock()
@@ -268,7 +271,7 @@ func (s *ServerStatus) Get() ServerStatusValue {
 }

 // Set sets the status value.
-func (s *ServerStatus) Set(v ServerStatusValue) {
+func (s *ServerState) Set(v ServerStateValue) {
     s.mu.Lock()
     s.val = v
     s.mu.Unlock()
diff --git a/internal/base/base_test.go b/internal/base/base_test.go
index 35b9c6d..ebd15ce 100644
--- a/internal/base/base_test.go
+++ b/internal/base/base_test.go
@@ -400,7 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
             Concurrency:       10,
             Queues:            map[string]int{"default": 1, "critical": 2},
             StrictPriority:    false,
-            Status:            "running",
+            Status:            "active",
             Started:           time.Now().Add(-3 * time.Hour),
             ActiveWorkerCount: 8,
         },
@@ -530,7 +530,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {

 // Test for status being accessed by multiple goroutines.
 // Run with -race flag to check for data race.
 func TestStatusConcurrentAccess(t *testing.T) {
-    status := NewServerStatus(StatusIdle)
+    status := NewServerState()

     var wg sync.WaitGroup

@@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
         wg.Add(1)
         go func() {
             defer wg.Done()
-            status.Set(StatusClosed)
+            status.Set(StateClosed)
             _ = status.String()
         }()

diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index 42cddfb..b9711fd 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -3222,7 +3222,7 @@ func TestListServers(t *testing.T) {
         ServerID:          "server123",
         Concurrency:       10,
         Queues:            map[string]int{"default": 1},
-        Status:            "running",
+        Status:            "active",
         Started:           started1,
         ActiveWorkerCount: 0,
     }
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index b936360..f6a41e6 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -1475,7 +1475,7 @@ func TestWriteServerState(t *testing.T) {
         Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
         StrictPriority:    false,
         Started:           time.Now().UTC(),
-        Status:            "running",
+        Status:            "active",
         ActiveWorkerCount: 0,
     }

@@ -1565,7 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
         Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
         StrictPriority:    false,
         Started:           time.Now().Add(-10 * time.Minute).UTC(),
-        Status:            "running",
+        Status:            "active",
         ActiveWorkerCount: len(workers),
     }

@@ -1667,7 +1667,7 @@ func TestClearServerState(t *testing.T) {
         Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
         StrictPriority:    false,
         Started:           time.Now().Add(-10 * time.Minute),
-        Status:            "running",
+        Status:            "active",
         ActiveWorkerCount: len(workers1),
     }

@@ -1690,7 +1690,7 @@ func TestClearServerState(t *testing.T) {
         Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
         StrictPriority:    false,
         Started:           time.Now().Add(-15 * time.Minute),
-        Status:            "running",
+        Status:            "active",
         ActiveWorkerCount: len(workers2),
     }

diff --git a/scheduler.go b/scheduler.go
index c3f912c..41a77e6 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -21,7 +21,7 @@ import (
 // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
 type Scheduler struct {
     id     string
-    status *base.ServerStatus
+    state  *base.ServerState
     logger *log.Logger
     client *Client
     rdb    *rdb.RDB
@@ -61,7 +61,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {

     return &Scheduler{
         id:     generateSchedulerID(),
-        status: base.NewServerStatus(base.StatusIdle),
+        state:  base.NewServerState(),
         logger: logger,
         client: NewClient(r),
         rdb:    rdb.NewRDB(c),
@@ -182,10 +182,10 @@ func (s *Scheduler) Run() error {
 // Start starts the scheduler.
 // It returns an error if the scheduler is already running or has been stopped.
 func (s *Scheduler) Start() error {
-    switch s.status.Get() {
-    case base.StatusActive:
+    switch s.state.Get() {
+    case base.StateActive:
         return fmt.Errorf("asynq: the scheduler is already running")
-    case base.StatusClosed:
+    case base.StateClosed:
         return fmt.Errorf("asynq: the scheduler has already been stopped")
     }
     s.logger.Info("Scheduler starting")
@@ -193,14 +193,14 @@ func (s *Scheduler) Start() error {
     s.cron.Start()
     s.wg.Add(1)
     go s.runHeartbeater()
-    s.status.Set(base.StatusActive)
+    s.state.Set(base.StateActive)
     return nil
 }

 // Stop stops the scheduler.
 // It returns an error if the scheduler is not currently running.
 func (s *Scheduler) Stop() error {
-    if s.status.Get() != base.StatusActive {
+    if s.state.Get() != base.StateActive {
         return fmt.Errorf("asynq: the scheduler is not running")
     }
     s.logger.Info("Scheduler shutting down")
@@ -212,7 +212,7 @@ func (s *Scheduler) Stop() error {
     s.clearHistory()
     s.client.Close()
     s.rdb.Close()
-    s.status.Set(base.StatusClosed)
+    s.state.Set(base.StateClosed)
     s.logger.Info("Scheduler stopped")
     return nil
 }
diff --git a/server.go b/server.go
index 535636a..4ebce90 100644
--- a/server.go
+++ b/server.go
@@ -37,7 +37,7 @@ type Server struct {

     broker base.Broker

-    status *base.ServerStatus
+    state *base.ServerState

     // wait group to wait for all goroutines to finish.
     wg sync.WaitGroup
@@ -278,7 +278,7 @@ const (
 )

 // NewServer returns a new Server given a redis connection option
-// and background processing configuration.
+// and server configuration.
 func NewServer(r RedisConnOpt, cfg Config) *Server {
     c, ok := r.MakeRedisClient().(redis.UniversalClient)
     if !ok {
@@ -324,7 +324,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
     starting := make(chan *workerInfo)
     finished := make(chan *base.TaskMessage)
     syncCh := make(chan *syncRequest)
-    status := base.NewServerStatus(base.StatusIdle)
+    state := base.NewServerState()
     cancels := base.NewCancelations()

     syncer := newSyncer(syncerParams{
@@ -339,7 +339,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
         concurrency:    n,
         queues:         queues,
         strictPriority: cfg.StrictPriority,
-        status:         status,
+        state:          state,
         starting:       starting,
         finished:       finished,
     })
@@ -384,7 +384,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
     return &Server{
         logger:    logger,
         broker:    rdb,
-        status:    status,
+        state:     state,
         forwarder: forwarder,
         processor: processor,
         syncer:    syncer,
@@ -401,10 +401,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 // is successful.
 //
 // If ProcessTask return a non-nil error or panics, the task
-// will be retried after delay.
+// will be retried after delay if the task has retries remaining,
+// otherwise the task will be archived.
+//
 // One exception to this rule is when ProcessTask returns SkipRetry error.
 // If the returned error is SkipRetry or the error wraps SkipRetry, retry is
-// skipped and task will be archived instead.
+// skipped and the task will be immediately archived instead.
 type Handler interface {
     ProcessTask(context.Context, *Task) error
 }
@@ -421,21 +423,21 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
 }

 // ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
-var ErrServerClosed = errors.New("asynq: server closed")
+var ErrServerClosed = errors.New("asynq: Server closed")

-// Run starts the background-task processing and blocks until
+// Run starts the task processing and blocks until
 // an os signal to exit the program is received. Once it receives
 // a signal, it gracefully shuts down all active workers and other
 // goroutines to process the tasks.
 //
-// Run returns any error encountered during server startup time.
+// Run returns any error encountered at server startup time.
 // If the server has already been shutdown, ErrServerClosed is returned.
 func (srv *Server) Run(handler Handler) error {
     if err := srv.Start(handler); err != nil {
         return err
     }
     srv.waitForSignals()
-    srv.Stop()
+    srv.Shutdown()
     return nil
 }

@@ -445,18 +447,18 @@ func (srv *Server) Run(handler Handler) error {
 // concurrency specified at the initialization time.
 //
 // Start returns any error encountered during server startup time.
-// If the server has already been stopped, ErrServerClosed is returned.
+// If the server has already been shutdown, ErrServerClosed is returned.
 func (srv *Server) Start(handler Handler) error {
     if handler == nil {
         return fmt.Errorf("asynq: server cannot run with nil handler")
     }
-    switch srv.status.Get() {
-    case base.StatusActive:
+    switch srv.state.Get() {
+    case base.StateActive:
         return fmt.Errorf("asynq: the server is already running")
-    case base.StatusClosed:
+    case base.StateClosed:
         return ErrServerClosed
     }
-    srv.status.Set(base.StatusActive)
+    srv.state.Set(base.StateActive)
     srv.processor.handler = handler

     srv.logger.Info("Starting processing")
@@ -476,8 +478,8 @@ func (srv *Server) Start(handler Handler) error {
 // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
 // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
 func (srv *Server) Shutdown() {
-    switch srv.status.Get() {
-    case base.StatusIdle, base.StatusClosed:
+    switch srv.state.Get() {
+    case base.StateNew, base.StateClosed:
         // server is not running, do nothing and return.
         return
     }
@@ -498,7 +500,7 @@ func (srv *Server) Shutdown() {
     srv.wg.Wait()

     srv.broker.Close()
-    srv.status.Set(base.StatusClosed)
+    srv.state.Set(base.StateClosed)

     srv.logger.Info("Exiting")
 }
@@ -508,6 +510,6 @@ func (srv *Server) Shutdown() {
 func (srv *Server) Stop() {
     srv.logger.Info("Stopping processor")
     srv.processor.stop()
-    srv.status.Set(base.StatusStopped)
+    srv.state.Set(base.StateStopped)
     srv.logger.Info("Processor stopped")
 }
diff --git a/tools/asynq/cmd/server.go b/tools/asynq/cmd/server.go
index 4958a4e..90e899f 100644
--- a/tools/asynq/cmd/server.go
+++ b/tools/asynq/cmd/server.go
@@ -35,11 +35,11 @@ The command shows the following for each server:
 * Host and PID of the process in which the server is running
 * Number of active workers out of worker pool
 * Queue configuration
-* State of the worker server ("running" | "quiet")
+* State of the worker server ("active" | "stopped")
 * Time the server was started

-A "running" server is pulling tasks from queues and processing them.
-A "quiet" server is no longer pulling new tasks from queues`,
+An "active" server is pulling tasks from queues and processing them.
+A "stopped" server is no longer pulling new tasks from queues`,
     Run: serverList,
 }
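--
The state machine this patch introduces can be read directly off the internal/base hunks above: a server begins in StateNew, transitions to StateActive when Start or Run is called, to StateStopped when Stop pauses the processor, and terminally to StateClosed on Shutdown. Below is a minimal, self-contained sketch of that lifecycle. The type and its method bodies mirror the diff, but the package main wrapper and the explicit Set calls are illustrative only, not library code.

package main

import (
    "fmt"
    "sync"
)

// ServerStateValue enumerates the lifecycle states added by this patch.
type ServerStateValue int

const (
    StateNew     ServerStateValue = iota // before Start or Run is called (the old "idle")
    StateActive                          // up and processing tasks
    StateStopped                         // up, but no longer pulling new tasks
    StateClosed                          // fully shut down
)

var serverStates = []string{"new", "active", "stopped", "closed"}

// ServerState is a mutex-guarded holder for the current state value,
// mirroring the concurrency-safe type in internal/base/base.go.
type ServerState struct {
    mu  sync.Mutex
    val ServerStateValue
}

// NewServerState starts every server in StateNew.
func NewServerState() *ServerState { return &ServerState{val: StateNew} }

// Set transitions the server to state v.
func (s *ServerState) Set(v ServerStateValue) {
    s.mu.Lock()
    s.val = v
    s.mu.Unlock()
}

// String reports the human-readable name surfaced by heartbeats and the CLI.
func (s *ServerState) String() string {
    s.mu.Lock()
    defer s.mu.Unlock()
    if StateNew <= s.val && s.val <= StateClosed {
        return serverStates[s.val]
    }
    return "unknown status"
}

func main() {
    state := NewServerState()
    fmt.Println(state) // new

    state.Set(StateActive) // what Server.Start/Run does
    fmt.Println(state)     // active

    state.Set(StateStopped) // what Server.Stop does
    fmt.Println(state)      // stopped

    state.Set(StateClosed) // what Server.Shutdown does
    fmt.Println(state)      // closed
}

Note the user-visible effect exercised by the tests above: the Status field written by the heartbeater now reads "active" and "closed" where it previously read "running" and "stopped", and the asynq CLI help text follows suit, reporting "active"/"stopped" in place of "running"/"quiet".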