From 9c95c416518af5252c5b8713de365c9305bfe181 Mon Sep 17 00:00:00 2001
From: Ken Hibino
Date: Tue, 23 Mar 2021 06:20:54 -0700
Subject: [PATCH] Change Server API

* Rename ServerStatus to ServerState internally

* Rename terminate to shutdown internally

* Update Scheduler API to match Server API
---
 CHANGELOG.md                 |   2 +
 example_test.go              |  12 ++--
 forwarder.go                 |   2 +-
 forwarder_test.go            |   2 +-
 healthcheck.go               |   2 +-
 healthcheck_test.go          |   4 +-
 heartbeat.go                 |  12 ++--
 heartbeat_test.go            |  32 ++++++-----
 internal/base/base.go        |  53 +++++++++---------
 internal/base/base_test.go   |   6 +-
 internal/rdb/inspect_test.go |   2 +-
 internal/rdb/rdb_test.go     |   8 +--
 processor.go                 |   4 +-
 processor_test.go            |  14 ++---
 recoverer.go                 |   2 +-
 recoverer_test.go            |   2 +-
 scheduler.go                 |  30 +++++-----
 scheduler_test.go            |  12 +---
 server.go                    | 103 +++++++++++++++++++----------
 server_test.go               |  18 +++---
 signals_unix.go              |   2 +-
 subscriber.go                |   2 +-
 subscriber_test.go           |   4 +-
 syncer.go                    |   2 +-
 syncer_test.go               |   6 +-
 tools/asynq/cmd/server.go    |   6 +-
 26 files changed, 175 insertions(+), 169 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d53b35f..5d3617a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - NewTask function now takes array of bytes as payload.
 - Task `Type` and `Payload` should be accessed by a method call.
+- `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the existing code to call `Shutdown` where it used to call `Stop`.
+- `Scheduler` API has changed. Renamed `Stop` to `Shutdown`.
 - Requires redis v4.0+ for multiple field/value pair support
 - Renamed pending key (TODO: need migration script
diff --git a/example_test.go b/example_test.go
index 1620d3f..c2909a2 100644
--- a/example_test.go
+++ b/example_test.go
@@ -30,7 +30,7 @@ func ExampleServer_Run() {
 	}
 }
 
-func ExampleServer_Stop() {
+func ExampleServer_Shutdown() {
 	srv := asynq.NewServer(
 		asynq.RedisClientOpt{Addr: ":6379"},
 		asynq.Config{Concurrency: 20},
 	)
@@ -47,10 +47,10 @@
 	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
 	<-sigs // wait for termination signal
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
-func ExampleServer_Quiet() {
+func ExampleServer_Stop() {
 	srv := asynq.NewServer(
 		asynq.RedisClientOpt{Addr: ":6379"},
 		asynq.Config{Concurrency: 20},
 	)
@@ -70,13 +70,13 @@
 	for {
 		s := <-sigs
 		if s == unix.SIGTSTP {
-			srv.Quiet() // stop processing new tasks
+			srv.Stop() // stop processing new tasks
 			continue
 		}
-		break
+		break // received SIGTERM or SIGINT signal
 	}
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
 func ExampleScheduler() {
diff --git a/forwarder.go b/forwarder.go
index d7bd1f5..ec1c1f2 100644
--- a/forwarder.go
+++ b/forwarder.go
@@ -45,7 +45,7 @@ func newForwarder(params forwarderParams) *forwarder {
 	}
 }
 
-func (f *forwarder) terminate() {
+func (f *forwarder) shutdown() {
 	f.logger.Debug("Forwarder shutting down...")
 	// Signal the forwarder goroutine to stop polling.
 	f.done <- struct{}{}
diff --git a/forwarder_test.go b/forwarder_test.go
index a197f58..da251e1 100644
--- a/forwarder_test.go
+++ b/forwarder_test.go
@@ -111,7 +111,7 @@ func TestForwarder(t *testing.T) {
 		var wg sync.WaitGroup
 		s.start(&wg)
 		time.Sleep(tc.wait)
-		s.terminate()
+		s.shutdown()
 
 		for qname, want := range tc.wantScheduled {
 			gotScheduled := h.GetScheduledMessages(t, r, qname)
diff --git a/healthcheck.go b/healthcheck.go
index 84526a0..83f9916 100644
--- a/healthcheck.go
+++ b/healthcheck.go
@@ -45,7 +45,7 @@ func newHealthChecker(params healthcheckerParams) *healthchecker {
 	}
 }
 
-func (hc *healthchecker) terminate() {
+func (hc *healthchecker) shutdown() {
 	if hc.healthcheckFunc == nil {
 		return
 	}
diff --git a/healthcheck_test.go b/healthcheck_test.go
index 4b4c15e..e61e897 100644
--- a/healthcheck_test.go
+++ b/healthcheck_test.go
@@ -51,7 +51,7 @@ func TestHealthChecker(t *testing.T) {
 	}
 	mu.Unlock()
 
-	hc.terminate()
+	hc.shutdown()
 }
 
 func TestHealthCheckerWhenRedisDown(t *testing.T) {
@@ -99,5 +99,5 @@
 	}
 	mu.Unlock()
 
-	hc.terminate()
+	hc.shutdown()
 }
diff --git a/heartbeat.go b/heartbeat.go
index 348d834..a51158f 100644
--- a/heartbeat.go
+++ b/heartbeat.go
@@ -40,8 +40,8 @@ type heartbeater struct {
 	started time.Time
 	workers map[string]*workerInfo
 
-	// status is shared with other goroutine but is concurrency safe.
-	status *base.ServerStatus
+	// state is shared with other goroutines but is concurrency safe.
+	state *base.ServerState
 
 	// channels to receive updates on active workers.
 	starting <-chan *workerInfo
@@ -55,7 +55,7 @@ type heartbeaterParams struct {
 	concurrency    int
 	queues         map[string]int
 	strictPriority bool
-	status         *base.ServerStatus
+	state          *base.ServerState
 	starting       <-chan *workerInfo
 	finished       <-chan *base.TaskMessage
 }
@@ -79,14 +79,14 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
 		queues:         params.queues,
 		strictPriority: params.strictPriority,
 
-		status:   params.status,
+		state:    params.state,
 		workers:  make(map[string]*workerInfo),
 		starting: params.starting,
 		finished: params.finished,
 	}
 }
 
-func (h *heartbeater) terminate() {
+func (h *heartbeater) shutdown() {
 	h.logger.Debug("Heartbeater shutting down...")
 	// Signal the heartbeater goroutine to stop.
 	h.done <- struct{}{}
@@ -142,7 +142,7 @@ func (h *heartbeater) beat() {
 		Concurrency:       h.concurrency,
 		Queues:            h.queues,
 		StrictPriority:    h.strictPriority,
-		Status:            h.status.String(),
+		Status:            h.state.String(),
 		Started:           h.started,
 		ActiveWorkerCount: len(h.workers),
 	}
diff --git a/heartbeat_test.go b/heartbeat_test.go
index c517519..518c5b0 100644
--- a/heartbeat_test.go
+++ b/heartbeat_test.go
@@ -38,7 +38,7 @@ func TestHeartbeater(t *testing.T) {
 	for _, tc := range tests {
 		h.FlushDB(t, r)
 
-		status := base.NewServerStatus(base.StatusIdle)
+		state := base.NewServerState()
 		hb := newHeartbeater(heartbeaterParams{
 			logger:         testLogger,
 			broker:         rdbClient,
@@ -46,7 +46,7 @@
 			concurrency:    tc.concurrency,
 			queues:         tc.queues,
 			strictPriority: false,
-			status:         status,
+			state:          state,
 			starting:       make(chan *workerInfo),
 			finished:       make(chan *base.TaskMessage),
 		})
@@ -55,7 +55,7 @@
 		hb.host = tc.host
 		hb.pid = tc.pid
 
-		status.Set(base.StatusRunning)
+		state.Set(base.StateActive)
 		var wg sync.WaitGroup
 		hb.start(&wg)
@@ -65,7 +65,7 @@
 			Queues:            tc.queues,
 			Concurrency:       tc.concurrency,
 			Started:           time.Now(),
-			Status:            "running",
+			Status:            "active",
 		}
 
 		// allow for heartbeater to write to redis
@@ -74,49 +74,49 @@
 		ss, err := rdbClient.ListServers()
 		if err != nil {
 			t.Errorf("could not read server info from redis: %v", err)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if len(ss) != 1 {
 			t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss))
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		// status change
-		status.Set(base.StatusStopped)
+		state.Set(base.StateClosed)
 
 		// allow for heartbeater to write to redis
 		time.Sleep(tc.interval * 2)
 
-		want.Status = "stopped"
+		want.Status = "closed"
 		ss, err = rdbClient.ListServers()
 		if err != nil {
 			t.Errorf("could not read process status from redis: %v", err)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if len(ss) != 1 {
 			t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss))
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
-		hb.terminate()
+		hb.shutdown()
 	}
 }
@@ -131,6 +131,8 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
 	r := rdb.NewRDB(setup(t))
 	defer r.Close()
 	testBroker := testbroker.NewTestBroker(r)
+	state := base.NewServerState()
+	state.Set(base.StateActive)
 	hb := newHeartbeater(heartbeaterParams{
 		logger:         testLogger,
 		broker:         testBroker,
@@ -138,7 +140,7 @@
 		concurrency:    10,
 		queues:         map[string]int{"default": 1},
 		strictPriority: false,
-		status:         base.NewServerStatus(base.StatusRunning),
+		state:          state,
 		starting:       make(chan *workerInfo),
 		finished:       make(chan *base.TaskMessage),
 	})
@@ -150,5 +152,5 @@
 	// wait for heartbeater to try writing data to redis
 	time.Sleep(2 * time.Second)
 
-	hb.terminate()
+	hb.shutdown()
 }
diff --git a/internal/base/base.go b/internal/base/base.go
index 2b040af..3a51f44 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -215,52 +215,55 @@ type Z struct {
 	Score int64
 }
 
-// ServerStatus represents status of a server.
-// ServerStatus methods are concurrency safe.
-type ServerStatus struct {
+// ServerState represents state of a server.
+// ServerState methods are concurrency safe.
+type ServerState struct {
 	mu  sync.Mutex
-	val ServerStatusValue
+	val ServerStateValue
 }
 
-// NewServerStatus returns a new status instance given an initial value.
-func NewServerStatus(v ServerStatusValue) *ServerStatus {
-	return &ServerStatus{val: v}
+// NewServerState returns a new state instance.
+// Initial state is set to StateNew.
+func NewServerState() *ServerState {
+	return &ServerState{val: StateNew}
 }
 
-type ServerStatusValue int
+type ServerStateValue int
 
 const (
-	// StatusIdle indicates the server is in idle state.
-	StatusIdle ServerStatusValue = iota
+	// StateNew represents a new server. Server begins in
+	// this state and then transitions to StateActive when
+	// Start or Run is called.
+	StateNew ServerStateValue = iota
 
-	// StatusRunning indicates the server is up and active.
-	StatusRunning
+	// StateActive indicates the server is up and active.
+	StateActive
 
-	// StatusQuiet indicates the server is up but not active.
-	StatusQuiet
+	// StateStopped indicates the server is up but no longer processing new tasks.
+	StateStopped
 
-	// StatusStopped indicates the server server has been stopped.
-	StatusStopped
+	// StateClosed indicates the server has been shut down.
+	StateClosed
 )
 
-var statuses = []string{
-	"idle",
-	"running",
-	"quiet",
+var serverStates = []string{
+	"new",
+	"active",
 	"stopped",
+	"closed",
 }
 
-func (s *ServerStatus) String() string {
+func (s *ServerState) String() string {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if StatusIdle <= s.val && s.val <= StatusStopped {
-		return statuses[s.val]
+	if StateNew <= s.val && s.val <= StateClosed {
+		return serverStates[s.val]
 	}
 	return "unknown status"
 }
 
 // Get returns the status value.
-func (s *ServerStatus) Get() ServerStatusValue {
+func (s *ServerState) Get() ServerStateValue {
 	s.mu.Lock()
 	v := s.val
 	s.mu.Unlock()
@@ -268,7 +271,7 @@ func (s *ServerStatus) Get() ServerStatusValue {
 }
 
 // Set sets the status value.
-func (s *ServerStatus) Set(v ServerStatusValue) {
+func (s *ServerState) Set(v ServerStateValue) {
 	s.mu.Lock()
 	s.val = v
 	s.mu.Unlock()
diff --git a/internal/base/base_test.go b/internal/base/base_test.go
index cd8baea..ebd15ce 100644
--- a/internal/base/base_test.go
+++ b/internal/base/base_test.go
@@ -400,7 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
 			Concurrency:       10,
 			Queues:            map[string]int{"default": 1, "critical": 2},
 			StrictPriority:    false,
-			Status:            "running",
+			Status:            "active",
 			Started:           time.Now().Add(-3 * time.Hour),
 			ActiveWorkerCount: 8,
 		},
@@ -530,7 +530,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {
 
 // Test for status being accessed by multiple goroutines.
 // Run with -race flag to check for data race.
 func TestStatusConcurrentAccess(t *testing.T) {
-	status := NewServerStatus(StatusIdle)
+	status := NewServerState()
 
 	var wg sync.WaitGroup
 
@@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			status.Set(StatusStopped)
+			status.Set(StateClosed)
 			_ = status.String()
 		}()
 
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index 1b50df1..75cca75 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -3305,7 +3305,7 @@ func TestListServers(t *testing.T) {
 		ServerID:          "server123",
 		Concurrency:       10,
 		Queues:            map[string]int{"default": 1},
-		Status:            "running",
+		Status:            "active",
 		Started:           started1,
 		ActiveWorkerCount: 0,
 	}
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index b936360..f6a41e6 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -1475,7 +1475,7 @@ func TestWriteServerState(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().UTC(),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: 0,
 	}
 
@@ -1565,7 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-10 * time.Minute).UTC(),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers),
 	}
 
@@ -1667,7 +1667,7 @@ func TestClearServerState(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-10 * time.Minute),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers1),
 	}
 
@@ -1690,7 +1690,7 @@
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-15 * time.Minute),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers2),
 	}
 
diff --git a/processor.go b/processor.go
index e480650..605d077 100644
--- a/processor.go
+++ b/processor.go
@@ -123,8 +123,8 @@ func (p *processor) stop() {
 	})
 }
 
-// NOTE: once terminated, processor cannot be re-started.
-func (p *processor) terminate() {
+// NOTE: once shut down, the processor cannot be re-started.
+func (p *processor) shutdown() {
 	p.stop()
 
 	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
diff --git a/processor_test.go b/processor_test.go
index 6fd3f25..cedb427 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -113,7 +113,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
 		for _, msg := range tc.incoming {
 			err := rdbClient.Enqueue(msg)
 			if err != nil {
-				p.terminate()
+				p.shutdown()
 				t.Fatal(err)
 			}
 		}
@@ -121,7 +121,7 @@
 		if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
 			t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -213,7 +213,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
 				t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
 			}
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -290,7 +290,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
 		if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
 			t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -418,12 +418,12 @@ func TestProcessorRetry(t *testing.T) {
 		for _, msg := range tc.incoming {
 			err := rdbClient.Enqueue(msg)
 			if err != nil {
-				p.terminate()
+				p.shutdown()
 				t.Fatal(err)
 			}
 		}
 		time.Sleep(tc.wait) // FIXME: This makes test flaky.
-		p.terminate()
+		p.shutdown()
 
 		cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
 		gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
@@ -594,7 +594,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 				t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
 			}
 		}
-		p.terminate()
+		p.shutdown()
 
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
 			t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
diff --git a/recoverer.go b/recoverer.go
index 5a6da19..9c3dae3 100644
--- a/recoverer.go
+++ b/recoverer.go
@@ -47,7 +47,7 @@ func newRecoverer(params recovererParams) *recoverer {
 	}
 }
 
-func (r *recoverer) terminate() {
+func (r *recoverer) shutdown() {
 	r.logger.Debug("Recoverer shutting down...")
 	// Signal the recoverer goroutine to stop polling.
 	r.done <- struct{}{}
diff --git a/recoverer_test.go b/recoverer_test.go
index ae32674..91afde4 100644
--- a/recoverer_test.go
+++ b/recoverer_test.go
@@ -239,7 +239,7 @@ func TestRecoverer(t *testing.T) {
 		var wg sync.WaitGroup
 		recoverer.start(&wg)
 		time.Sleep(2 * time.Second)
-		recoverer.terminate()
+		recoverer.shutdown()
 
 		for qname, want := range tc.wantActive {
 			gotActive := h.GetActiveMessages(t, r, qname)
diff --git a/scheduler.go b/scheduler.go
index 21280d6..b9bc3d4 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -21,7 +21,7 @@ import (
 
 // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
 type Scheduler struct {
 	id     string
-	status *base.ServerStatus
+	state  *base.ServerState
 	logger *log.Logger
 	client *Client
 	rdb    *rdb.RDB
@@ -61,7 +61,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
 
 	return &Scheduler{
 		id:     generateSchedulerID(),
-		status: base.NewServerStatus(base.StatusIdle),
+		state:  base.NewServerState(),
 		logger: logger,
 		client: NewClient(r),
 		rdb:    rdb.NewRDB(c),
@@ -170,22 +170,23 @@ func (s *Scheduler) Unregister(entryID string) error {
 }
 
 // Run starts the scheduler until an os signal to exit the program is received.
-// It returns an error if scheduler is already running or has been stopped.
+// It returns an error if the scheduler is already running or has been shut down.
 func (s *Scheduler) Run() error {
 	if err := s.Start(); err != nil {
 		return err
 	}
 	s.waitForSignals()
-	return s.Stop()
+	s.Shutdown()
+	return nil
 }
 
 // Start starts the scheduler.
-// It returns an error if the scheduler is already running or has been stopped.
+// It returns an error if the scheduler is already running or has been shut down.
 func (s *Scheduler) Start() error {
-	switch s.status.Get() {
-	case base.StatusRunning:
+	switch s.state.Get() {
+	case base.StateActive:
 		return fmt.Errorf("asynq: the scheduler is already running")
-	case base.StatusStopped:
+	case base.StateClosed:
 		return fmt.Errorf("asynq: the scheduler has already been stopped")
 	}
 	s.logger.Info("Scheduler starting")
@@ -193,16 +194,12 @@ func (s *Scheduler) Start() error {
 	s.cron.Start()
 	s.wg.Add(1)
 	go s.runHeartbeater()
-	s.status.Set(base.StatusRunning)
+	s.state.Set(base.StateActive)
 	return nil
 }
 
-// Stop stops the scheduler.
-// It returns an error if the scheduler is not currently running.
-func (s *Scheduler) Stop() error {
-	if s.status.Get() != base.StatusRunning {
-		return fmt.Errorf("asynq: the scheduler is not running")
-	}
+// Shutdown stops and shuts down the scheduler.
+func (s *Scheduler) Shutdown() {
 	s.logger.Info("Scheduler shutting down")
 	close(s.done) // signal heartbeater to stop
 	ctx := s.cron.Stop()
@@ -212,9 +209,8 @@
 	s.clearHistory()
 	s.client.Close()
 	s.rdb.Close()
-	s.status.Set(base.StatusStopped)
+	s.state.Set(base.StateClosed)
 	s.logger.Info("Scheduler stopped")
-	return nil
 }
 
 func (s *Scheduler) runHeartbeater() {
diff --git a/scheduler_test.go b/scheduler_test.go
index 20339ca..9ee809f 100644
--- a/scheduler_test.go
+++ b/scheduler_test.go
@@ -67,9 +67,7 @@ func TestSchedulerRegister(t *testing.T) {
 		t.Fatal(err)
 	}
 	time.Sleep(tc.wait)
-	if err := scheduler.Stop(); err != nil {
-		t.Fatal(err)
-	}
+	scheduler.Shutdown()
 
 	got := asynqtest.GetPendingMessages(t, r, tc.queue)
 	if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" {
@@ -106,9 +104,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
 	}
 	// Scheduler should attempt to enqueue the task three times (every 3s).
 	time.Sleep(10 * time.Second)
-	if err := scheduler.Stop(); err != nil {
-		t.Fatal(err)
-	}
+	scheduler.Shutdown()
 
 	mu.Lock()
 	if counter != 3 {
@@ -150,9 +146,7 @@ func TestSchedulerUnregister(t *testing.T) {
 		t.Fatal(err)
 	}
 	time.Sleep(tc.wait)
-	if err := scheduler.Stop(); err != nil {
-		t.Fatal(err)
-	}
+	scheduler.Shutdown()
 
 	got := asynqtest.GetPendingMessages(t, r, tc.queue)
 	if len(got) != 0 {
diff --git a/server.go b/server.go
index 68ff381..41cda4b 100644
--- a/server.go
+++ b/server.go
@@ -21,23 +21,24 @@ import (
 	"github.com/hibiken/asynq/internal/rdb"
 )
 
-// Server is responsible for managing the task processing.
+// Server is responsible for task processing and task lifecycle management.
 //
 // Server pulls tasks off queues and processes them.
 // If the processing of a task is unsuccessful, server will schedule it for a retry.
+//
 // A task will be retried until either the task gets processed successfully
 // or until it reaches its max retry count.
 //
-// If a task exhausts its retries, it will be moved to the archive and
-// will be kept in the archive for some time until a certain condition is met
-// (e.g., archive size reaches a certain limit, or the task has been in the
-// archive for a certain amount of time).
+// If a task exhausts its retries, it will be moved to the archive and
+// will be kept in the archive set.
+// Note that the archive size is finite and once it reaches its max size,
+// the oldest tasks in the archive will be deleted.
 type Server struct {
 	logger *log.Logger
 
 	broker base.Broker
 
-	status *base.ServerStatus
+	state *base.ServerState
 
 	// wait group to wait for all goroutines to finish.
 	wg sync.WaitGroup
@@ -278,7 +279,7 @@ const (
 )
 
 // NewServer returns a new Server given a redis connection option
-// and background processing configuration.
+// and server configuration.
 func NewServer(r RedisConnOpt, cfg Config) *Server {
 	c, ok := r.MakeRedisClient().(redis.UniversalClient)
 	if !ok {
@@ -324,7 +325,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
-	status := base.NewServerStatus(base.StatusIdle)
+	state := base.NewServerState()
 	cancels := base.NewCancelations()
 
 	syncer := newSyncer(syncerParams{
@@ -339,7 +340,7 @@
 		concurrency:    n,
 		queues:         queues,
 		strictPriority: cfg.StrictPriority,
-		status:         status,
+		state:          state,
 		starting:       starting,
 		finished:       finished,
 	})
@@ -384,7 +385,7 @@
 	return &Server{
 		logger:        logger,
 		broker:        rdb,
-		status:        status,
+		state:         state,
 		forwarder:     forwarder,
 		processor:     processor,
 		syncer:        syncer,
@@ -400,11 +401,13 @@
 // ProcessTask should return nil if the processing of a task
 // is successful.
 //
-// If ProcessTask return a non-nil error or panics, the task
-// will be retried after delay.
-// One exception to this rule is when ProcessTask returns SkipRetry error.
-// If the returned error is SkipRetry or the error wraps SkipRetry, retry is
-// skipped and task will be archived instead.
+// If ProcessTask returns a non-nil error or panics, the task
+// will be retried after a delay if any retries remain;
+// otherwise the task will be archived.
+//
+// One exception to this rule is when ProcessTask returns a SkipRetry error.
+// If the returned error is SkipRetry or an error that wraps SkipRetry, retry is
+// skipped and the task will be immediately archived instead.
 type Handler interface {
 	ProcessTask(context.Context, *Task) error
 }
@@ -420,43 +423,46 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
 	return fn(ctx, task)
 }
 
-// ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
-var ErrServerStopped = errors.New("asynq: the server has been stopped")
+// ErrServerClosed indicates that the operation is now illegal because the server has been shut down.
+var ErrServerClosed = errors.New("asynq: Server closed")
 
-// Run starts the background-task processing and blocks until
+// Run starts the task processing and blocks until
 // an os signal to exit the program is received. Once it receives
 // a signal, it gracefully shuts down all active workers and other
 // goroutines to process the tasks.
 //
-// Run returns any error encountered during server startup time.
-// If the server has already been stopped, ErrServerStopped is returned.
+// Run returns any error encountered at server startup time.
+// If the server has already been shut down, ErrServerClosed is returned.
 func (srv *Server) Run(handler Handler) error {
 	if err := srv.Start(handler); err != nil {
 		return err
 	}
 	srv.waitForSignals()
-	srv.Stop()
+	srv.Shutdown()
 	return nil
 }
 
 // Start starts the worker server. Once the server has started,
-// it pulls tasks off queues and starts a worker goroutine for each task.
-// Tasks are processed concurrently by the workers up to the number of
-// concurrency specified at the initialization time.
+// it pulls tasks off queues and starts a worker goroutine for each task,
+// then calls the Handler to process it.
+// Tasks are processed concurrently by the workers, up to the concurrency
+// level specified in Config.Concurrency.
 //
-// Start returns any error encountered during server startup time.
-// If the server has already been stopped, ErrServerStopped is returned.
+// Start returns any error encountered at server startup time.
+// If the server has already been shut down, ErrServerClosed is returned.
 func (srv *Server) Start(handler Handler) error {
 	if handler == nil {
 		return fmt.Errorf("asynq: server cannot run with nil handler")
 	}
-	switch srv.status.Get() {
-	case base.StatusRunning:
+	switch srv.state.Get() {
+	case base.StateActive:
 		return fmt.Errorf("asynq: the server is already running")
-	case base.StatusStopped:
-		return ErrServerStopped
+	case base.StateStopped:
+		return fmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")
+	case base.StateClosed:
+		return ErrServerClosed
 	}
-	srv.status.Set(base.StatusRunning)
+	srv.state.Set(base.StateActive)
 	srv.processor.handler = handler
 
 	srv.logger.Info("Starting processing")
@@ -471,43 +477,46 @@ func (srv *Server) Start(handler Handler) error {
 	return nil
 }
 
-// Stop stops the worker server.
+// Shutdown gracefully shuts down the server.
 // It gracefully closes all active workers. The server will wait for
 // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
 // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
-func (srv *Server) Stop() {
-	switch srv.status.Get() {
-	case base.StatusIdle, base.StatusStopped:
+func (srv *Server) Shutdown() {
+	switch srv.state.Get() {
+	case base.StateNew, base.StateClosed:
 		// server is not running, do nothing and return.
 		return
 	}
 
 	srv.logger.Info("Starting graceful shutdown")
-	// Note: The order of termination is important.
+	// Note: The order of shutdown is important.
 	// Sender goroutines should be terminated before the receiver goroutines.
 	// processor -> syncer (via syncCh)
 	// processor -> heartbeater (via starting, finished channels)
-	srv.forwarder.terminate()
-	srv.processor.terminate()
-	srv.recoverer.terminate()
-	srv.syncer.terminate()
-	srv.subscriber.terminate()
-	srv.healthchecker.terminate()
-	srv.heartbeater.terminate()
+	srv.forwarder.shutdown()
+	srv.processor.shutdown()
+	srv.recoverer.shutdown()
+	srv.syncer.shutdown()
+	srv.subscriber.shutdown()
+	srv.healthchecker.shutdown()
+	srv.heartbeater.shutdown()
 
 	srv.wg.Wait()
 
 	srv.broker.Close()
-	srv.status.Set(base.StatusStopped)
+	srv.state.Set(base.StateClosed)
 
 	srv.logger.Info("Exiting")
 }
 
-// Quiet signals the server to stop pulling new tasks off queues.
-// Quiet should be used before stopping the server.
-func (srv *Server) Quiet() {
+// Stop signals the server to stop pulling new tasks off queues.
+// Stop can be used before shutting down the server to ensure that all
+// currently active tasks are processed before server shutdown.
+//
+// Stop does not shut down the server; make sure to call Shutdown before exiting.
+func (srv *Server) Stop() {
 	srv.logger.Info("Stopping processor")
 	srv.processor.stop()
-	srv.status.Set(base.StatusQuiet)
+	srv.state.Set(base.StateStopped)
 	srv.logger.Info("Processor stopped")
 }
diff --git a/server_test.go b/server_test.go
index 6d29dd6..2d4265d 100644
--- a/server_test.go
+++ b/server_test.go
@@ -50,7 +50,7 @@ func TestServer(t *testing.T) {
 		t.Errorf("could not enqueue a task: %v", err)
 	}
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
 func TestServerRun(t *testing.T) {
@@ -82,16 +82,16 @@
 	}
 }
 
-func TestServerErrServerStopped(t *testing.T) {
+func TestServerErrServerClosed(t *testing.T) {
 	srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
 	handler := NewServeMux()
 	if err := srv.Start(handler); err != nil {
 		t.Fatal(err)
 	}
-	srv.Stop()
+	srv.Shutdown()
 	err := srv.Start(handler)
-	if err != ErrServerStopped {
-		t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerStopped error", err)
+	if err != ErrServerClosed {
+		t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerClosed error", err)
 	}
 }
 
@@ -100,7 +100,7 @@ func TestServerErrNilHandler(t *testing.T) {
 	err := srv.Start(nil)
 	if err == nil {
 		t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error")
-		srv.Stop()
+		srv.Shutdown()
 	}
 }
 
@@ -114,7 +114,7 @@ func TestServerErrServerRunning(t *testing.T) {
 	if err == nil {
 		t.Error("Calling (*Server).Start(handler) on already running server did not return error")
 	}
-	srv.Stop()
+	srv.Shutdown()
 }
 
 func TestServerWithRedisDown(t *testing.T) {
@@ -146,7 +146,7 @@
 
 	time.Sleep(3 * time.Second)
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
 func TestServerWithFlakyBroker(t *testing.T) {
@@ -207,7 +207,7 @@
 
 	time.Sleep(3 * time.Second)
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
 func TestLogLevel(t *testing.T) {
diff --git a/signals_unix.go b/signals_unix.go
index 41f878b..e2c767b 100644
--- a/signals_unix.go
+++ b/signals_unix.go
@@ -22,7 +22,7 @@ func (srv *Server) waitForSignals() {
 	for {
 		sig := <-sigs
 		if sig == unix.SIGTSTP {
-			srv.Quiet()
+			srv.Stop()
 			continue
 		}
 		break
diff --git a/subscriber.go b/subscriber.go
index aa895f9..a30a172 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -43,7 +43,7 @@ func newSubscriber(params subscriberParams) *subscriber {
 	}
 }
 
-func (s *subscriber) terminate() {
+func (s *subscriber) shutdown() {
 	s.logger.Debug("Subscriber shutting down...")
down...") // Signal the subscriber goroutine to stop. s.done <- struct{}{} diff --git a/subscriber_test.go b/subscriber_test.go index 709aca9..ec4e65b 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -46,7 +46,7 @@ func TestSubscriber(t *testing.T) { }) var wg sync.WaitGroup subscriber.start(&wg) - defer subscriber.terminate() + defer subscriber.shutdown() // wait for subscriber to establish connection to pubsub channel time.Sleep(time.Second) @@ -91,7 +91,7 @@ func TestSubscriberWithRedisDown(t *testing.T) { testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis. var wg sync.WaitGroup subscriber.start(&wg) - defer subscriber.terminate() + defer subscriber.shutdown() time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis. diff --git a/syncer.go b/syncer.go index d108a7a..f1be193 100644 --- a/syncer.go +++ b/syncer.go @@ -46,7 +46,7 @@ func newSyncer(params syncerParams) *syncer { } } -func (s *syncer) terminate() { +func (s *syncer) shutdown() { s.logger.Debug("Syncer shutting down...") // Signal the syncer goroutine to stop. s.done <- struct{}{} diff --git a/syncer_test.go b/syncer_test.go index f46f2db..8ffd575 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -35,7 +35,7 @@ func TestSyncer(t *testing.T) { }) var wg sync.WaitGroup syncer.start(&wg) - defer syncer.terminate() + defer syncer.shutdown() for _, msg := range inProgress { m := msg @@ -66,7 +66,7 @@ func TestSyncerRetry(t *testing.T) { var wg sync.WaitGroup syncer.start(&wg) - defer syncer.terminate() + defer syncer.shutdown() var ( mu sync.Mutex @@ -131,7 +131,7 @@ func TestSyncerDropsStaleRequests(t *testing.T) { } time.Sleep(2 * interval) // ensure that syncer runs at least once - syncer.terminate() + syncer.shutdown() mu.Lock() if n != 0 { diff --git a/tools/asynq/cmd/server.go b/tools/asynq/cmd/server.go index 4958a4e..90e899f 100644 --- a/tools/asynq/cmd/server.go +++ b/tools/asynq/cmd/server.go @@ -35,11 +35,11 @@ The command shows the following for each server: * Host and PID of the process in which the server is running * Number of active workers out of worker pool * Queue configuration -* State of the worker server ("running" | "quiet") +* State of the worker server ("active" | "stopped") * Time the server was started -A "running" server is pulling tasks from queues and processing them. -A "quiet" server is no longer pulling new tasks from queues`, +A "active" server is pulling tasks from queues and processing them. +A "stopped" server is no longer pulling new tasks from queues`, Run: serverList, }