mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 08:40:22 +08:00
Rename terminate to shutdown internally
This commit is contained in:
parent
105084d6fb
commit
056d787935
@ -30,7 +30,7 @@ func ExampleServer_Run() {
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleServer_Stop() {
|
||||
func ExampleServer_Shutdown() {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: ":6379"},
|
||||
asynq.Config{Concurrency: 20},
|
||||
@ -47,10 +47,10 @@ func ExampleServer_Stop() {
|
||||
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
|
||||
<-sigs // wait for termination signal
|
||||
|
||||
srv.Stop()
|
||||
srv.Shutdown()
|
||||
}
|
||||
|
||||
func ExampleServer_Quiet() {
|
||||
func ExampleServer_Stop() {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: ":6379"},
|
||||
asynq.Config{Concurrency: 20},
|
||||
@ -73,7 +73,7 @@ func ExampleServer_Quiet() {
|
||||
srv.Stop() // stop processing new tasks
|
||||
continue
|
||||
}
|
||||
break
|
||||
break // received SIGTERM or SIGINT signal
|
||||
}
|
||||
|
||||
srv.Shutdown()
|
||||
|
@ -45,7 +45,7 @@ func newForwarder(params forwarderParams) *forwarder {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *forwarder) terminate() {
|
||||
func (f *forwarder) shutdown() {
|
||||
f.logger.Debug("Forwarder shutting down...")
|
||||
// Signal the forwarder goroutine to stop polling.
|
||||
f.done <- struct{}{}
|
||||
|
@ -111,7 +111,7 @@ func TestForwarder(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
s.start(&wg)
|
||||
time.Sleep(tc.wait)
|
||||
s.terminate()
|
||||
s.shutdown()
|
||||
|
||||
for qname, want := range tc.wantScheduled {
|
||||
gotScheduled := h.GetScheduledMessages(t, r, qname)
|
||||
|
@ -45,7 +45,7 @@ func newHealthChecker(params healthcheckerParams) *healthchecker {
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *healthchecker) terminate() {
|
||||
func (hc *healthchecker) shutdown() {
|
||||
if hc.healthcheckFunc == nil {
|
||||
return
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ func TestHealthChecker(t *testing.T) {
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
hc.terminate()
|
||||
hc.shutdown()
|
||||
}
|
||||
|
||||
func TestHealthCheckerWhenRedisDown(t *testing.T) {
|
||||
@ -99,5 +99,5 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) {
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
hc.terminate()
|
||||
hc.shutdown()
|
||||
}
|
||||
|
@ -86,7 +86,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *heartbeater) terminate() {
|
||||
func (h *heartbeater) shutdown() {
|
||||
h.logger.Debug("Heartbeater shutting down...")
|
||||
// Signal the heartbeater goroutine to stop.
|
||||
h.done <- struct{}{}
|
||||
|
@ -74,19 +74,19 @@ func TestHeartbeater(t *testing.T) {
|
||||
ss, err := rdbClient.ListServers()
|
||||
if err != nil {
|
||||
t.Errorf("could not read server info from redis: %v", err)
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
if len(ss) != 1 {
|
||||
t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss))
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
|
||||
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
@ -100,23 +100,23 @@ func TestHeartbeater(t *testing.T) {
|
||||
ss, err = rdbClient.ListServers()
|
||||
if err != nil {
|
||||
t.Errorf("could not read process status from redis: %v", err)
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
if len(ss) != 1 {
|
||||
t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss))
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
|
||||
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
continue
|
||||
}
|
||||
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
@ -152,5 +152,5 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
|
||||
// wait for heartbeater to try writing data to redis
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
hb.terminate()
|
||||
hb.shutdown()
|
||||
}
|
||||
|
@ -123,8 +123,8 @@ func (p *processor) stop() {
|
||||
})
|
||||
}
|
||||
|
||||
// NOTE: once terminated, processor cannot be re-started.
|
||||
func (p *processor) terminate() {
|
||||
// NOTE: once shutdown, processor cannot be re-started.
|
||||
func (p *processor) shutdown() {
|
||||
p.stop()
|
||||
|
||||
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
|
||||
|
@ -113,7 +113,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
for _, msg := range tc.incoming {
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
@ -121,7 +121,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
|
||||
}
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
@ -213,7 +213,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
|
||||
}
|
||||
}
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
@ -290,7 +290,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
|
||||
}
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
@ -418,12 +418,12 @@ func TestProcessorRetry(t *testing.T) {
|
||||
for _, msg := range tc.incoming {
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(tc.wait) // FIXME: This makes test flaky.
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
|
||||
cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
|
||||
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
|
||||
@ -594,7 +594,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
|
||||
}
|
||||
}
|
||||
p.terminate()
|
||||
p.shutdown()
|
||||
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
|
@ -47,7 +47,7 @@ func newRecoverer(params recovererParams) *recoverer {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recoverer) terminate() {
|
||||
func (r *recoverer) shutdown() {
|
||||
r.logger.Debug("Recoverer shutting down...")
|
||||
// Signal the recoverer goroutine to stop polling.
|
||||
r.done <- struct{}{}
|
||||
|
@ -239,7 +239,7 @@ func TestRecoverer(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
recoverer.start(&wg)
|
||||
time.Sleep(2 * time.Second)
|
||||
recoverer.terminate()
|
||||
recoverer.shutdown()
|
||||
|
||||
for qname, want := range tc.wantActive {
|
||||
gotActive := h.GetActiveMessages(t, r, qname)
|
||||
|
41
server.go
41
server.go
@ -21,17 +21,18 @@ import (
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Server is responsible for managing the task processing.
|
||||
// Server is responsible for task processing and task lifecycle management.
|
||||
//
|
||||
// Server pulls tasks off queues and processes them.
|
||||
// If the processing of a task is unsuccessful, server will schedule it for a retry.
|
||||
//
|
||||
// A task will be retried until either the task gets processed successfully
|
||||
// or until it reaches its max retry count.
|
||||
//
|
||||
// If a task exhausts its retries, it will be moved to the archive and
|
||||
// will be kept in the archive for some time until a certain condition is met
|
||||
// (e.g., archive size reaches a certain limit, or the task has been in the
|
||||
// archive for a certain amount of time).
|
||||
// will be kept in the archive set.
|
||||
// Note that the archive size is finite and once it reaches its max size,
|
||||
// oldest tasks in the archive will be deleted.
|
||||
type Server struct {
|
||||
logger *log.Logger
|
||||
|
||||
@ -442,11 +443,12 @@ func (srv *Server) Run(handler Handler) error {
|
||||
}
|
||||
|
||||
// Start starts the worker server. Once the server has started,
|
||||
// it pulls tasks off queues and starts a worker goroutine for each task.
|
||||
// Tasks are processed concurrently by the workers up to the number of
|
||||
// concurrency specified at the initialization time.
|
||||
// it pulls tasks off queues and starts a worker goroutine for each task
|
||||
// and then call Handler to process it.
|
||||
// Tasks are processed concurrently by the workers up to the number of
|
||||
// concurrency specified in Config.Concurrency.
|
||||
//
|
||||
// Start returns any error encountered during server startup time.
|
||||
// Start returns any error encountered at server startup time.
|
||||
// If the server has already been shutdown, ErrServerClosed is returned.
|
||||
func (srv *Server) Start(handler Handler) error {
|
||||
if handler == nil {
|
||||
@ -455,6 +457,8 @@ func (srv *Server) Start(handler Handler) error {
|
||||
switch srv.state.Get() {
|
||||
case base.StateActive:
|
||||
return fmt.Errorf("asynq: the server is already running")
|
||||
case base.StateStopped:
|
||||
return fmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")
|
||||
case base.StateClosed:
|
||||
return ErrServerClosed
|
||||
}
|
||||
@ -485,17 +489,17 @@ func (srv *Server) Shutdown() {
|
||||
}
|
||||
|
||||
srv.logger.Info("Starting graceful shutdown")
|
||||
// Note: The order of termination is important.
|
||||
// Note: The order of shutdown is important.
|
||||
// Sender goroutines should be terminated before the receiver goroutines.
|
||||
// processor -> syncer (via syncCh)
|
||||
// processor -> heartbeater (via starting, finished channels)
|
||||
srv.forwarder.terminate()
|
||||
srv.processor.terminate()
|
||||
srv.recoverer.terminate()
|
||||
srv.syncer.terminate()
|
||||
srv.subscriber.terminate()
|
||||
srv.healthchecker.terminate()
|
||||
srv.heartbeater.terminate()
|
||||
srv.forwarder.shutdown()
|
||||
srv.processor.shutdown()
|
||||
srv.recoverer.shutdown()
|
||||
srv.syncer.shutdown()
|
||||
srv.subscriber.shutdown()
|
||||
srv.healthchecker.shutdown()
|
||||
srv.heartbeater.shutdown()
|
||||
|
||||
srv.wg.Wait()
|
||||
|
||||
@ -506,7 +510,10 @@ func (srv *Server) Shutdown() {
|
||||
}
|
||||
|
||||
// Stop signals the server to stop pulling new tasks off queues.
|
||||
// Stop should be used before shutting down the server.
|
||||
// Stop can be used before shutting down the server to ensure that all
|
||||
// currently active tasks are processed before server shutdown.
|
||||
//
|
||||
// Stop does not shutdown the server, make sure to call Shutdown before exit.
|
||||
func (srv *Server) Stop() {
|
||||
srv.logger.Info("Stopping processor")
|
||||
srv.processor.stop()
|
||||
|
@ -43,7 +43,7 @@ func newSubscriber(params subscriberParams) *subscriber {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) terminate() {
|
||||
func (s *subscriber) shutdown() {
|
||||
s.logger.Debug("Subscriber shutting down...")
|
||||
// Signal the subscriber goroutine to stop.
|
||||
s.done <- struct{}{}
|
||||
|
@ -46,7 +46,7 @@ func TestSubscriber(t *testing.T) {
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
subscriber.start(&wg)
|
||||
defer subscriber.terminate()
|
||||
defer subscriber.shutdown()
|
||||
|
||||
// wait for subscriber to establish connection to pubsub channel
|
||||
time.Sleep(time.Second)
|
||||
@ -91,7 +91,7 @@ func TestSubscriberWithRedisDown(t *testing.T) {
|
||||
testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
|
||||
var wg sync.WaitGroup
|
||||
subscriber.start(&wg)
|
||||
defer subscriber.terminate()
|
||||
defer subscriber.shutdown()
|
||||
|
||||
time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis.
|
||||
|
||||
|
@ -46,7 +46,7 @@ func newSyncer(params syncerParams) *syncer {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncer) terminate() {
|
||||
func (s *syncer) shutdown() {
|
||||
s.logger.Debug("Syncer shutting down...")
|
||||
// Signal the syncer goroutine to stop.
|
||||
s.done <- struct{}{}
|
||||
|
@ -35,7 +35,7 @@ func TestSyncer(t *testing.T) {
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
syncer.start(&wg)
|
||||
defer syncer.terminate()
|
||||
defer syncer.shutdown()
|
||||
|
||||
for _, msg := range inProgress {
|
||||
m := msg
|
||||
@ -66,7 +66,7 @@ func TestSyncerRetry(t *testing.T) {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
syncer.start(&wg)
|
||||
defer syncer.terminate()
|
||||
defer syncer.shutdown()
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
@ -131,7 +131,7 @@ func TestSyncerDropsStaleRequests(t *testing.T) {
|
||||
}
|
||||
|
||||
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||
syncer.terminate()
|
||||
syncer.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if n != 0 {
|
||||
|
Loading…
x
Reference in New Issue
Block a user