2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 10:56:12 +08:00

Refactor server state

This commit is contained in:
Ken Hibino
2020-04-12 11:41:50 -07:00
parent f131c495b6
commit 1036677109
5 changed files with 95 additions and 101 deletions

View File

@@ -18,7 +18,7 @@ type heartbeater struct {
logger Logger
rdb *rdb.RDB
ps *base.ProcessState
ss *base.ServerState
// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}
@@ -27,11 +27,11 @@ type heartbeater struct {
interval time.Duration
}
func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
func newHeartbeater(l Logger, rdb *rdb.RDB, ss *base.ServerState, interval time.Duration) *heartbeater {
return &heartbeater{
logger: l,
rdb: rdb,
ps: ps,
ss: ss,
done: make(chan struct{}),
interval: interval,
}
@@ -44,8 +44,8 @@ func (h *heartbeater) terminate() {
}
func (h *heartbeater) start(wg *sync.WaitGroup) {
h.ps.SetStarted(time.Now())
h.ps.SetStatus(base.StatusRunning)
h.ss.SetStarted(time.Now())
h.ss.SetStatus(base.StatusRunning)
wg.Add(1)
go func() {
defer wg.Done()
@@ -53,7 +53,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
for {
select {
case <-h.done:
h.rdb.ClearProcessState(h.ps)
h.rdb.ClearServerState(h.ss)
h.logger.Info("Heartbeater done")
return
case <-time.After(h.interval):
@@ -66,7 +66,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessState(h.ps, h.interval*2)
err := h.rdb.WriteServerState(h.ss, h.interval*2)
if err != nil {
h.logger.Error("could not write heartbeat data: %v", err)
}

View File

@@ -104,42 +104,46 @@ type TaskMessage struct {
UniqueKey string
}
// ProcessState holds process level information.
// ServerState holds process level information.
//
// ProcessStates are safe for concurrent use by multiple goroutines.
type ProcessState struct {
// ServerStates are safe for concurrent use by multiple goroutines.
type ServerState struct {
mu sync.Mutex // guards all data fields
concurrency int
queues map[string]int
strictPriority bool
pid int
host string
status PStatus
status ServerStatus
started time.Time
workers map[string]*workerStats
}
// PStatus represents status of a process.
type PStatus int
// ServerStatus represents status of a server.
type ServerStatus int
const (
// StatusIdle indicates process is in idle state.
StatusIdle PStatus = iota
// StatusIdle indicates the server is in idle state.
StatusIdle ServerStatus = iota
// StatusRunning indicates process is up and processing tasks.
// StatusRunning indicates the servier is up and processing tasks.
StatusRunning
// StatusStopped indicates process is up but not processing new tasks.
// StatusQuiet indicates the server is up but not processing new tasks.
StatusQuiet
// StatusStopped indicates the server server has been stopped.
StatusStopped
)
var statuses = []string{
"idle",
"running",
"quiet",
"stopped",
}
func (s PStatus) String() string {
func (s ServerStatus) String() string {
if StatusIdle <= s && s <= StatusStopped {
return statuses[s]
}
@@ -151,9 +155,9 @@ type workerStats struct {
started time.Time
}
// NewProcessState returns a new instance of ProcessState.
func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState {
return &ProcessState{
// NewServerState returns a new instance of ServerState.
func NewServerState(host string, pid, concurrency int, queues map[string]int, strict bool) *ServerState {
return &ServerState{
host: host,
pid: pid,
concurrency: concurrency,
@@ -164,59 +168,66 @@ func NewProcessState(host string, pid, concurrency int, queues map[string]int, s
}
}
// SetStatus updates the state of process.
func (ps *ProcessState) SetStatus(status PStatus) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.status = status
// SetStatus updates the status of server.
func (ss *ServerState) SetStatus(status ServerStatus) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.status = status
}
// GetStatus returns the status of server.
func (ss *ServerState) Status() ServerStatus {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.status
}
// SetStarted records when the process started processing.
func (ps *ProcessState) SetStarted(t time.Time) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.started = t
func (ss *ServerState) SetStarted(t time.Time) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.started = t
}
// AddWorkerStats records when a worker started and which task it's processing.
func (ps *ProcessState) AddWorkerStats(msg *TaskMessage, started time.Time) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.workers[msg.ID.String()] = &workerStats{msg, started}
func (ss *ServerState) AddWorkerStats(msg *TaskMessage, started time.Time) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.workers[msg.ID.String()] = &workerStats{msg, started}
}
// DeleteWorkerStats removes a worker's entry from the process state.
func (ps *ProcessState) DeleteWorkerStats(msg *TaskMessage) {
ps.mu.Lock()
defer ps.mu.Unlock()
delete(ps.workers, msg.ID.String())
func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) {
ss.mu.Lock()
defer ss.mu.Unlock()
delete(ss.workers, msg.ID.String())
}
// Get returns current state of process as a ProcessInfo.
func (ps *ProcessState) Get() *ProcessInfo {
ps.mu.Lock()
defer ps.mu.Unlock()
func (ss *ServerState) Get() *ProcessInfo {
ss.mu.Lock()
defer ss.mu.Unlock()
return &ProcessInfo{
Host: ps.host,
PID: ps.pid,
Concurrency: ps.concurrency,
Queues: cloneQueueConfig(ps.queues),
StrictPriority: ps.strictPriority,
Status: ps.status.String(),
Started: ps.started,
ActiveWorkerCount: len(ps.workers),
Host: ss.host,
PID: ss.pid,
Concurrency: ss.concurrency,
Queues: cloneQueueConfig(ss.queues),
StrictPriority: ss.strictPriority,
Status: ss.status.String(),
Started: ss.started,
ActiveWorkerCount: len(ss.workers),
}
}
// GetWorkers returns a list of currently running workers' info.
func (ps *ProcessState) GetWorkers() []*WorkerInfo {
ps.mu.Lock()
defer ps.mu.Unlock()
func (ss *ServerState) GetWorkers() []*WorkerInfo {
ss.mu.Lock()
defer ss.mu.Unlock()
var res []*WorkerInfo
for _, w := range ps.workers {
for _, w := range ss.workers {
res = append(res, &WorkerInfo{
Host: ps.host,
PID: ps.pid,
Host: ss.host,
PID: ss.pid,
ID: w.msg.ID,
Type: w.msg.Type,
Queue: w.msg.Queue,

View File

@@ -484,16 +484,16 @@ redis.call("EXPIRE", KEYS[3], ARGV[2])
redis.call("ZADD", KEYS[4], ARGV[1], KEYS[3])
return redis.status_reply("OK")`)
// WriteProcessState writes process state data to redis with expiration set to the value ttl.
func (r *RDB) WriteProcessState(ps *base.ProcessState, ttl time.Duration) error {
info := ps.Get()
// WriteServerState writes server state data to redis with expiration set to the value ttl.
func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
info := ss.Get()
bytes, err := json.Marshal(info)
if err != nil {
return err
}
var args []interface{} // args to the lua script
exp := time.Now().Add(ttl).UTC()
workers := ps.GetWorkers()
workers := ss.GetWorkers()
args = append(args, float64(exp.Unix()), ttl.Seconds(), bytes)
for _, w := range workers {
bytes, err := json.Marshal(w)
@@ -520,9 +520,9 @@ redis.call("ZREM", KEYS[3], KEYS[4])
redis.call("DEL", KEYS[4])
return redis.status_reply("OK")`)
// ClearProcessState deletes process state data from redis.
func (r *RDB) ClearProcessState(ps *base.ProcessState) error {
info := ps.Get()
// ClearServerState deletes server state data from redis.
func (r *RDB) ClearServerState(ss *base.ServerState) error {
info := ss.Get()
host, pid := info.Host, info.PID
pkey := base.ProcessInfoKey(host, pid)
wkey := base.WorkersKey(host, pid)

View File

@@ -21,7 +21,7 @@ type processor struct {
logger Logger
rdb *rdb.RDB
ps *base.ProcessState
ss *base.ServerState
handler Handler
@@ -62,9 +62,9 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor.
func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc,
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
info := ps.Get()
info := ss.Get()
qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil)
if info.StrictPriority {
@@ -73,7 +73,7 @@ func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc
return &processor{
logger: l,
rdb: r,
ps: ps,
ss: ss,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
@@ -171,10 +171,10 @@ func (p *processor) exec() {
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
p.ps.AddWorkerStats(msg, time.Now())
p.ss.AddWorkerStats(msg, time.Now())
go func() {
defer func() {
p.ps.DeleteWorkerStats(msg)
p.ss.DeleteWorkerStats(msg)
<-p.sema /* release token */
}()

View File

@@ -31,17 +31,14 @@ import (
// (e.g., queue size reaches a certain limit, or the task has been in the
// queue for a certain amount of time).
type Server struct {
mu sync.Mutex
state serverState
ps *base.ProcessState
// wait group to wait for all goroutines to finish.
wg sync.WaitGroup
ss *base.ServerState
logger Logger
rdb *rdb.RDB
// wait group to wait for all goroutines to finish.
wg sync.WaitGroup
scheduler *scheduler
processor *processor
syncer *syncer
@@ -189,19 +186,18 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
pid := os.Getpid()
rdb := rdb.NewRDB(createRedisClient(r))
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority)
syncCh := make(chan *syncRequest)
cancels := base.NewCancelations()
syncer := newSyncer(logger, syncCh, 5*time.Second)
heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second)
heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second)
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
processor := newProcessor(logger, rdb, ss, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(logger, rdb, cancels)
return &Server{
state: stateIdle,
ss: ss,
logger: logger,
rdb: rdb,
ps: ps,
scheduler: scheduler,
processor: processor,
syncer: syncer,
@@ -235,14 +231,6 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
// ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
var ErrServerStopped = errors.New("asynq: the server has been stopped")
type serverState int
const (
stateIdle serverState = iota
stateRunning
stateStopped
)
// Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other
@@ -259,15 +247,13 @@ func (srv *Server) Run(handler Handler) error {
// Starts the background-task processing.
// TODO: doc
func (srv *Server) Start(handler Handler) error {
srv.mu.Lock()
defer srv.mu.Unlock()
switch srv.state {
case stateRunning:
switch srv.ss.Status() {
case base.StatusRunning:
return fmt.Errorf("asynq: the server is already running")
case stateStopped:
case base.StatusStopped:
return ErrServerStopped
}
srv.state = stateRunning
srv.ss.SetStatus(base.StatusRunning)
srv.processor.handler = handler
type prefixLogger interface {
@@ -290,9 +276,7 @@ func (srv *Server) Start(handler Handler) error {
// Stops the background-task processing.
// TODO: do we need to return error?
func (srv *Server) Stop() {
srv.mu.Lock()
defer srv.mu.Unlock()
if srv.state != stateRunning {
if srv.ss.Status() != base.StatusRunning {
// server is not running, do nothing and return.
return
}
@@ -301,7 +285,6 @@ func (srv *Server) Stop() {
srv.logger.Info("Starting graceful shutdown")
// Note: The order of termination is important.
// Sender goroutines should be terminated before the receiver goroutines.
//
// processor -> syncer (via syncCh)
srv.scheduler.terminate()
srv.processor.terminate()
@@ -312,7 +295,7 @@ func (srv *Server) Stop() {
srv.wg.Wait()
srv.rdb.Close()
srv.state = stateStopped
srv.ss.SetStatus(base.StatusStopped)
srv.logger.Info("Bye!")
}
@@ -321,5 +304,5 @@ func (srv *Server) Stop() {
// TODO: doc
func (srv *Server) Quiet() {
srv.processor.stop()
srv.ps.SetStatus(base.StatusStopped) // TODO: rephrase this state, like StatusSilent?
srv.ss.SetStatus(base.StatusQuiet)
}