Mirror of https://github.com/hibiken/asynq.git

commit 105084d6fb (parent 5af86a0903)
Rename ServerStatus to ServerState internally
heartbeat.go (10 lines changed)
@@ -40,8 +40,8 @@ type heartbeater struct {
 	started time.Time
 	workers map[string]*workerInfo
 
-	// status is shared with other goroutine but is concurrency safe.
-	status *base.ServerStatus
+	// state is shared with other goroutine but is concurrency safe.
+	state *base.ServerState
 
 	// channels to receive updates on active workers.
 	starting <-chan *workerInfo
@@ -55,7 +55,7 @@ type heartbeaterParams struct {
 	concurrency    int
 	queues         map[string]int
 	strictPriority bool
-	status         *base.ServerStatus
+	state          *base.ServerState
 	starting       <-chan *workerInfo
 	finished       <-chan *base.TaskMessage
 }
@@ -79,7 +79,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
 		queues:         params.queues,
 		strictPriority: params.strictPriority,
 
-		status:   params.status,
+		state:    params.state,
 		workers:  make(map[string]*workerInfo),
 		starting: params.starting,
 		finished: params.finished,
@@ -142,7 +142,7 @@ func (h *heartbeater) beat() {
 		Concurrency:       h.concurrency,
 		Queues:            h.queues,
 		StrictPriority:    h.strictPriority,
-		Status:            h.status.String(),
+		Status:            h.state.String(),
 		Started:           h.started,
 		ActiveWorkerCount: len(h.workers),
 	}
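The struct comment above is the heart of this change: the heartbeater and the server share one state value across goroutines, and every beat snapshots it into the ServerInfo record written to Redis. Below is a minimal, self-contained sketch of that pattern, with a stripped-down stand-in for base.ServerState and a log line in place of the real broker write; none of these names are asynq API.

// A mutex-guarded shared state, snapshotted by a ticker goroutine the way
// the heartbeater snapshots h.state.String() into ServerInfo on every beat.
package main

import (
	"fmt"
	"sync"
	"time"
)

// state is a stripped-down stand-in for base.ServerState.
type state struct {
	mu  sync.Mutex
	val string
}

func (s *state) Set(v string) { s.mu.Lock(); s.val = v; s.mu.Unlock() }

func (s *state) String() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.val
}

func main() {
	st := &state{val: "new"}
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // heartbeat loop: read the shared state on every tick
		defer wg.Done()
		t := time.NewTicker(100 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-done:
				return
			case <-t.C:
				fmt.Println("beat: status =", st.String()) // stands in for the Redis write
			}
		}
	}()
	st.Set("active") // other goroutines may flip the state at any time
	time.Sleep(250 * time.Millisecond)
	st.Set("closed")
	close(done)
	wg.Wait()
}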
heartbeat_test.go

@@ -38,7 +38,7 @@ func TestHeartbeater(t *testing.T) {
 	for _, tc := range tests {
 		h.FlushDB(t, r)
 
-		status := base.NewServerStatus(base.StatusIdle)
+		state := base.NewServerState()
 		hb := newHeartbeater(heartbeaterParams{
 			logger: testLogger,
 			broker: rdbClient,
@@ -46,7 +46,7 @@ func TestHeartbeater(t *testing.T) {
 			concurrency:    tc.concurrency,
 			queues:         tc.queues,
 			strictPriority: false,
-			status:         status,
+			state:          state,
 			starting:       make(chan *workerInfo),
 			finished:       make(chan *base.TaskMessage),
 		})
@@ -55,7 +55,7 @@ func TestHeartbeater(t *testing.T) {
 		hb.host = tc.host
 		hb.pid = tc.pid
 
-		status.Set(base.StatusActive)
+		state.Set(base.StateActive)
 		var wg sync.WaitGroup
 		hb.start(&wg)
 
@@ -65,7 +65,7 @@ func TestHeartbeater(t *testing.T) {
 			Queues:      tc.queues,
 			Concurrency: tc.concurrency,
 			Started:     time.Now(),
-			Status:      "running",
+			Status:      "active",
 		}
 
 		// allow for heartbeater to write to redis
@@ -91,12 +91,12 @@ func TestHeartbeater(t *testing.T) {
 		}
 
 		// status change
-		status.Set(base.StatusClosed)
+		state.Set(base.StateClosed)
 
 		// allow for heartbeater to write to redis
 		time.Sleep(tc.interval * 2)
 
-		want.Status = "stopped"
+		want.Status = "closed"
 		ss, err = rdbClient.ListServers()
 		if err != nil {
 			t.Errorf("could not read process status from redis: %v", err)
@@ -131,6 +131,8 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
 	r := rdb.NewRDB(setup(t))
 	defer r.Close()
 	testBroker := testbroker.NewTestBroker(r)
+	state := base.NewServerState()
+	state.Set(base.StateActive)
 	hb := newHeartbeater(heartbeaterParams{
 		logger: testLogger,
 		broker: testBroker,
@@ -138,7 +140,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
 		concurrency:    10,
 		queues:         map[string]int{"default": 1},
 		strictPriority: false,
-		status:         base.NewServerStatus(base.StatusActive),
+		state:          state,
 		starting:       make(chan *workerInfo),
 		finished:       make(chan *base.TaskMessage),
 	})
internal/base/base.go

@@ -215,52 +215,55 @@ type Z struct {
 	Score  int64
 }
 
-// ServerStatus represents status of a server.
-// ServerStatus methods are concurrency safe.
-type ServerStatus struct {
+// ServerState represents state of a server.
+// ServerState methods are concurrency safe.
+type ServerState struct {
 	mu  sync.Mutex
-	val ServerStatusValue
+	val ServerStateValue
 }
 
-// NewServerStatus returns a new status instance given an initial value.
-func NewServerStatus(v ServerStatusValue) *ServerStatus {
-	return &ServerStatus{val: v}
+// NewServerState returns a new state instance.
+// Initial state is set to StateNew.
+func NewServerState() *ServerState {
+	return &ServerState{val: StateNew}
 }
 
-type ServerStatusValue int
+type ServerStateValue int
 
 const (
-	// StatusIdle indicates the server is in idle state.
-	StatusIdle ServerStatusValue = iota
+	// StateNew represents a new server. Server begins in
+	// this state and then transition to StatusActive when
+	// Start or Run is callled.
+	StateNew ServerStateValue = iota
 
-	// StatusActive indicates the server is up and active.
-	StatusActive
+	// StateActive indicates the server is up and active.
+	StateActive
 
-	// StatusStopped indicates the server is up but no longer processing new tasks.
-	StatusStopped
+	// StateStopped indicates the server is up but no longer processing new tasks.
+	StateStopped
 
-	// StatusClosed indicates the server has been shutdown.
-	StatusClosed
+	// StateClosed indicates the server has been shutdown.
+	StateClosed
 )
 
-var statuses = []string{
-	"idle",
+var serverStates = []string{
+	"new",
 	"active",
 	"stopped",
 	"closed",
 }
 
-func (s *ServerStatus) String() string {
+func (s *ServerState) String() string {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	if StatusIdle <= s.val && s.val <= StatusClosed {
-		return statuses[s.val]
+	if StateNew <= s.val && s.val <= StateClosed {
+		return serverStates[s.val]
 	}
 	return "unknown status"
 }
 
 // Get returns the status value.
-func (s *ServerStatus) Get() ServerStatusValue {
+func (s *ServerState) Get() ServerStateValue {
 	s.mu.Lock()
 	v := s.val
 	s.mu.Unlock()
@@ -268,7 +271,7 @@ func (s *ServerState) Get() ServerStateValue {
 }
 
 // Set sets the status value.
-func (s *ServerStatus) Set(v ServerStatusValue) {
+func (s *ServerState) Set(v ServerStateValue) {
 	s.mu.Lock()
 	s.val = v
 	s.mu.Unlock()
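Everything the renamed type exposes is visible in the hunk above. As an illustration, here is how a caller inside the module might walk a server through its states; the Example function and its file placement are mine, but NewServerState, Set, Get, String, and the State* constants are exactly as added by this commit (internal/base is not importable from outside the module).

package base_test // hypothetical example file alongside internal/base

import (
	"fmt"

	"github.com/hibiken/asynq/internal/base"
)

func ExampleServerState() {
	state := base.NewServerState() // initial value is StateNew
	fmt.Println(state.String())

	state.Set(base.StateActive) // what Start/Run do on success
	fmt.Println(state.Get() == base.StateActive, state.String())

	state.Set(base.StateStopped) // up, but no longer pulling new tasks
	state.Set(base.StateClosed)  // fully shut down
	fmt.Println(state.String())

	// Output:
	// new
	// true active
	// closed
}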
internal/base/base_test.go

@@ -400,7 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
 			Concurrency:       10,
 			Queues:            map[string]int{"default": 1, "critical": 2},
 			StrictPriority:    false,
-			Status:            "running",
+			Status:            "active",
 			Started:           time.Now().Add(-3 * time.Hour),
 			ActiveWorkerCount: 8,
 		},
@@ -530,7 +530,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {
 // Test for status being accessed by multiple goroutines.
 // Run with -race flag to check for data race.
 func TestStatusConcurrentAccess(t *testing.T) {
-	status := NewServerStatus(StatusIdle)
+	status := NewServerState()
 
 	var wg sync.WaitGroup
 
@@ -544,7 +544,7 @@ func TestStatusConcurrentAccess(t *testing.T) {
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			status.Set(StatusClosed)
+			status.Set(StateClosed)
 			_ = status.String()
 		}()
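Because Set and String both take the mutex, the goroutines in this test never race; as the comment above it says, running the package's tests with go test -race lets Go's race detector confirm that.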
internal/rdb/inspect_test.go

@@ -3222,7 +3222,7 @@ func TestListServers(t *testing.T) {
 		ServerID:          "server123",
 		Concurrency:       10,
 		Queues:            map[string]int{"default": 1},
-		Status:            "running",
+		Status:            "active",
 		Started:           started1,
 		ActiveWorkerCount: 0,
 	}
internal/rdb/rdb_test.go

@@ -1475,7 +1475,7 @@ func TestWriteServerState(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().UTC(),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: 0,
 	}
@@ -1565,7 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-10 * time.Minute).UTC(),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers),
 	}
@@ -1667,7 +1667,7 @@ func TestClearServerState(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-10 * time.Minute),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers1),
 	}
@@ -1690,7 +1690,7 @@ func TestClearServerState(t *testing.T) {
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Started:           time.Now().Add(-15 * time.Minute),
-		Status:            "running",
+		Status:            "active",
 		ActiveWorkerCount: len(workers2),
 	}
scheduler.go (16 lines changed)
@@ -21,7 +21,7 @@ import (
 // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
 type Scheduler struct {
 	id     string
-	status *base.ServerStatus
+	state  *base.ServerState
 	logger *log.Logger
 	client *Client
 	rdb    *rdb.RDB
@@ -61,7 +61,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
 
 	return &Scheduler{
 		id:     generateSchedulerID(),
-		status: base.NewServerStatus(base.StatusIdle),
+		state:  base.NewServerState(),
 		logger: logger,
 		client: NewClient(r),
 		rdb:    rdb.NewRDB(c),
@@ -182,10 +182,10 @@ func (s *Scheduler) Run() error {
 // Start starts the scheduler.
 // It returns an error if the scheduler is already running or has been stopped.
 func (s *Scheduler) Start() error {
-	switch s.status.Get() {
-	case base.StatusActive:
+	switch s.state.Get() {
+	case base.StateActive:
 		return fmt.Errorf("asynq: the scheduler is already running")
-	case base.StatusClosed:
+	case base.StateClosed:
 		return fmt.Errorf("asynq: the scheduler has already been stopped")
 	}
 	s.logger.Info("Scheduler starting")
@@ -193,14 +193,14 @@ func (s *Scheduler) Start() error {
 	s.cron.Start()
 	s.wg.Add(1)
 	go s.runHeartbeater()
-	s.status.Set(base.StatusActive)
+	s.state.Set(base.StateActive)
 	return nil
 }
 
 // Stop stops the scheduler.
 // It returns an error if the scheduler is not currently running.
 func (s *Scheduler) Stop() error {
-	if s.status.Get() != base.StatusActive {
+	if s.state.Get() != base.StateActive {
 		return fmt.Errorf("asynq: the scheduler is not running")
 	}
 	s.logger.Info("Scheduler shutting down")
@@ -212,7 +212,7 @@ func (s *Scheduler) Stop() error {
 	s.clearHistory()
 	s.client.Close()
 	s.rdb.Close()
-	s.status.Set(base.StatusClosed)
+	s.state.Set(base.StateClosed)
 	s.logger.Info("Scheduler stopped")
 	return nil
 }
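A sketch of what the new guards in Start and Stop mean for a caller, assuming a Redis instance at localhost:6379. The flow below (double Start, Stop, Start-after-Stop) is illustrative; NewScheduler, SchedulerOpts, Start, and Stop are the public API this diff touches.

package main

import (
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	s := asynq.NewScheduler(
		asynq.RedisClientOpt{Addr: "localhost:6379"}, // assumes a local Redis
		&asynq.SchedulerOpts{},
	)

	if err := s.Start(); err != nil { // StateNew -> StateActive
		log.Fatal(err)
	}
	if err := s.Start(); err != nil {
		log.Println(err) // "asynq: the scheduler is already running"
	}

	time.Sleep(time.Second) // stand-in for real work

	if err := s.Stop(); err != nil { // StateActive -> StateClosed
		log.Fatal(err)
	}
	if err := s.Start(); err != nil {
		log.Println(err) // "asynq: the scheduler has already been stopped"
	}
}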
server.go (42 lines changed)
@@ -37,7 +37,7 @@ type Server struct {
 
 	broker base.Broker
 
-	status *base.ServerStatus
+	state *base.ServerState
 
 	// wait group to wait for all goroutines to finish.
 	wg sync.WaitGroup
@@ -278,7 +278,7 @@ const (
 )
 
 // NewServer returns a new Server given a redis connection option
-// and background processing configuration.
+// and server configuration.
 func NewServer(r RedisConnOpt, cfg Config) *Server {
 	c, ok := r.MakeRedisClient().(redis.UniversalClient)
 	if !ok {
@@ -324,7 +324,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
-	status := base.NewServerStatus(base.StatusIdle)
+	state := base.NewServerState()
 	cancels := base.NewCancelations()
 
 	syncer := newSyncer(syncerParams{
@@ -339,7 +339,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 		concurrency:    n,
 		queues:         queues,
 		strictPriority: cfg.StrictPriority,
-		status:         status,
+		state:          state,
 		starting:       starting,
 		finished:       finished,
 	})
@@ -384,7 +384,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	return &Server{
 		logger:    logger,
 		broker:    rdb,
-		status:    status,
+		state:     state,
 		forwarder: forwarder,
 		processor: processor,
 		syncer:    syncer,
@@ -401,10 +401,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 // is successful.
 //
 // If ProcessTask return a non-nil error or panics, the task
-// will be retried after delay.
+// will be retried after delay if retry-count is remaining,
+// otherwise the task will be archived.
+//
 // One exception to this rule is when ProcessTask returns SkipRetry error.
 // If the returned error is SkipRetry or the error wraps SkipRetry, retry is
-// skipped and task will be archived instead.
+// skipped and task will be immediately archived instead.
 type Handler interface {
 	ProcessTask(context.Context, *Task) error
 }
@@ -421,21 +423,21 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
 }
 
 // ErrServerClosed indicates that the operation is now illegal because of the server has been shutdown.
-var ErrServerClosed = errors.New("asynq: server closed")
+var ErrServerClosed = errors.New("asynq: Server closed")
 
-// Run starts the background-task processing and blocks until
+// Run starts the task processing and blocks until
 // an os signal to exit the program is received. Once it receives
 // a signal, it gracefully shuts down all active workers and other
 // goroutines to process the tasks.
 //
-// Run returns any error encountered during server startup time.
+// Run returns any error encountered at server startup time.
 // If the server has already been shutdown, ErrServerClosed is returned.
 func (srv *Server) Run(handler Handler) error {
 	if err := srv.Start(handler); err != nil {
 		return err
 	}
 	srv.waitForSignals()
-	srv.Stop()
+	srv.Shutdown()
 	return nil
 }
 
@@ -445,18 +447,18 @@ func (srv *Server) Run(handler Handler) error {
 // concurrency specified at the initialization time.
 //
 // Start returns any error encountered during server startup time.
-// If the server has already been stopped, ErrServerClosed is returned.
+// If the server has already been shutdown, ErrServerClosed is returned.
 func (srv *Server) Start(handler Handler) error {
 	if handler == nil {
 		return fmt.Errorf("asynq: server cannot run with nil handler")
 	}
-	switch srv.status.Get() {
-	case base.StatusActive:
+	switch srv.state.Get() {
+	case base.StateActive:
 		return fmt.Errorf("asynq: the server is already running")
-	case base.StatusClosed:
+	case base.StateClosed:
 		return ErrServerClosed
 	}
-	srv.status.Set(base.StatusActive)
+	srv.state.Set(base.StateActive)
 	srv.processor.handler = handler
 
 	srv.logger.Info("Starting processing")
@@ -476,8 +478,8 @@ func (srv *Server) Start(handler Handler) error {
 // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
 // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
 func (srv *Server) Shutdown() {
-	switch srv.status.Get() {
-	case base.StatusIdle, base.StatusClosed:
+	switch srv.state.Get() {
+	case base.StateNew, base.StateClosed:
 		// server is not running, do nothing and return.
 		return
 	}
@@ -498,7 +500,7 @@ func (srv *Server) Shutdown() {
 	srv.wg.Wait()
 
 	srv.broker.Close()
-	srv.status.Set(base.StatusClosed)
+	srv.state.Set(base.StateClosed)
 
 	srv.logger.Info("Exiting")
 }
@@ -508,6 +510,6 @@ func (srv *Server) Shutdown() {
 func (srv *Server) Stop() {
 	srv.logger.Info("Stopping processor")
 	srv.processor.stop()
-	srv.status.Set(base.StatusStopped)
+	srv.state.Set(base.StateStopped)
 	srv.logger.Info("Processor stopped")
 }
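The reworded Handler comment above fully determines the retry semantics. Here is a sketch of a handler exercising both paths; doWork and errPermanent are hypothetical stand-ins, while SkipRetry, Handler, and HandlerFunc are real asynq API.

package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/hibiken/asynq"
)

// errPermanent marks failures that retrying cannot fix (hypothetical).
var errPermanent = errors.New("permanent failure")

// doWork stands in for real task processing (hypothetical).
func doWork(ctx context.Context, t *asynq.Task) error { return nil }

func handle(ctx context.Context, t *asynq.Task) error {
	if err := doWork(ctx, t); err != nil {
		if errors.Is(err, errPermanent) {
			// Wrapping SkipRetry skips the retry and archives the task immediately.
			return fmt.Errorf("%v: %w", err, asynq.SkipRetry)
		}
		// Any other non-nil error (or a panic) leads to a delayed retry
		// while retry-count remains, then archiving.
		return err
	}
	return nil
}

// HandlerFunc adapts the function to the Handler interface.
var _ asynq.Handler = asynq.HandlerFunc(handle)

func main() {}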
tools/asynq/cmd/server.go

@@ -35,11 +35,11 @@ The command shows the following for each server:
 * Host and PID of the process in which the server is running
 * Number of active workers out of worker pool
 * Queue configuration
-* State of the worker server ("running" | "quiet")
+* State of the worker server ("active" | "stopped")
 * Time the server was started
 
-A "running" server is pulling tasks from queues and processing them.
-A "quiet" server is no longer pulling new tasks from queues`,
+A "active" server is pulling tasks from queues and processing them.
+A "stopped" server is no longer pulling new tasks from queues`,
 	Run: serverList,
 }
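A side effect worth noting: the CLI's documented state names now mirror what base.ServerState.String() reports for StateActive and StateStopped ("active", "stopped"), so the server-list output and the internal state machine share one vocabulary.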