diff --git a/heartbeat.go b/heartbeat.go index dff5175..3164436 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -18,7 +18,7 @@ type heartbeater struct { logger Logger rdb *rdb.RDB - ps *base.ProcessState + ss *base.ServerState // channel to communicate back to the long running "heartbeater" goroutine. done chan struct{} @@ -27,11 +27,11 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { +func newHeartbeater(l Logger, rdb *rdb.RDB, ss *base.ServerState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, rdb: rdb, - ps: ps, + ss: ss, done: make(chan struct{}), interval: interval, } @@ -44,8 +44,8 @@ func (h *heartbeater) terminate() { } func (h *heartbeater) start(wg *sync.WaitGroup) { - h.ps.SetStarted(time.Now()) - h.ps.SetStatus(base.StatusRunning) + h.ss.SetStarted(time.Now()) + h.ss.SetStatus(base.StatusRunning) wg.Add(1) go func() { defer wg.Done() @@ -53,7 +53,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { for { select { case <-h.done: - h.rdb.ClearProcessState(h.ps) + h.rdb.ClearServerState(h.ss) h.logger.Info("Heartbeater done") return case <-time.After(h.interval): @@ -66,7 +66,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { func (h *heartbeater) beat() { // Note: Set TTL to be long enough so that it won't expire before we write again // and short enough to expire quickly once the process is shut down or killed. - err := h.rdb.WriteProcessState(h.ps, h.interval*2) + err := h.rdb.WriteServerState(h.ss, h.interval*2) if err != nil { h.logger.Error("could not write heartbeat data: %v", err) } diff --git a/internal/base/base.go b/internal/base/base.go index 7c4ac00..abf0fae 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -104,42 +104,46 @@ type TaskMessage struct { UniqueKey string } -// ProcessState holds process level information. +// ServerState holds process level information. // -// ProcessStates are safe for concurrent use by multiple goroutines. -type ProcessState struct { +// ServerStates are safe for concurrent use by multiple goroutines. +type ServerState struct { mu sync.Mutex // guards all data fields concurrency int queues map[string]int strictPriority bool pid int host string - status PStatus + status ServerStatus started time.Time workers map[string]*workerStats } -// PStatus represents status of a process. -type PStatus int +// ServerStatus represents status of a server. +type ServerStatus int const ( - // StatusIdle indicates process is in idle state. - StatusIdle PStatus = iota + // StatusIdle indicates the server is in idle state. + StatusIdle ServerStatus = iota - // StatusRunning indicates process is up and processing tasks. + // StatusRunning indicates the servier is up and processing tasks. StatusRunning - // StatusStopped indicates process is up but not processing new tasks. + // StatusQuiet indicates the server is up but not processing new tasks. + StatusQuiet + + // StatusStopped indicates the server server has been stopped. StatusStopped ) var statuses = []string{ "idle", "running", + "quiet", "stopped", } -func (s PStatus) String() string { +func (s ServerStatus) String() string { if StatusIdle <= s && s <= StatusStopped { return statuses[s] } @@ -151,9 +155,9 @@ type workerStats struct { started time.Time } -// NewProcessState returns a new instance of ProcessState. -func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState { - return &ProcessState{ +// NewServerState returns a new instance of ServerState. +func NewServerState(host string, pid, concurrency int, queues map[string]int, strict bool) *ServerState { + return &ServerState{ host: host, pid: pid, concurrency: concurrency, @@ -164,59 +168,66 @@ func NewProcessState(host string, pid, concurrency int, queues map[string]int, s } } -// SetStatus updates the state of process. -func (ps *ProcessState) SetStatus(status PStatus) { - ps.mu.Lock() - defer ps.mu.Unlock() - ps.status = status +// SetStatus updates the status of server. +func (ss *ServerState) SetStatus(status ServerStatus) { + ss.mu.Lock() + defer ss.mu.Unlock() + ss.status = status +} + +// GetStatus returns the status of server. +func (ss *ServerState) Status() ServerStatus { + ss.mu.Lock() + defer ss.mu.Unlock() + return ss.status } // SetStarted records when the process started processing. -func (ps *ProcessState) SetStarted(t time.Time) { - ps.mu.Lock() - defer ps.mu.Unlock() - ps.started = t +func (ss *ServerState) SetStarted(t time.Time) { + ss.mu.Lock() + defer ss.mu.Unlock() + ss.started = t } // AddWorkerStats records when a worker started and which task it's processing. -func (ps *ProcessState) AddWorkerStats(msg *TaskMessage, started time.Time) { - ps.mu.Lock() - defer ps.mu.Unlock() - ps.workers[msg.ID.String()] = &workerStats{msg, started} +func (ss *ServerState) AddWorkerStats(msg *TaskMessage, started time.Time) { + ss.mu.Lock() + defer ss.mu.Unlock() + ss.workers[msg.ID.String()] = &workerStats{msg, started} } // DeleteWorkerStats removes a worker's entry from the process state. -func (ps *ProcessState) DeleteWorkerStats(msg *TaskMessage) { - ps.mu.Lock() - defer ps.mu.Unlock() - delete(ps.workers, msg.ID.String()) +func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) { + ss.mu.Lock() + defer ss.mu.Unlock() + delete(ss.workers, msg.ID.String()) } // Get returns current state of process as a ProcessInfo. -func (ps *ProcessState) Get() *ProcessInfo { - ps.mu.Lock() - defer ps.mu.Unlock() +func (ss *ServerState) Get() *ProcessInfo { + ss.mu.Lock() + defer ss.mu.Unlock() return &ProcessInfo{ - Host: ps.host, - PID: ps.pid, - Concurrency: ps.concurrency, - Queues: cloneQueueConfig(ps.queues), - StrictPriority: ps.strictPriority, - Status: ps.status.String(), - Started: ps.started, - ActiveWorkerCount: len(ps.workers), + Host: ss.host, + PID: ss.pid, + Concurrency: ss.concurrency, + Queues: cloneQueueConfig(ss.queues), + StrictPriority: ss.strictPriority, + Status: ss.status.String(), + Started: ss.started, + ActiveWorkerCount: len(ss.workers), } } // GetWorkers returns a list of currently running workers' info. -func (ps *ProcessState) GetWorkers() []*WorkerInfo { - ps.mu.Lock() - defer ps.mu.Unlock() +func (ss *ServerState) GetWorkers() []*WorkerInfo { + ss.mu.Lock() + defer ss.mu.Unlock() var res []*WorkerInfo - for _, w := range ps.workers { + for _, w := range ss.workers { res = append(res, &WorkerInfo{ - Host: ps.host, - PID: ps.pid, + Host: ss.host, + PID: ss.pid, ID: w.msg.ID, Type: w.msg.Type, Queue: w.msg.Queue, diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index de5781d..6e373d2 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -484,16 +484,16 @@ redis.call("EXPIRE", KEYS[3], ARGV[2]) redis.call("ZADD", KEYS[4], ARGV[1], KEYS[3]) return redis.status_reply("OK")`) -// WriteProcessState writes process state data to redis with expiration set to the value ttl. -func (r *RDB) WriteProcessState(ps *base.ProcessState, ttl time.Duration) error { - info := ps.Get() +// WriteServerState writes server state data to redis with expiration set to the value ttl. +func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error { + info := ss.Get() bytes, err := json.Marshal(info) if err != nil { return err } var args []interface{} // args to the lua script exp := time.Now().Add(ttl).UTC() - workers := ps.GetWorkers() + workers := ss.GetWorkers() args = append(args, float64(exp.Unix()), ttl.Seconds(), bytes) for _, w := range workers { bytes, err := json.Marshal(w) @@ -520,9 +520,9 @@ redis.call("ZREM", KEYS[3], KEYS[4]) redis.call("DEL", KEYS[4]) return redis.status_reply("OK")`) -// ClearProcessState deletes process state data from redis. -func (r *RDB) ClearProcessState(ps *base.ProcessState) error { - info := ps.Get() +// ClearServerState deletes server state data from redis. +func (r *RDB) ClearServerState(ss *base.ServerState) error { + info := ss.Get() host, pid := info.Host, info.PID pkey := base.ProcessInfoKey(host, pid) wkey := base.WorkersKey(host, pid) diff --git a/processor.go b/processor.go index 6b3a1ed..33ce503 100644 --- a/processor.go +++ b/processor.go @@ -21,7 +21,7 @@ type processor struct { logger Logger rdb *rdb.RDB - ps *base.ProcessState + ss *base.ServerState handler Handler @@ -62,9 +62,9 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. -func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, +func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { - info := ps.Get() + info := ss.Get() qcfg := normalizeQueueCfg(info.Queues) orderedQueues := []string(nil) if info.StrictPriority { @@ -73,7 +73,7 @@ func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc return &processor{ logger: l, rdb: r, - ps: ps, + ss: ss, queueConfig: qcfg, orderedQueues: orderedQueues, retryDelayFunc: fn, @@ -171,10 +171,10 @@ func (p *processor) exec() { p.requeue(msg) return case p.sema <- struct{}{}: // acquire token - p.ps.AddWorkerStats(msg, time.Now()) + p.ss.AddWorkerStats(msg, time.Now()) go func() { defer func() { - p.ps.DeleteWorkerStats(msg) + p.ss.DeleteWorkerStats(msg) <-p.sema /* release token */ }() diff --git a/server.go b/server.go index 0102eec..18d31d5 100644 --- a/server.go +++ b/server.go @@ -31,17 +31,14 @@ import ( // (e.g., queue size reaches a certain limit, or the task has been in the // queue for a certain amount of time). type Server struct { - mu sync.Mutex - state serverState - - ps *base.ProcessState - - // wait group to wait for all goroutines to finish. - wg sync.WaitGroup + ss *base.ServerState logger Logger - rdb *rdb.RDB + rdb *rdb.RDB + + // wait group to wait for all goroutines to finish. + wg sync.WaitGroup scheduler *scheduler processor *processor syncer *syncer @@ -189,19 +186,18 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { pid := os.Getpid() rdb := rdb.NewRDB(createRedisClient(r)) - ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority) + ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() syncer := newSyncer(logger, syncCh, 5*time.Second) - heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second) + heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second) scheduler := newScheduler(logger, rdb, 5*time.Second, queues) - processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) + processor := newProcessor(logger, rdb, ss, delayFunc, syncCh, cancels, cfg.ErrorHandler) subscriber := newSubscriber(logger, rdb, cancels) return &Server{ - state: stateIdle, + ss: ss, logger: logger, rdb: rdb, - ps: ps, scheduler: scheduler, processor: processor, syncer: syncer, @@ -235,14 +231,6 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { // ErrServerStopped indicates that the operation is now illegal because of the server being stopped. var ErrServerStopped = errors.New("asynq: the server has been stopped") -type serverState int - -const ( - stateIdle serverState = iota - stateRunning - stateStopped -) - // Run starts the background-task processing and blocks until // an os signal to exit the program is received. Once it receives // a signal, it gracefully shuts down all pending workers and other @@ -259,15 +247,13 @@ func (srv *Server) Run(handler Handler) error { // Starts the background-task processing. // TODO: doc func (srv *Server) Start(handler Handler) error { - srv.mu.Lock() - defer srv.mu.Unlock() - switch srv.state { - case stateRunning: + switch srv.ss.Status() { + case base.StatusRunning: return fmt.Errorf("asynq: the server is already running") - case stateStopped: + case base.StatusStopped: return ErrServerStopped } - srv.state = stateRunning + srv.ss.SetStatus(base.StatusRunning) srv.processor.handler = handler type prefixLogger interface { @@ -290,9 +276,7 @@ func (srv *Server) Start(handler Handler) error { // Stops the background-task processing. // TODO: do we need to return error? func (srv *Server) Stop() { - srv.mu.Lock() - defer srv.mu.Unlock() - if srv.state != stateRunning { + if srv.ss.Status() != base.StatusRunning { // server is not running, do nothing and return. return } @@ -301,7 +285,6 @@ func (srv *Server) Stop() { srv.logger.Info("Starting graceful shutdown") // Note: The order of termination is important. // Sender goroutines should be terminated before the receiver goroutines. - // // processor -> syncer (via syncCh) srv.scheduler.terminate() srv.processor.terminate() @@ -312,7 +295,7 @@ func (srv *Server) Stop() { srv.wg.Wait() srv.rdb.Close() - srv.state = stateStopped + srv.ss.SetStatus(base.StatusStopped) srv.logger.Info("Bye!") } @@ -321,5 +304,5 @@ func (srv *Server) Stop() { // TODO: doc func (srv *Server) Quiet() { srv.processor.stop() - srv.ps.SetStatus(base.StatusStopped) // TODO: rephrase this state, like StatusSilent? + srv.ss.SetStatus(base.StatusQuiet) }