mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 06:42:16 +08:00
Refactor server state
This commit is contained in:
parent
779065c269
commit
b14c73809e
14
heartbeat.go
14
heartbeat.go
@ -18,7 +18,7 @@ type heartbeater struct {
|
||||
logger Logger
|
||||
rdb *rdb.RDB
|
||||
|
||||
ps *base.ProcessState
|
||||
ss *base.ServerState
|
||||
|
||||
// channel to communicate back to the long running "heartbeater" goroutine.
|
||||
done chan struct{}
|
||||
@ -27,11 +27,11 @@ type heartbeater struct {
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
|
||||
func newHeartbeater(l Logger, rdb *rdb.RDB, ss *base.ServerState, interval time.Duration) *heartbeater {
|
||||
return &heartbeater{
|
||||
logger: l,
|
||||
rdb: rdb,
|
||||
ps: ps,
|
||||
ss: ss,
|
||||
done: make(chan struct{}),
|
||||
interval: interval,
|
||||
}
|
||||
@ -44,8 +44,8 @@ func (h *heartbeater) terminate() {
|
||||
}
|
||||
|
||||
func (h *heartbeater) start(wg *sync.WaitGroup) {
|
||||
h.ps.SetStarted(time.Now())
|
||||
h.ps.SetStatus(base.StatusRunning)
|
||||
h.ss.SetStarted(time.Now())
|
||||
h.ss.SetStatus(base.StatusRunning)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@ -53,7 +53,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
|
||||
for {
|
||||
select {
|
||||
case <-h.done:
|
||||
h.rdb.ClearProcessState(h.ps)
|
||||
h.rdb.ClearServerState(h.ss)
|
||||
h.logger.Info("Heartbeater done")
|
||||
return
|
||||
case <-time.After(h.interval):
|
||||
@ -66,7 +66,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
|
||||
func (h *heartbeater) beat() {
|
||||
// Note: Set TTL to be long enough so that it won't expire before we write again
|
||||
// and short enough to expire quickly once the process is shut down or killed.
|
||||
err := h.rdb.WriteProcessState(h.ps, h.interval*2)
|
||||
err := h.rdb.WriteServerState(h.ss, h.interval*2)
|
||||
if err != nil {
|
||||
h.logger.Error("could not write heartbeat data: %v", err)
|
||||
}
|
||||
|
@ -104,42 +104,46 @@ type TaskMessage struct {
|
||||
UniqueKey string
|
||||
}
|
||||
|
||||
// ProcessState holds process level information.
|
||||
// ServerState holds process level information.
|
||||
//
|
||||
// ProcessStates are safe for concurrent use by multiple goroutines.
|
||||
type ProcessState struct {
|
||||
// ServerStates are safe for concurrent use by multiple goroutines.
|
||||
type ServerState struct {
|
||||
mu sync.Mutex // guards all data fields
|
||||
concurrency int
|
||||
queues map[string]int
|
||||
strictPriority bool
|
||||
pid int
|
||||
host string
|
||||
status PStatus
|
||||
status ServerStatus
|
||||
started time.Time
|
||||
workers map[string]*workerStats
|
||||
}
|
||||
|
||||
// PStatus represents status of a process.
|
||||
type PStatus int
|
||||
// ServerStatus represents status of a server.
|
||||
type ServerStatus int
|
||||
|
||||
const (
|
||||
// StatusIdle indicates process is in idle state.
|
||||
StatusIdle PStatus = iota
|
||||
// StatusIdle indicates the server is in idle state.
|
||||
StatusIdle ServerStatus = iota
|
||||
|
||||
// StatusRunning indicates process is up and processing tasks.
|
||||
// StatusRunning indicates the servier is up and processing tasks.
|
||||
StatusRunning
|
||||
|
||||
// StatusStopped indicates process is up but not processing new tasks.
|
||||
// StatusQuiet indicates the server is up but not processing new tasks.
|
||||
StatusQuiet
|
||||
|
||||
// StatusStopped indicates the server server has been stopped.
|
||||
StatusStopped
|
||||
)
|
||||
|
||||
var statuses = []string{
|
||||
"idle",
|
||||
"running",
|
||||
"quiet",
|
||||
"stopped",
|
||||
}
|
||||
|
||||
func (s PStatus) String() string {
|
||||
func (s ServerStatus) String() string {
|
||||
if StatusIdle <= s && s <= StatusStopped {
|
||||
return statuses[s]
|
||||
}
|
||||
@ -151,9 +155,9 @@ type workerStats struct {
|
||||
started time.Time
|
||||
}
|
||||
|
||||
// NewProcessState returns a new instance of ProcessState.
|
||||
func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState {
|
||||
return &ProcessState{
|
||||
// NewServerState returns a new instance of ServerState.
|
||||
func NewServerState(host string, pid, concurrency int, queues map[string]int, strict bool) *ServerState {
|
||||
return &ServerState{
|
||||
host: host,
|
||||
pid: pid,
|
||||
concurrency: concurrency,
|
||||
@ -164,59 +168,66 @@ func NewProcessState(host string, pid, concurrency int, queues map[string]int, s
|
||||
}
|
||||
}
|
||||
|
||||
// SetStatus updates the state of process.
|
||||
func (ps *ProcessState) SetStatus(status PStatus) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
ps.status = status
|
||||
// SetStatus updates the status of server.
|
||||
func (ss *ServerState) SetStatus(status ServerStatus) {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
ss.status = status
|
||||
}
|
||||
|
||||
// GetStatus returns the status of server.
|
||||
func (ss *ServerState) Status() ServerStatus {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
return ss.status
|
||||
}
|
||||
|
||||
// SetStarted records when the process started processing.
|
||||
func (ps *ProcessState) SetStarted(t time.Time) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
ps.started = t
|
||||
func (ss *ServerState) SetStarted(t time.Time) {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
ss.started = t
|
||||
}
|
||||
|
||||
// AddWorkerStats records when a worker started and which task it's processing.
|
||||
func (ps *ProcessState) AddWorkerStats(msg *TaskMessage, started time.Time) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
ps.workers[msg.ID.String()] = &workerStats{msg, started}
|
||||
func (ss *ServerState) AddWorkerStats(msg *TaskMessage, started time.Time) {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
ss.workers[msg.ID.String()] = &workerStats{msg, started}
|
||||
}
|
||||
|
||||
// DeleteWorkerStats removes a worker's entry from the process state.
|
||||
func (ps *ProcessState) DeleteWorkerStats(msg *TaskMessage) {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
delete(ps.workers, msg.ID.String())
|
||||
func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
delete(ss.workers, msg.ID.String())
|
||||
}
|
||||
|
||||
// Get returns current state of process as a ProcessInfo.
|
||||
func (ps *ProcessState) Get() *ProcessInfo {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
func (ss *ServerState) Get() *ProcessInfo {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
return &ProcessInfo{
|
||||
Host: ps.host,
|
||||
PID: ps.pid,
|
||||
Concurrency: ps.concurrency,
|
||||
Queues: cloneQueueConfig(ps.queues),
|
||||
StrictPriority: ps.strictPriority,
|
||||
Status: ps.status.String(),
|
||||
Started: ps.started,
|
||||
ActiveWorkerCount: len(ps.workers),
|
||||
Host: ss.host,
|
||||
PID: ss.pid,
|
||||
Concurrency: ss.concurrency,
|
||||
Queues: cloneQueueConfig(ss.queues),
|
||||
StrictPriority: ss.strictPriority,
|
||||
Status: ss.status.String(),
|
||||
Started: ss.started,
|
||||
ActiveWorkerCount: len(ss.workers),
|
||||
}
|
||||
}
|
||||
|
||||
// GetWorkers returns a list of currently running workers' info.
|
||||
func (ps *ProcessState) GetWorkers() []*WorkerInfo {
|
||||
ps.mu.Lock()
|
||||
defer ps.mu.Unlock()
|
||||
func (ss *ServerState) GetWorkers() []*WorkerInfo {
|
||||
ss.mu.Lock()
|
||||
defer ss.mu.Unlock()
|
||||
var res []*WorkerInfo
|
||||
for _, w := range ps.workers {
|
||||
for _, w := range ss.workers {
|
||||
res = append(res, &WorkerInfo{
|
||||
Host: ps.host,
|
||||
PID: ps.pid,
|
||||
Host: ss.host,
|
||||
PID: ss.pid,
|
||||
ID: w.msg.ID,
|
||||
Type: w.msg.Type,
|
||||
Queue: w.msg.Queue,
|
||||
|
@ -484,16 +484,16 @@ redis.call("EXPIRE", KEYS[3], ARGV[2])
|
||||
redis.call("ZADD", KEYS[4], ARGV[1], KEYS[3])
|
||||
return redis.status_reply("OK")`)
|
||||
|
||||
// WriteProcessState writes process state data to redis with expiration set to the value ttl.
|
||||
func (r *RDB) WriteProcessState(ps *base.ProcessState, ttl time.Duration) error {
|
||||
info := ps.Get()
|
||||
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
||||
func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
|
||||
info := ss.Get()
|
||||
bytes, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var args []interface{} // args to the lua script
|
||||
exp := time.Now().Add(ttl).UTC()
|
||||
workers := ps.GetWorkers()
|
||||
workers := ss.GetWorkers()
|
||||
args = append(args, float64(exp.Unix()), ttl.Seconds(), bytes)
|
||||
for _, w := range workers {
|
||||
bytes, err := json.Marshal(w)
|
||||
@ -520,9 +520,9 @@ redis.call("ZREM", KEYS[3], KEYS[4])
|
||||
redis.call("DEL", KEYS[4])
|
||||
return redis.status_reply("OK")`)
|
||||
|
||||
// ClearProcessState deletes process state data from redis.
|
||||
func (r *RDB) ClearProcessState(ps *base.ProcessState) error {
|
||||
info := ps.Get()
|
||||
// ClearServerState deletes server state data from redis.
|
||||
func (r *RDB) ClearServerState(ss *base.ServerState) error {
|
||||
info := ss.Get()
|
||||
host, pid := info.Host, info.PID
|
||||
pkey := base.ProcessInfoKey(host, pid)
|
||||
wkey := base.WorkersKey(host, pid)
|
||||
|
12
processor.go
12
processor.go
@ -21,7 +21,7 @@ type processor struct {
|
||||
logger Logger
|
||||
rdb *rdb.RDB
|
||||
|
||||
ps *base.ProcessState
|
||||
ss *base.ServerState
|
||||
|
||||
handler Handler
|
||||
|
||||
@ -62,9 +62,9 @@ type processor struct {
|
||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||
|
||||
// newProcessor constructs a new processor.
|
||||
func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
||||
func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc,
|
||||
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
||||
info := ps.Get()
|
||||
info := ss.Get()
|
||||
qcfg := normalizeQueueCfg(info.Queues)
|
||||
orderedQueues := []string(nil)
|
||||
if info.StrictPriority {
|
||||
@ -73,7 +73,7 @@ func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc
|
||||
return &processor{
|
||||
logger: l,
|
||||
rdb: r,
|
||||
ps: ps,
|
||||
ss: ss,
|
||||
queueConfig: qcfg,
|
||||
orderedQueues: orderedQueues,
|
||||
retryDelayFunc: fn,
|
||||
@ -171,10 +171,10 @@ func (p *processor) exec() {
|
||||
p.requeue(msg)
|
||||
return
|
||||
case p.sema <- struct{}{}: // acquire token
|
||||
p.ps.AddWorkerStats(msg, time.Now())
|
||||
p.ss.AddWorkerStats(msg, time.Now())
|
||||
go func() {
|
||||
defer func() {
|
||||
p.ps.DeleteWorkerStats(msg)
|
||||
p.ss.DeleteWorkerStats(msg)
|
||||
<-p.sema /* release token */
|
||||
}()
|
||||
|
||||
|
49
server.go
49
server.go
@ -31,17 +31,14 @@ import (
|
||||
// (e.g., queue size reaches a certain limit, or the task has been in the
|
||||
// queue for a certain amount of time).
|
||||
type Server struct {
|
||||
mu sync.Mutex
|
||||
state serverState
|
||||
|
||||
ps *base.ProcessState
|
||||
|
||||
// wait group to wait for all goroutines to finish.
|
||||
wg sync.WaitGroup
|
||||
ss *base.ServerState
|
||||
|
||||
logger Logger
|
||||
|
||||
rdb *rdb.RDB
|
||||
rdb *rdb.RDB
|
||||
|
||||
// wait group to wait for all goroutines to finish.
|
||||
wg sync.WaitGroup
|
||||
scheduler *scheduler
|
||||
processor *processor
|
||||
syncer *syncer
|
||||
@ -189,19 +186,18 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
pid := os.Getpid()
|
||||
|
||||
rdb := rdb.NewRDB(createRedisClient(r))
|
||||
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
|
||||
ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority)
|
||||
syncCh := make(chan *syncRequest)
|
||||
cancels := base.NewCancelations()
|
||||
syncer := newSyncer(logger, syncCh, 5*time.Second)
|
||||
heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second)
|
||||
heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second)
|
||||
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
|
||||
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
||||
processor := newProcessor(logger, rdb, ss, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
||||
subscriber := newSubscriber(logger, rdb, cancels)
|
||||
return &Server{
|
||||
state: stateIdle,
|
||||
ss: ss,
|
||||
logger: logger,
|
||||
rdb: rdb,
|
||||
ps: ps,
|
||||
scheduler: scheduler,
|
||||
processor: processor,
|
||||
syncer: syncer,
|
||||
@ -235,14 +231,6 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
|
||||
// ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
|
||||
var ErrServerStopped = errors.New("asynq: the server has been stopped")
|
||||
|
||||
type serverState int
|
||||
|
||||
const (
|
||||
stateIdle serverState = iota
|
||||
stateRunning
|
||||
stateStopped
|
||||
)
|
||||
|
||||
// Run starts the background-task processing and blocks until
|
||||
// an os signal to exit the program is received. Once it receives
|
||||
// a signal, it gracefully shuts down all pending workers and other
|
||||
@ -259,15 +247,13 @@ func (srv *Server) Run(handler Handler) error {
|
||||
// Starts the background-task processing.
|
||||
// TODO: doc
|
||||
func (srv *Server) Start(handler Handler) error {
|
||||
srv.mu.Lock()
|
||||
defer srv.mu.Unlock()
|
||||
switch srv.state {
|
||||
case stateRunning:
|
||||
switch srv.ss.Status() {
|
||||
case base.StatusRunning:
|
||||
return fmt.Errorf("asynq: the server is already running")
|
||||
case stateStopped:
|
||||
case base.StatusStopped:
|
||||
return ErrServerStopped
|
||||
}
|
||||
srv.state = stateRunning
|
||||
srv.ss.SetStatus(base.StatusRunning)
|
||||
srv.processor.handler = handler
|
||||
|
||||
type prefixLogger interface {
|
||||
@ -290,9 +276,7 @@ func (srv *Server) Start(handler Handler) error {
|
||||
// Stops the background-task processing.
|
||||
// TODO: do we need to return error?
|
||||
func (srv *Server) Stop() {
|
||||
srv.mu.Lock()
|
||||
defer srv.mu.Unlock()
|
||||
if srv.state != stateRunning {
|
||||
if srv.ss.Status() != base.StatusRunning {
|
||||
// server is not running, do nothing and return.
|
||||
return
|
||||
}
|
||||
@ -301,7 +285,6 @@ func (srv *Server) Stop() {
|
||||
srv.logger.Info("Starting graceful shutdown")
|
||||
// Note: The order of termination is important.
|
||||
// Sender goroutines should be terminated before the receiver goroutines.
|
||||
//
|
||||
// processor -> syncer (via syncCh)
|
||||
srv.scheduler.terminate()
|
||||
srv.processor.terminate()
|
||||
@ -312,7 +295,7 @@ func (srv *Server) Stop() {
|
||||
srv.wg.Wait()
|
||||
|
||||
srv.rdb.Close()
|
||||
srv.state = stateStopped
|
||||
srv.ss.SetStatus(base.StatusStopped)
|
||||
|
||||
srv.logger.Info("Bye!")
|
||||
}
|
||||
@ -321,5 +304,5 @@ func (srv *Server) Stop() {
|
||||
// TODO: doc
|
||||
func (srv *Server) Quiet() {
|
||||
srv.processor.stop()
|
||||
srv.ps.SetStatus(base.StatusStopped) // TODO: rephrase this state, like StatusSilent?
|
||||
srv.ss.SetStatus(base.StatusQuiet)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user