2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

Change Server API

* Rename ServerStatus to ServerState internally

* Rename terminate to shutdown internally

* Update Scheduler API to match Server API
This commit is contained in:
Ken Hibino 2021-03-23 06:20:54 -07:00
parent 476812475e
commit 9c95c41651
26 changed files with 175 additions and 169 deletions

View File

@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- NewTask function now takes array of bytes as payload. - NewTask function now takes array of bytes as payload.
- Task `Type` and `Payload` should be accessed by a method call. - Task `Type` and `Payload` should be accessed by a method call.
- `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`.
- `Scheduler` API has changed. Renamed `Stop` to `Shutdown`.
- Requires redis v4.0+ for multiple field/value pair support - Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script - Renamed pending key (TODO: need migration script

View File

@ -30,7 +30,7 @@ func ExampleServer_Run() {
} }
} }
func ExampleServer_Stop() { func ExampleServer_Shutdown() {
srv := asynq.NewServer( srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"}, asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20}, asynq.Config{Concurrency: 20},
@ -47,10 +47,10 @@ func ExampleServer_Stop() {
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT) signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
<-sigs // wait for termination signal <-sigs // wait for termination signal
srv.Stop() srv.Shutdown()
} }
func ExampleServer_Quiet() { func ExampleServer_Stop() {
srv := asynq.NewServer( srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"}, asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20}, asynq.Config{Concurrency: 20},
@ -70,13 +70,13 @@ func ExampleServer_Quiet() {
for { for {
s := <-sigs s := <-sigs
if s == unix.SIGTSTP { if s == unix.SIGTSTP {
srv.Quiet() // stop processing new tasks srv.Stop() // stop processing new tasks
continue continue
} }
break break // received SIGTERM or SIGINT signal
} }
srv.Stop() srv.Shutdown()
} }
func ExampleScheduler() { func ExampleScheduler() {

View File

@ -45,7 +45,7 @@ func newForwarder(params forwarderParams) *forwarder {
} }
} }
func (f *forwarder) terminate() { func (f *forwarder) shutdown() {
f.logger.Debug("Forwarder shutting down...") f.logger.Debug("Forwarder shutting down...")
// Signal the forwarder goroutine to stop polling. // Signal the forwarder goroutine to stop polling.
f.done <- struct{}{} f.done <- struct{}{}

View File

@ -111,7 +111,7 @@ func TestForwarder(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
s.start(&wg) s.start(&wg)
time.Sleep(tc.wait) time.Sleep(tc.wait)
s.terminate() s.shutdown()
for qname, want := range tc.wantScheduled { for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r, qname) gotScheduled := h.GetScheduledMessages(t, r, qname)

View File

@ -45,7 +45,7 @@ func newHealthChecker(params healthcheckerParams) *healthchecker {
} }
} }
func (hc *healthchecker) terminate() { func (hc *healthchecker) shutdown() {
if hc.healthcheckFunc == nil { if hc.healthcheckFunc == nil {
return return
} }

View File

@ -51,7 +51,7 @@ func TestHealthChecker(t *testing.T) {
} }
mu.Unlock() mu.Unlock()
hc.terminate() hc.shutdown()
} }
func TestHealthCheckerWhenRedisDown(t *testing.T) { func TestHealthCheckerWhenRedisDown(t *testing.T) {
@ -99,5 +99,5 @@ func TestHealthCheckerWhenRedisDown(t *testing.T) {
} }
mu.Unlock() mu.Unlock()
hc.terminate() hc.shutdown()
} }

View File

@ -40,8 +40,8 @@ type heartbeater struct {
started time.Time started time.Time
workers map[string]*workerInfo workers map[string]*workerInfo
// status is shared with other goroutine but is concurrency safe. // state is shared with other goroutine but is concurrency safe.
status *base.ServerStatus state *base.ServerState
// channels to receive updates on active workers. // channels to receive updates on active workers.
starting <-chan *workerInfo starting <-chan *workerInfo
@ -55,7 +55,7 @@ type heartbeaterParams struct {
concurrency int concurrency int
queues map[string]int queues map[string]int
strictPriority bool strictPriority bool
status *base.ServerStatus state *base.ServerState
starting <-chan *workerInfo starting <-chan *workerInfo
finished <-chan *base.TaskMessage finished <-chan *base.TaskMessage
} }
@ -79,14 +79,14 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
queues: params.queues, queues: params.queues,
strictPriority: params.strictPriority, strictPriority: params.strictPriority,
status: params.status, state: params.state,
workers: make(map[string]*workerInfo), workers: make(map[string]*workerInfo),
starting: params.starting, starting: params.starting,
finished: params.finished, finished: params.finished,
} }
} }
func (h *heartbeater) terminate() { func (h *heartbeater) shutdown() {
h.logger.Debug("Heartbeater shutting down...") h.logger.Debug("Heartbeater shutting down...")
// Signal the heartbeater goroutine to stop. // Signal the heartbeater goroutine to stop.
h.done <- struct{}{} h.done <- struct{}{}
@ -142,7 +142,7 @@ func (h *heartbeater) beat() {
Concurrency: h.concurrency, Concurrency: h.concurrency,
Queues: h.queues, Queues: h.queues,
StrictPriority: h.strictPriority, StrictPriority: h.strictPriority,
Status: h.status.String(), Status: h.state.String(),
Started: h.started, Started: h.started,
ActiveWorkerCount: len(h.workers), ActiveWorkerCount: len(h.workers),
} }

View File

@ -38,7 +38,7 @@ func TestHeartbeater(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) h.FlushDB(t, r)
status := base.NewServerStatus(base.StatusIdle) state := base.NewServerState()
hb := newHeartbeater(heartbeaterParams{ hb := newHeartbeater(heartbeaterParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
@ -46,7 +46,7 @@ func TestHeartbeater(t *testing.T) {
concurrency: tc.concurrency, concurrency: tc.concurrency,
queues: tc.queues, queues: tc.queues,
strictPriority: false, strictPriority: false,
status: status, state: state,
starting: make(chan *workerInfo), starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage), finished: make(chan *base.TaskMessage),
}) })
@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) {
hb.host = tc.host hb.host = tc.host
hb.pid = tc.pid hb.pid = tc.pid
status.Set(base.StatusRunning) state.Set(base.StateActive)
var wg sync.WaitGroup var wg sync.WaitGroup
hb.start(&wg) hb.start(&wg)
@ -65,7 +65,7 @@ func TestHeartbeater(t *testing.T) {
Queues: tc.queues, Queues: tc.queues,
Concurrency: tc.concurrency, Concurrency: tc.concurrency,
Started: time.Now(), Started: time.Now(),
Status: "running", Status: "active",
} }
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
@ -74,49 +74,49 @@ func TestHeartbeater(t *testing.T) {
ss, err := rdbClient.ListServers() ss, err := rdbClient.ListServers()
if err != nil { if err != nil {
t.Errorf("could not read server info from redis: %v", err) t.Errorf("could not read server info from redis: %v", err)
hb.terminate() hb.shutdown()
continue continue
} }
if len(ss) != 1 { if len(ss) != 1 {
t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss)) t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss))
hb.terminate() hb.shutdown()
continue continue
} }
if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
hb.terminate() hb.shutdown()
continue continue
} }
// status change // status change
status.Set(base.StatusStopped) state.Set(base.StateClosed)
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
time.Sleep(tc.interval * 2) time.Sleep(tc.interval * 2)
want.Status = "stopped" want.Status = "closed"
ss, err = rdbClient.ListServers() ss, err = rdbClient.ListServers()
if err != nil { if err != nil {
t.Errorf("could not read process status from redis: %v", err) t.Errorf("could not read process status from redis: %v", err)
hb.terminate() hb.shutdown()
continue continue
} }
if len(ss) != 1 { if len(ss) != 1 {
t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss)) t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss))
hb.terminate() hb.shutdown()
continue continue
} }
if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff)
hb.terminate() hb.shutdown()
continue continue
} }
hb.terminate() hb.shutdown()
} }
} }
@ -131,6 +131,8 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
r := rdb.NewRDB(setup(t)) r := rdb.NewRDB(setup(t))
defer r.Close() defer r.Close()
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
state := base.NewServerState()
state.Set(base.StateActive)
hb := newHeartbeater(heartbeaterParams{ hb := newHeartbeater(heartbeaterParams{
logger: testLogger, logger: testLogger,
broker: testBroker, broker: testBroker,
@ -138,7 +140,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
concurrency: 10, concurrency: 10,
queues: map[string]int{"default": 1}, queues: map[string]int{"default": 1},
strictPriority: false, strictPriority: false,
status: base.NewServerStatus(base.StatusRunning), state: state,
starting: make(chan *workerInfo), starting: make(chan *workerInfo),
finished: make(chan *base.TaskMessage), finished: make(chan *base.TaskMessage),
}) })
@ -150,5 +152,5 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
// wait for heartbeater to try writing data to redis // wait for heartbeater to try writing data to redis
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
hb.terminate() hb.shutdown()
} }

View File

@ -215,52 +215,55 @@ type Z struct {
Score int64 Score int64
} }
// ServerStatus represents status of a server. // ServerState represents state of a server.
// ServerStatus methods are concurrency safe. // ServerState methods are concurrency safe.
type ServerStatus struct { type ServerState struct {
mu sync.Mutex mu sync.Mutex
val ServerStatusValue val ServerStateValue
} }
// NewServerStatus returns a new status instance given an initial value. // NewServerState returns a new state instance.
func NewServerStatus(v ServerStatusValue) *ServerStatus { // Initial state is set to StateNew.
return &ServerStatus{val: v} func NewServerState() *ServerState {
return &ServerState{val: StateNew}
} }
type ServerStatusValue int type ServerStateValue int
const ( const (
// StatusIdle indicates the server is in idle state. // StateNew represents a new server. Server begins in
StatusIdle ServerStatusValue = iota // this state and then transition to StatusActive when
// Start or Run is callled.
StateNew ServerStateValue = iota
// StatusRunning indicates the server is up and active. // StateActive indicates the server is up and active.
StatusRunning StateActive
// StatusQuiet indicates the server is up but not active. // StateStopped indicates the server is up but no longer processing new tasks.
StatusQuiet StateStopped
// StatusStopped indicates the server server has been stopped. // StateClosed indicates the server has been shutdown.
StatusStopped StateClosed
) )
var statuses = []string{ var serverStates = []string{
"idle", "new",
"running", "active",
"quiet",
"stopped", "stopped",
"closed",
} }
func (s *ServerStatus) String() string { func (s *ServerState) String() string {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if StatusIdle <= s.val && s.val <= StatusStopped { if StateNew <= s.val && s.val <= StateClosed {
return statuses[s.val] return serverStates[s.val]
} }
return "unknown status" return "unknown status"
} }
// Get returns the status value. // Get returns the status value.
func (s *ServerStatus) Get() ServerStatusValue { func (s *ServerState) Get() ServerStateValue {
s.mu.Lock() s.mu.Lock()
v := s.val v := s.val
s.mu.Unlock() s.mu.Unlock()
@ -268,7 +271,7 @@ func (s *ServerStatus) Get() ServerStatusValue {
} }
// Set sets the status value. // Set sets the status value.
func (s *ServerStatus) Set(v ServerStatusValue) { func (s *ServerState) Set(v ServerStateValue) {
s.mu.Lock() s.mu.Lock()
s.val = v s.val = v
s.mu.Unlock() s.mu.Unlock()

View File

@ -400,7 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
Concurrency: 10, Concurrency: 10,
Queues: map[string]int{"default": 1, "critical": 2}, Queues: map[string]int{"default": 1, "critical": 2},
StrictPriority: false, StrictPriority: false,
Status: "running", Status: "active",
Started: time.Now().Add(-3 * time.Hour), Started: time.Now().Add(-3 * time.Hour),
ActiveWorkerCount: 8, ActiveWorkerCount: 8,
}, },
@ -530,7 +530,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {
// Test for status being accessed by multiple goroutines. // Test for status being accessed by multiple goroutines.
// Run with -race flag to check for data race. // Run with -race flag to check for data race.
func TestStatusConcurrentAccess(t *testing.T) { func TestStatusConcurrentAccess(t *testing.T) {
status := NewServerStatus(StatusIdle) status := NewServerState()
var wg sync.WaitGroup var wg sync.WaitGroup
@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
status.Set(StatusStopped) status.Set(StateClosed)
_ = status.String() _ = status.String()
}() }()

View File

@ -3305,7 +3305,7 @@ func TestListServers(t *testing.T) {
ServerID: "server123", ServerID: "server123",
Concurrency: 10, Concurrency: 10,
Queues: map[string]int{"default": 1}, Queues: map[string]int{"default": 1},
Status: "running", Status: "active",
Started: started1, Started: started1,
ActiveWorkerCount: 0, ActiveWorkerCount: 0,
} }

View File

@ -1475,7 +1475,7 @@ func TestWriteServerState(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().UTC(), Started: time.Now().UTC(),
Status: "running", Status: "active",
ActiveWorkerCount: 0, ActiveWorkerCount: 0,
} }
@ -1565,7 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().Add(-10 * time.Minute).UTC(), Started: time.Now().Add(-10 * time.Minute).UTC(),
Status: "running", Status: "active",
ActiveWorkerCount: len(workers), ActiveWorkerCount: len(workers),
} }
@ -1667,7 +1667,7 @@ func TestClearServerState(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().Add(-10 * time.Minute), Started: time.Now().Add(-10 * time.Minute),
Status: "running", Status: "active",
ActiveWorkerCount: len(workers1), ActiveWorkerCount: len(workers1),
} }
@ -1690,7 +1690,7 @@ func TestClearServerState(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().Add(-15 * time.Minute), Started: time.Now().Add(-15 * time.Minute),
Status: "running", Status: "active",
ActiveWorkerCount: len(workers2), ActiveWorkerCount: len(workers2),
} }

View File

@ -123,8 +123,8 @@ func (p *processor) stop() {
}) })
} }
// NOTE: once terminated, processor cannot be re-started. // NOTE: once shutdown, processor cannot be re-started.
func (p *processor) terminate() { func (p *processor) shutdown() {
p.stop() p.stop()
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) }) time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })

View File

@ -113,7 +113,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
p.terminate() p.shutdown()
t.Fatal(err) t.Fatal(err)
} }
} }
@ -121,7 +121,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
} }
p.terminate() p.shutdown()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@ -213,7 +213,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l) t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
} }
} }
p.terminate() p.shutdown()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@ -290,7 +290,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
} }
p.terminate() p.shutdown()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
@ -418,12 +418,12 @@ func TestProcessorRetry(t *testing.T) {
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
p.terminate() p.shutdown()
t.Fatal(err) t.Fatal(err)
} }
} }
time.Sleep(tc.wait) // FIXME: This makes test flaky. time.Sleep(tc.wait) // FIXME: This makes test flaky.
p.terminate() p.shutdown()
cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName) gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
@ -594,7 +594,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l) t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
} }
} }
p.terminate() p.shutdown()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)

View File

@ -47,7 +47,7 @@ func newRecoverer(params recovererParams) *recoverer {
} }
} }
func (r *recoverer) terminate() { func (r *recoverer) shutdown() {
r.logger.Debug("Recoverer shutting down...") r.logger.Debug("Recoverer shutting down...")
// Signal the recoverer goroutine to stop polling. // Signal the recoverer goroutine to stop polling.
r.done <- struct{}{} r.done <- struct{}{}

View File

@ -239,7 +239,7 @@ func TestRecoverer(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
recoverer.start(&wg) recoverer.start(&wg)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
recoverer.terminate() recoverer.shutdown()
for qname, want := range tc.wantActive { for qname, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r, qname) gotActive := h.GetActiveMessages(t, r, qname)

View File

@ -21,7 +21,7 @@ import (
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule. // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
type Scheduler struct { type Scheduler struct {
id string id string
status *base.ServerStatus state *base.ServerState
logger *log.Logger logger *log.Logger
client *Client client *Client
rdb *rdb.RDB rdb *rdb.RDB
@ -61,7 +61,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
return &Scheduler{ return &Scheduler{
id: generateSchedulerID(), id: generateSchedulerID(),
status: base.NewServerStatus(base.StatusIdle), state: base.NewServerState(),
logger: logger, logger: logger,
client: NewClient(r), client: NewClient(r),
rdb: rdb.NewRDB(c), rdb: rdb.NewRDB(c),
@ -170,22 +170,23 @@ func (s *Scheduler) Unregister(entryID string) error {
} }
// Run starts the scheduler until an os signal to exit the program is received. // Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped. // It returns an error if scheduler is already running or has been shutdown.
func (s *Scheduler) Run() error { func (s *Scheduler) Run() error {
if err := s.Start(); err != nil { if err := s.Start(); err != nil {
return err return err
} }
s.waitForSignals() s.waitForSignals()
return s.Stop() s.Shutdown()
return nil
} }
// Start starts the scheduler. // Start starts the scheduler.
// It returns an error if the scheduler is already running or has been stopped. // It returns an error if the scheduler is already running or has been shutdown.
func (s *Scheduler) Start() error { func (s *Scheduler) Start() error {
switch s.status.Get() { switch s.state.Get() {
case base.StatusRunning: case base.StateActive:
return fmt.Errorf("asynq: the scheduler is already running") return fmt.Errorf("asynq: the scheduler is already running")
case base.StatusStopped: case base.StateClosed:
return fmt.Errorf("asynq: the scheduler has already been stopped") return fmt.Errorf("asynq: the scheduler has already been stopped")
} }
s.logger.Info("Scheduler starting") s.logger.Info("Scheduler starting")
@ -193,16 +194,12 @@ func (s *Scheduler) Start() error {
s.cron.Start() s.cron.Start()
s.wg.Add(1) s.wg.Add(1)
go s.runHeartbeater() go s.runHeartbeater()
s.status.Set(base.StatusRunning) s.state.Set(base.StateActive)
return nil return nil
} }
// Stop stops the scheduler. // Shutdown stops and shuts down the scheduler.
// It returns an error if the scheduler is not currently running. func (s *Scheduler) Shutdown() {
func (s *Scheduler) Stop() error {
if s.status.Get() != base.StatusRunning {
return fmt.Errorf("asynq: the scheduler is not running")
}
s.logger.Info("Scheduler shutting down") s.logger.Info("Scheduler shutting down")
close(s.done) // signal heartbeater to stop close(s.done) // signal heartbeater to stop
ctx := s.cron.Stop() ctx := s.cron.Stop()
@ -212,9 +209,8 @@ func (s *Scheduler) Stop() error {
s.clearHistory() s.clearHistory()
s.client.Close() s.client.Close()
s.rdb.Close() s.rdb.Close()
s.status.Set(base.StatusStopped) s.state.Set(base.StateClosed)
s.logger.Info("Scheduler stopped") s.logger.Info("Scheduler stopped")
return nil
} }
func (s *Scheduler) runHeartbeater() { func (s *Scheduler) runHeartbeater() {

View File

@ -67,9 +67,7 @@ func TestSchedulerRegister(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(tc.wait) time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil { scheduler.Shutdown()
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue) got := asynqtest.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" { if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" {
@ -106,9 +104,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
} }
// Scheduler should attempt to enqueue the task three times (every 3s). // Scheduler should attempt to enqueue the task three times (every 3s).
time.Sleep(10 * time.Second) time.Sleep(10 * time.Second)
if err := scheduler.Stop(); err != nil { scheduler.Shutdown()
t.Fatal(err)
}
mu.Lock() mu.Lock()
if counter != 3 { if counter != 3 {
@ -150,9 +146,7 @@ func TestSchedulerUnregister(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(tc.wait) time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil { scheduler.Shutdown()
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue) got := asynqtest.GetPendingMessages(t, r, tc.queue)
if len(got) != 0 { if len(got) != 0 {

101
server.go
View File

@ -21,23 +21,24 @@ import (
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
// Server is responsible for managing the task processing. // Server is responsible for task processing and task lifecycle management.
// //
// Server pulls tasks off queues and processes them. // Server pulls tasks off queues and processes them.
// If the processing of a task is unsuccessful, server will schedule it for a retry. // If the processing of a task is unsuccessful, server will schedule it for a retry.
//
// A task will be retried until either the task gets processed successfully // A task will be retried until either the task gets processed successfully
// or until it reaches its max retry count. // or until it reaches its max retry count.
// //
// If a task exhausts its retries, it will be moved to the archive and // If a task exhausts its retries, it will be moved to the archive and
// will be kept in the archive for some time until a certain condition is met // will be kept in the archive set.
// (e.g., archive size reaches a certain limit, or the task has been in the // Note that the archive size is finite and once it reaches its max size,
// archive for a certain amount of time). // oldest tasks in the archive will be deleted.
type Server struct { type Server struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
status *base.ServerStatus state *base.ServerState
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup
@ -278,7 +279,7 @@ const (
) )
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
// and background processing configuration. // and server configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server { func NewServer(r RedisConnOpt, cfg Config) *Server {
c, ok := r.MakeRedisClient().(redis.UniversalClient) c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
@ -324,7 +325,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
starting := make(chan *workerInfo) starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage) finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest) syncCh := make(chan *syncRequest)
status := base.NewServerStatus(base.StatusIdle) state := base.NewServerState()
cancels := base.NewCancelations() cancels := base.NewCancelations()
syncer := newSyncer(syncerParams{ syncer := newSyncer(syncerParams{
@ -339,7 +340,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
concurrency: n, concurrency: n,
queues: queues, queues: queues,
strictPriority: cfg.StrictPriority, strictPriority: cfg.StrictPriority,
status: status, state: state,
starting: starting, starting: starting,
finished: finished, finished: finished,
}) })
@ -384,7 +385,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
return &Server{ return &Server{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
status: status, state: state,
forwarder: forwarder, forwarder: forwarder,
processor: processor, processor: processor,
syncer: syncer, syncer: syncer,
@ -400,11 +401,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
// ProcessTask should return nil if the processing of a task // ProcessTask should return nil if the processing of a task
// is successful. // is successful.
// //
// If ProcessTask return a non-nil error or panics, the task // If ProcessTask returns a non-nil error or panics, the task
// will be retried after delay. // will be retried after delay if retry-count is remaining,
// One exception to this rule is when ProcessTask returns SkipRetry error. // otherwise the task will be archived.
// If the returned error is SkipRetry or the error wraps SkipRetry, retry is //
// skipped and task will be archived instead. // One exception to this rule is when ProcessTask returns a SkipRetry error.
// If the returned error is SkipRetry or an error wraps SkipRetry, retry is
// skipped and the task will be immediately archived instead.
type Handler interface { type Handler interface {
ProcessTask(context.Context, *Task) error ProcessTask(context.Context, *Task) error
} }
@ -420,43 +423,46 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
return fn(ctx, task) return fn(ctx, task)
} }
// ErrServerStopped indicates that the operation is now illegal because of the server being stopped. // ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
var ErrServerStopped = errors.New("asynq: the server has been stopped") var ErrServerClosed = errors.New("asynq: Server closed")
// Run starts the background-task processing and blocks until // Run starts the task processing and blocks until
// an os signal to exit the program is received. Once it receives // an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all active workers and other // a signal, it gracefully shuts down all active workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
// //
// Run returns any error encountered during server startup time. // Run returns any error encountered at server startup time.
// If the server has already been stopped, ErrServerStopped is returned. // If the server has already been shutdown, ErrServerClosed is returned.
func (srv *Server) Run(handler Handler) error { func (srv *Server) Run(handler Handler) error {
if err := srv.Start(handler); err != nil { if err := srv.Start(handler); err != nil {
return err return err
} }
srv.waitForSignals() srv.waitForSignals()
srv.Stop() srv.Shutdown()
return nil return nil
} }
// Start starts the worker server. Once the server has started, // Start starts the worker server. Once the server has started,
// it pulls tasks off queues and starts a worker goroutine for each task. // it pulls tasks off queues and starts a worker goroutine for each task
// and then call Handler to process it.
// Tasks are processed concurrently by the workers up to the number of // Tasks are processed concurrently by the workers up to the number of
// concurrency specified at the initialization time. // concurrency specified in Config.Concurrency.
// //
// Start returns any error encountered during server startup time. // Start returns any error encountered at server startup time.
// If the server has already been stopped, ErrServerStopped is returned. // If the server has already been shutdown, ErrServerClosed is returned.
func (srv *Server) Start(handler Handler) error { func (srv *Server) Start(handler Handler) error {
if handler == nil { if handler == nil {
return fmt.Errorf("asynq: server cannot run with nil handler") return fmt.Errorf("asynq: server cannot run with nil handler")
} }
switch srv.status.Get() { switch srv.state.Get() {
case base.StatusRunning: case base.StateActive:
return fmt.Errorf("asynq: the server is already running") return fmt.Errorf("asynq: the server is already running")
case base.StatusStopped: case base.StateStopped:
return ErrServerStopped return fmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")
case base.StateClosed:
return ErrServerClosed
} }
srv.status.Set(base.StatusRunning) srv.state.Set(base.StateActive)
srv.processor.handler = handler srv.processor.handler = handler
srv.logger.Info("Starting processing") srv.logger.Info("Starting processing")
@ -471,43 +477,46 @@ func (srv *Server) Start(handler Handler) error {
return nil return nil
} }
// Stop stops the worker server. // Shutdown gracefully shuts down the server.
// It gracefully closes all active workers. The server will wait for // It gracefully closes all active workers. The server will wait for
// active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
// If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis. // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
func (srv *Server) Stop() { func (srv *Server) Shutdown() {
switch srv.status.Get() { switch srv.state.Get() {
case base.StatusIdle, base.StatusStopped: case base.StateNew, base.StateClosed:
// server is not running, do nothing and return. // server is not running, do nothing and return.
return return
} }
srv.logger.Info("Starting graceful shutdown") srv.logger.Info("Starting graceful shutdown")
// Note: The order of termination is important. // Note: The order of shutdown is important.
// Sender goroutines should be terminated before the receiver goroutines. // Sender goroutines should be terminated before the receiver goroutines.
// processor -> syncer (via syncCh) // processor -> syncer (via syncCh)
// processor -> heartbeater (via starting, finished channels) // processor -> heartbeater (via starting, finished channels)
srv.forwarder.terminate() srv.forwarder.shutdown()
srv.processor.terminate() srv.processor.shutdown()
srv.recoverer.terminate() srv.recoverer.shutdown()
srv.syncer.terminate() srv.syncer.shutdown()
srv.subscriber.terminate() srv.subscriber.shutdown()
srv.healthchecker.terminate() srv.healthchecker.shutdown()
srv.heartbeater.terminate() srv.heartbeater.shutdown()
srv.wg.Wait() srv.wg.Wait()
srv.broker.Close() srv.broker.Close()
srv.status.Set(base.StatusStopped) srv.state.Set(base.StateClosed)
srv.logger.Info("Exiting") srv.logger.Info("Exiting")
} }
// Quiet signals the server to stop pulling new tasks off queues. // Stop signals the server to stop pulling new tasks off queues.
// Quiet should be used before stopping the server. // Stop can be used before shutting down the server to ensure that all
func (srv *Server) Quiet() { // currently active tasks are processed before server shutdown.
//
// Stop does not shutdown the server, make sure to call Shutdown before exit.
func (srv *Server) Stop() {
srv.logger.Info("Stopping processor") srv.logger.Info("Stopping processor")
srv.processor.stop() srv.processor.stop()
srv.status.Set(base.StatusQuiet) srv.state.Set(base.StateStopped)
srv.logger.Info("Processor stopped") srv.logger.Info("Processor stopped")
} }

View File

@ -50,7 +50,7 @@ func TestServer(t *testing.T) {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
srv.Stop() srv.Shutdown()
} }
func TestServerRun(t *testing.T) { func TestServerRun(t *testing.T) {
@ -82,16 +82,16 @@ func TestServerRun(t *testing.T) {
} }
} }
func TestServerErrServerStopped(t *testing.T) { func TestServerErrServerClosed(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
handler := NewServeMux() handler := NewServeMux()
if err := srv.Start(handler); err != nil { if err := srv.Start(handler); err != nil {
t.Fatal(err) t.Fatal(err)
} }
srv.Stop() srv.Shutdown()
err := srv.Start(handler) err := srv.Start(handler)
if err != ErrServerStopped { if err != ErrServerClosed {
t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerStopped error", err) t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerClosed error", err)
} }
} }
@ -100,7 +100,7 @@ func TestServerErrNilHandler(t *testing.T) {
err := srv.Start(nil) err := srv.Start(nil)
if err == nil { if err == nil {
t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error") t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error")
srv.Stop() srv.Shutdown()
} }
} }
@ -114,7 +114,7 @@ func TestServerErrServerRunning(t *testing.T) {
if err == nil { if err == nil {
t.Error("Calling (*Server).Start(handler) on already running server did not return error") t.Error("Calling (*Server).Start(handler) on already running server did not return error")
} }
srv.Stop() srv.Shutdown()
} }
func TestServerWithRedisDown(t *testing.T) { func TestServerWithRedisDown(t *testing.T) {
@ -146,7 +146,7 @@ func TestServerWithRedisDown(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
srv.Stop() srv.Shutdown()
} }
func TestServerWithFlakyBroker(t *testing.T) { func TestServerWithFlakyBroker(t *testing.T) {
@ -207,7 +207,7 @@ func TestServerWithFlakyBroker(t *testing.T) {
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
srv.Stop() srv.Shutdown()
} }
func TestLogLevel(t *testing.T) { func TestLogLevel(t *testing.T) {

View File

@ -22,7 +22,7 @@ func (srv *Server) waitForSignals() {
for { for {
sig := <-sigs sig := <-sigs
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
srv.Quiet() srv.Stop()
continue continue
} }
break break

View File

@ -43,7 +43,7 @@ func newSubscriber(params subscriberParams) *subscriber {
} }
} }
func (s *subscriber) terminate() { func (s *subscriber) shutdown() {
s.logger.Debug("Subscriber shutting down...") s.logger.Debug("Subscriber shutting down...")
// Signal the subscriber goroutine to stop. // Signal the subscriber goroutine to stop.
s.done <- struct{}{} s.done <- struct{}{}

View File

@ -46,7 +46,7 @@ func TestSubscriber(t *testing.T) {
}) })
var wg sync.WaitGroup var wg sync.WaitGroup
subscriber.start(&wg) subscriber.start(&wg)
defer subscriber.terminate() defer subscriber.shutdown()
// wait for subscriber to establish connection to pubsub channel // wait for subscriber to establish connection to pubsub channel
time.Sleep(time.Second) time.Sleep(time.Second)
@ -91,7 +91,7 @@ func TestSubscriberWithRedisDown(t *testing.T) {
testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis. testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
var wg sync.WaitGroup var wg sync.WaitGroup
subscriber.start(&wg) subscriber.start(&wg)
defer subscriber.terminate() defer subscriber.shutdown()
time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis. time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis.

View File

@ -46,7 +46,7 @@ func newSyncer(params syncerParams) *syncer {
} }
} }
func (s *syncer) terminate() { func (s *syncer) shutdown() {
s.logger.Debug("Syncer shutting down...") s.logger.Debug("Syncer shutting down...")
// Signal the syncer goroutine to stop. // Signal the syncer goroutine to stop.
s.done <- struct{}{} s.done <- struct{}{}

View File

@ -35,7 +35,7 @@ func TestSyncer(t *testing.T) {
}) })
var wg sync.WaitGroup var wg sync.WaitGroup
syncer.start(&wg) syncer.start(&wg)
defer syncer.terminate() defer syncer.shutdown()
for _, msg := range inProgress { for _, msg := range inProgress {
m := msg m := msg
@ -66,7 +66,7 @@ func TestSyncerRetry(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
syncer.start(&wg) syncer.start(&wg)
defer syncer.terminate() defer syncer.shutdown()
var ( var (
mu sync.Mutex mu sync.Mutex
@ -131,7 +131,7 @@ func TestSyncerDropsStaleRequests(t *testing.T) {
} }
time.Sleep(2 * interval) // ensure that syncer runs at least once time.Sleep(2 * interval) // ensure that syncer runs at least once
syncer.terminate() syncer.shutdown()
mu.Lock() mu.Lock()
if n != 0 { if n != 0 {

View File

@ -35,11 +35,11 @@ The command shows the following for each server:
* Host and PID of the process in which the server is running * Host and PID of the process in which the server is running
* Number of active workers out of worker pool * Number of active workers out of worker pool
* Queue configuration * Queue configuration
* State of the worker server ("running" | "quiet") * State of the worker server ("active" | "stopped")
* Time the server was started * Time the server was started
A "running" server is pulling tasks from queues and processing them. A "active" server is pulling tasks from queues and processing them.
A "quiet" server is no longer pulling new tasks from queues`, A "stopped" server is no longer pulling new tasks from queues`,
Run: serverList, Run: serverList,
} }