diff --git a/example_test.go b/example_test.go
index 9932679..c2909a2 100644
--- a/example_test.go
+++ b/example_test.go
@@ -30,7 +30,7 @@ func ExampleServer_Run() {
 	}
 }
 
-func ExampleServer_Stop() {
+func ExampleServer_Shutdown() {
 	srv := asynq.NewServer(
 		asynq.RedisClientOpt{Addr: ":6379"},
 		asynq.Config{Concurrency: 20},
@@ -47,10 +47,10 @@ func ExampleServer_Stop() {
 	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
 	<-sigs // wait for termination signal
 
-	srv.Stop()
+	srv.Shutdown()
 }
 
-func ExampleServer_Quiet() {
+func ExampleServer_Stop() {
 	srv := asynq.NewServer(
 		asynq.RedisClientOpt{Addr: ":6379"},
 		asynq.Config{Concurrency: 20},
@@ -73,7 +73,7 @@ func ExampleServer_Quiet() {
 			srv.Stop() // stop processing new tasks
 			continue
 		}
-		break
+		break // received SIGTERM or SIGINT signal
 	}
 
 	srv.Shutdown()
diff --git a/forwarder.go b/forwarder.go
index d7bd1f5..ec1c1f2 100644
--- a/forwarder.go
+++ b/forwarder.go
@@ -45,7 +45,7 @@ func newForwarder(params forwarderParams) *forwarder {
 	}
 }
 
-func (f *forwarder) terminate() {
+func (f *forwarder) shutdown() {
 	f.logger.Debug("Forwarder shutting down...")
 	// Signal the forwarder goroutine to stop polling.
 	f.done <- struct{}{}
diff --git a/forwarder_test.go b/forwarder_test.go
index a197f58..da251e1 100644
--- a/forwarder_test.go
+++ b/forwarder_test.go
@@ -111,7 +111,7 @@ func TestForwarder(t *testing.T) {
 		var wg sync.WaitGroup
 		s.start(&wg)
 		time.Sleep(tc.wait)
-		s.terminate()
+		s.shutdown()
 
 		for qname, want := range tc.wantScheduled {
 			gotScheduled := h.GetScheduledMessages(t, r, qname)
diff --git a/healthcheck.go b/healthcheck.go
index 84526a0..83f9916 100644
--- a/healthcheck.go
+++ b/healthcheck.go
@@ -45,7 +45,7 @@ func newHealthChecker(params healthcheckerParams) *healthchecker {
 	}
 }
 
-func (hc *healthchecker) terminate() {
+func (hc *healthchecker) shutdown() {
 	if hc.healthcheckFunc == nil {
 		return
 	}
diff --git a/healthcheck_test.go b/healthcheck_test.go
index 4b4c15e..e61e897 100644
--- a/healthcheck_test.go
+++ b/healthcheck_test.go
@@ -51,7 +51,7 @@ func TestHealthChecker(t *testing.T) {
 	}
 	mu.Unlock()
 
-	hc.terminate()
+	hc.shutdown()
 }
 
 func TestHealthCheckerWhenRedisDown(t *testing.T) {
@@ -99,5 +99,5 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) {
 	}
 	mu.Unlock()
 
-	hc.terminate()
+	hc.shutdown()
 }
diff --git a/heartbeat.go b/heartbeat.go
index 8117190..a51158f 100644
--- a/heartbeat.go
+++ b/heartbeat.go
@@ -86,7 +86,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
 	}
 }
 
-func (h *heartbeater) terminate() {
+func (h *heartbeater) shutdown() {
 	h.logger.Debug("Heartbeater shutting down...")
 	// Signal the heartbeater goroutine to stop.
 	h.done <- struct{}{}
diff --git a/heartbeat_test.go b/heartbeat_test.go
index 609d38e..518c5b0 100644
--- a/heartbeat_test.go
+++ b/heartbeat_test.go
@@ -74,19 +74,19 @@ func TestHeartbeater(t *testing.T) {
 		ss, err := rdbClient.ListServers()
 		if err != nil {
 			t.Errorf("could not read server info from redis: %v", err)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if len(ss) != 1 {
 			t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss))
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
@@ -100,23 +100,23 @@
 		ss, err = rdbClient.ListServers()
 		if err != nil {
 			t.Errorf("could not read process status from redis: %v", err)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if len(ss) != 1 {
 			t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss))
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
-			hb.terminate()
+			hb.shutdown()
 			continue
 		}
 
-		hb.terminate()
+		hb.shutdown()
 	}
 }
 
@@ -152,5 +152,5 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
 	// wait for heartbeater to try writing data to redis
 	time.Sleep(2 * time.Second)
 
-	hb.terminate()
+	hb.shutdown()
 }
diff --git a/processor.go b/processor.go
index e480650..605d077 100644
--- a/processor.go
+++ b/processor.go
@@ -123,8 +123,8 @@ func (p *processor) stop() {
 	})
 }
 
-// NOTE: once terminated, processor cannot be re-started.
-func (p *processor) terminate() {
+// NOTE: once shut down, the processor cannot be restarted.
+func (p *processor) shutdown() {
 	p.stop()
 
 	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
diff --git a/processor_test.go b/processor_test.go
index 6fd3f25..cedb427 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -113,7 +113,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
 		for _, msg := range tc.incoming {
 			err := rdbClient.Enqueue(msg)
 			if err != nil {
-				p.terminate()
+				p.shutdown()
 				t.Fatal(err)
 			}
 		}
@@ -121,7 +121,7 @@
 		if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
 			t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -213,7 +213,7 @@
 				t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
 			}
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -290,7 +290,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
 		if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
 			t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
 		}
-		p.terminate()
+		p.shutdown()
 
 		mu.Lock()
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@@ -418,12 +418,12 @@ func TestProcessorRetry(t *testing.T) {
 		for _, msg := range tc.incoming {
 			err := rdbClient.Enqueue(msg)
 			if err != nil {
-				p.terminate()
+				p.shutdown()
 				t.Fatal(err)
 			}
 		}
 		time.Sleep(tc.wait) // FIXME: This makes test flaky.
-		p.terminate()
+		p.shutdown()
 
 		cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
 		gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
@@ -594,7 +594,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 				t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
 			}
 		}
-		p.terminate()
+		p.shutdown()
 
 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
 			t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
diff --git a/recoverer.go b/recoverer.go
index 5a6da19..9c3dae3 100644
--- a/recoverer.go
+++ b/recoverer.go
@@ -47,7 +47,7 @@ func newRecoverer(params recovererParams) *recoverer {
 	}
 }
 
-func (r *recoverer) terminate() {
+func (r *recoverer) shutdown() {
 	r.logger.Debug("Recoverer shutting down...")
 	// Signal the recoverer goroutine to stop polling.
 	r.done <- struct{}{}
diff --git a/recoverer_test.go b/recoverer_test.go
index ae32674..91afde4 100644
--- a/recoverer_test.go
+++ b/recoverer_test.go
@@ -239,7 +239,7 @@ func TestRecoverer(t *testing.T) {
 		var wg sync.WaitGroup
 		recoverer.start(&wg)
 		time.Sleep(2 * time.Second)
-		recoverer.terminate()
+		recoverer.shutdown()
 
 		for qname, want := range tc.wantActive {
 			gotActive := h.GetActiveMessages(t, r, qname)
diff --git a/server.go b/server.go
index 4ebce90..71d5f46 100644
--- a/server.go
+++ b/server.go
@@ -21,17 +21,18 @@ import (
 	"github.com/hibiken/asynq/internal/rdb"
 )
 
-// Server is responsible for managing the task processing.
+// Server is responsible for task processing and task lifecycle management.
 //
 // Server pulls tasks off queues and processes them.
 // If the processing of a task is unsuccessful, server will schedule it for a retry.
+//
 // A task will be retried until either the task gets processed successfully
 // or until it reaches its max retry count.
 //
 // If a task exhausts its retries, it will be moved to the archive and
-// will be kept in the archive for some time until a certain condition is met
-// (e.g., archive size reaches a certain limit, or the task has been in the
-// archive for a certain amount of time).
+// will be kept in the archive set.
+// Note that the archive size is finite and once it reaches its max size, the
+// oldest tasks in the archive will be deleted.
 type Server struct {
 	logger *log.Logger
 
@@ -442,11 +443,12 @@ func (srv *Server) Run(handler Handler) error {
 }
 
 // Start starts the worker server. Once the server has started,
-// it pulls tasks off queues and starts a worker goroutine for each task.
-// Tasks are processed concurrently by the workers up to the number of
-// concurrency specified at the initialization time.
+// it pulls tasks off queues, starts a worker goroutine for each task,
+// and then calls the Handler to process it.
+// Tasks are processed concurrently by the workers, up to the concurrency
+// level specified in Config.Concurrency.
 //
-// Start returns any error encountered during server startup time.
+// Start returns any error encountered at server startup time.
 // If the server has already been shutdown, ErrServerClosed is returned.
 func (srv *Server) Start(handler Handler) error {
 	if handler == nil {
@@ -455,6 +457,8 @@
 	switch srv.state.Get() {
 	case base.StateActive:
 		return fmt.Errorf("asynq: the server is already running")
+	case base.StateStopped:
+		return fmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")
 	case base.StateClosed:
 		return ErrServerClosed
 	}
@@ -485,17 +489,17 @@
 	}
 
 	srv.logger.Info("Starting graceful shutdown")
-	// Note: The order of termination is important.
+	// Note: The order of shutdown is important.
 	// Sender goroutines should be terminated before the receiver goroutines.
 	// processor -> syncer (via syncCh)
 	// processor -> heartbeater (via starting, finished channels)
-	srv.forwarder.terminate()
-	srv.processor.terminate()
-	srv.recoverer.terminate()
-	srv.syncer.terminate()
-	srv.subscriber.terminate()
-	srv.healthchecker.terminate()
-	srv.heartbeater.terminate()
+	srv.forwarder.shutdown()
+	srv.processor.shutdown()
+	srv.recoverer.shutdown()
+	srv.syncer.shutdown()
+	srv.subscriber.shutdown()
+	srv.healthchecker.shutdown()
+	srv.heartbeater.shutdown()
 
 	srv.wg.Wait()
 
@@ -506,7 +510,10 @@
 }
 
 // Stop signals the server to stop pulling new tasks off queues.
-// Stop should be used before shutting down the server.
+// Stop can be used before shutting down the server to ensure that all
+// currently active tasks are processed before server shutdown.
+//
+// Stop does not shut down the server; make sure to call Shutdown before exiting.
 func (srv *Server) Stop() {
 	srv.logger.Info("Stopping processor")
 	srv.processor.stop()
diff --git a/subscriber.go b/subscriber.go
index aa895f9..a30a172 100644
--- a/subscriber.go
+++ b/subscriber.go
@@ -43,7 +43,7 @@ func newSubscriber(params subscriberParams) *subscriber {
 	}
 }
 
-func (s *subscriber) terminate() {
+func (s *subscriber) shutdown() {
 	s.logger.Debug("Subscriber shutting down...")
 	// Signal the subscriber goroutine to stop.
 	s.done <- struct{}{}
diff --git a/subscriber_test.go b/subscriber_test.go
index 709aca9..ec4e65b 100644
--- a/subscriber_test.go
+++ b/subscriber_test.go
@@ -46,7 +46,7 @@ func TestSubscriber(t *testing.T) {
 		})
 		var wg sync.WaitGroup
 		subscriber.start(&wg)
-		defer subscriber.terminate()
+		defer subscriber.shutdown()
 
 		// wait for subscriber to establish connection to pubsub channel
 		time.Sleep(time.Second)
@@ -91,7 +91,7 @@ func TestSubscriberWithRedisDown(t *testing.T) {
 	testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
 	var wg sync.WaitGroup
 	subscriber.start(&wg)
-	defer subscriber.terminate()
+	defer subscriber.shutdown()
 
 	time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis.
 
diff --git a/syncer.go b/syncer.go
index d108a7a..f1be193 100644
--- a/syncer.go
+++ b/syncer.go
@@ -46,7 +46,7 @@ func newSyncer(params syncerParams) *syncer {
 	}
 }
 
-func (s *syncer) terminate() {
+func (s *syncer) shutdown() {
 	s.logger.Debug("Syncer shutting down...")
 	// Signal the syncer goroutine to stop.
 	s.done <- struct{}{}
diff --git a/syncer_test.go b/syncer_test.go
index f46f2db..8ffd575 100644
--- a/syncer_test.go
+++ b/syncer_test.go
@@ -35,7 +35,7 @@ func TestSyncer(t *testing.T) {
 	})
 	var wg sync.WaitGroup
 	syncer.start(&wg)
-	defer syncer.terminate()
+	defer syncer.shutdown()
 
 	for _, msg := range inProgress {
 		m := msg
@@ -66,7 +66,7 @@ func TestSyncerRetry(t *testing.T) {
 
 	var wg sync.WaitGroup
 	syncer.start(&wg)
-	defer syncer.terminate()
+	defer syncer.shutdown()
 
 	var (
 		mu sync.Mutex
@@ -131,7 +131,7 @@ func TestSyncerDropsStaleRequests(t *testing.T) {
 	}
 
 	time.Sleep(2 * interval) // ensure that syncer runs at least once
-	syncer.terminate()
+	syncer.shutdown()
 
 	mu.Lock()
 	if n != 0 {
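
Note on the ordering comment in (*Server).Shutdown: sender goroutines are shut down before receiver goroutines (processor -> syncer via syncCh, processor -> heartbeater via the starting/finished channels). This follows the usual Go channel-shutdown rule: once every sender has exited, no goroutine can still be blocked on a send, so the receiver can safely drain and return. Below is a minimal, self-contained sketch of that rule; it is not asynq's actual code, and the processor/syncer/syncCh names are illustrative stand-ins only.

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	syncCh := make(chan string) // sender -> receiver, like processor -> syncer via syncCh
	done := make(chan struct{}) // shutdown signal for the sender

	// Sender (stand-in for the processor): produces entries until signaled,
	// then closes syncCh so the receiver can finish draining.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(syncCh)
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case syncCh <- fmt.Sprintf("result-%d", i):
			}
		}
	}()

	// Receiver (stand-in for the syncer): drains syncCh until the sender closes it.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range syncCh {
			fmt.Println("synced:", msg)
		}
	}()

	// Shutdown order mirrors Server.Shutdown: stop the sender first. Only after
	// the sender has exited is it safe for the receiver to return, because
	// nothing can still be blocked on a send into syncCh.
	close(done)
	wg.Wait()
}

If the receiver were torn down first instead, a sender blocked on syncCh <- ... could leak for the life of the process, which is exactly what the sender-before-receiver ordering in Shutdown prevents.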