2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add ProcessState type to base package

This commit is contained in:
Ken Hibino 2020-02-18 06:57:39 -08:00
parent f9a6c6156f
commit 830020eb39
9 changed files with 150 additions and 79 deletions

View File

@ -34,8 +34,7 @@ type Background struct {
mu sync.Mutex mu sync.Mutex
running bool running bool
// channel to send state updates. ps *base.ProcessState
stateCh chan<- string
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup
@ -131,18 +130,17 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
pid := os.Getpid() pid := os.Getpid()
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(createRedisClient(r))
syncRequestCh := make(chan *syncRequest) ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
stateCh := make(chan string) syncCh := make(chan *syncRequest)
workerCh := make(chan int) cancels := base.NewCancelations()
cancelations := base.NewCancelations() syncer := newSyncer(syncCh, 5*time.Second)
syncer := newSyncer(syncRequestCh, 5*time.Second) heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
heartbeater := newHeartbeater(rdb, host, pid, n, queues, cfg.StrictPriority, 5*time.Second, stateCh, workerCh)
scheduler := newScheduler(rdb, 5*time.Second, queues) scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, queues, cfg.StrictPriority, n, delayFunc, syncRequestCh, workerCh, cancelations) processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels)
subscriber := newSubscriber(rdb, cancelations) subscriber := newSubscriber(rdb, cancels)
return &Background{ return &Background{
stateCh: stateCh,
rdb: rdb, rdb: rdb,
ps: ps,
scheduler: scheduler, scheduler: scheduler,
processor: processor, processor: processor,
syncer: syncer, syncer: syncer,
@ -194,7 +192,7 @@ func (bg *Background) Run(handler Handler) {
sig := <-sigs sig := <-sigs
if sig == syscall.SIGTSTP { if sig == syscall.SIGTSTP {
bg.processor.stop() bg.processor.stop()
bg.stateCh <- "stopped" bg.ps.SetStatus(base.StatusStopped)
continue continue
} }
break break
@ -232,8 +230,7 @@ func (bg *Background) stop() {
// Note: The order of termination is important. // Note: The order of termination is important.
// Sender goroutines should be terminated before the receiver goroutines. // Sender goroutines should be terminated before the receiver goroutines.
// //
// processor -> syncer (via syncRequestCh) // processor -> syncer (via syncCh)
// processor -> heartbeater (via workerCh)
bg.scheduler.terminate() bg.scheduler.terminate()
bg.processor.terminate() bg.processor.terminate()
bg.syncer.terminate() bg.syncer.terminate()

View File

@ -17,29 +17,20 @@ import (
type heartbeater struct { type heartbeater struct {
rdb *rdb.RDB rdb *rdb.RDB
pinfo *base.ProcessInfo ps *base.ProcessState
// channel to communicate back to the long running "heartbeater" goroutine. // channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{} done chan struct{}
// channel to receive updates on process state.
stateCh <-chan string
// channel to recieve updates on workers count.
workerCh <-chan int
// interval between heartbeats. // interval between heartbeats.
interval time.Duration interval time.Duration
} }
func newHeartbeater(rdb *rdb.RDB, host string, pid, concurrency int, queues map[string]int, strict bool, func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
interval time.Duration, stateCh <-chan string, workerCh <-chan int) *heartbeater {
return &heartbeater{ return &heartbeater{
rdb: rdb, rdb: rdb,
pinfo: base.NewProcessInfo(host, pid, concurrency, queues, strict), ps: ps,
done: make(chan struct{}), done: make(chan struct{}),
stateCh: stateCh,
workerCh: workerCh,
interval: interval, interval: interval,
} }
} }
@ -51,26 +42,20 @@ func (h *heartbeater) terminate() {
} }
func (h *heartbeater) start(wg *sync.WaitGroup) { func (h *heartbeater) start(wg *sync.WaitGroup) {
h.pinfo.Started = time.Now() h.ps.SetStarted(time.Now())
h.pinfo.State = "running" h.ps.SetStatus(base.StatusRunning)
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
h.beat() h.beat()
timer := time.NewTimer(h.interval)
for { for {
select { select {
case <-h.done: case <-h.done:
h.rdb.ClearProcessInfo(h.pinfo) h.rdb.ClearProcessInfo(h.ps.Get())
logger.info("Heartbeater done") logger.info("Heartbeater done")
return return
case state := <-h.stateCh: case <-time.After(h.interval):
h.pinfo.State = state
case delta := <-h.workerCh:
h.pinfo.ActiveWorkerCount += delta
case <-timer.C:
h.beat() h.beat()
timer.Reset(h.interval)
} }
} }
}() }()
@ -79,7 +64,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
func (h *heartbeater) beat() { func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again // Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed. // and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2) err := h.rdb.WriteProcessInfo(h.ps.Get(), h.interval*2)
if err != nil { if err != nil {
logger.error("could not write heartbeat data: %v", err) logger.error("could not write heartbeat data: %v", err)
} }

View File

@ -35,9 +35,8 @@ func TestHeartbeater(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) h.FlushDB(t, r)
stateCh := make(chan string) state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
workerCh := make(chan int) hb := newHeartbeater(rdbClient, state, tc.interval)
hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh)
var wg sync.WaitGroup var wg sync.WaitGroup
hb.start(&wg) hb.start(&wg)
@ -48,7 +47,7 @@ func TestHeartbeater(t *testing.T) {
Queues: tc.queues, Queues: tc.queues,
Concurrency: tc.concurrency, Concurrency: tc.concurrency,
Started: time.Now(), Started: time.Now(),
State: "running", Status: "running",
} }
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
@ -73,13 +72,13 @@ func TestHeartbeater(t *testing.T) {
continue continue
} }
// state change // status change
stateCh <- "stopped" state.SetStatus(base.StatusStopped)
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
time.Sleep(tc.interval * 2) time.Sleep(tc.interval * 2)
want.State = "stopped" want.Status = "stopped"
ps, err = rdbClient.ListProcesses() ps, err = rdbClient.ListProcesses()
if err != nil { if err != nil {
t.Errorf("could not read process status from redis: %v", err) t.Errorf("could not read process status from redis: %v", err)

View File

@ -87,6 +87,105 @@ type TaskMessage struct {
Timeout string Timeout string
} }
// ProcessState holds process level information.
//
// ProcessStates are safe for concurrent use by multiple goroutines.
type ProcessState struct {
mu sync.Mutex // guards all data fields
concurrency int
queues map[string]int
strictPriority bool
pid int
host string
status PStatus
started time.Time
activeWorkerCount int
}
// PStatus represents status of a process.
type PStatus int
const (
// StatusIdle indicates process is in idle state.
StatusIdle PStatus = iota
// StatusRunning indicates process is up and processing tasks.
StatusRunning
// StatusStopped indicates process is up but not processing new tasks.
StatusStopped
)
var statuses = []string{
"idle",
"running",
"stopped",
}
func (s PStatus) String() string {
if StatusIdle <= s && s <= StatusStopped {
return statuses[s]
}
return "unknown status"
}
// NewProcessState returns a new instance of ProcessState.
func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState {
return &ProcessState{
host: host,
pid: pid,
concurrency: concurrency,
queues: cloneQueueConfig(queues),
strictPriority: strict,
status: StatusIdle,
}
}
// SetStatus updates the state of process.
func (ps *ProcessState) SetStatus(status PStatus) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.status = status
}
// SetStarted records when the process started processing.
func (ps *ProcessState) SetStarted(t time.Time) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.started = t
}
// IncrWorkerCount increments the worker count by delta.
func (ps *ProcessState) IncrWorkerCount(delta int) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.activeWorkerCount += delta
}
// Get returns current state of process as a ProcessInfo.
func (ps *ProcessState) Get() *ProcessInfo {
ps.mu.Lock()
defer ps.mu.Unlock()
return &ProcessInfo{
Host: ps.host,
PID: ps.pid,
Concurrency: ps.concurrency,
Queues: cloneQueueConfig(ps.queues),
StrictPriority: ps.strictPriority,
Status: ps.status.String(),
Started: ps.started,
ActiveWorkerCount: ps.activeWorkerCount,
}
}
func cloneQueueConfig(qcfg map[string]int) map[string]int {
res := make(map[string]int)
for qname, n := range qcfg {
res[qname] = n
}
return res
}
// ProcessInfo holds information about running background worker process. // ProcessInfo holds information about running background worker process.
type ProcessInfo struct { type ProcessInfo struct {
Concurrency int Concurrency int
@ -94,25 +193,14 @@ type ProcessInfo struct {
StrictPriority bool StrictPriority bool
PID int PID int
Host string Host string
State string Status string
Started time.Time Started time.Time
ActiveWorkerCount int ActiveWorkerCount int
} }
// NewProcessInfo returns a new instance of ProcessInfo.
func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo {
return &ProcessInfo{
Host: host,
PID: pid,
Concurrency: concurrency,
Queues: queues,
StrictPriority: strict,
}
}
// Cancelations is a collection that holds cancel functions for all in-progress tasks. // Cancelations is a collection that holds cancel functions for all in-progress tasks.
// //
// Its methods are safe to be used in multiple goroutines. // Cancelations are safe for concurrent use by multipel goroutines.
type Cancelations struct { type Cancelations struct {
mu sync.Mutex mu sync.Mutex
cancelFuncs map[string]context.CancelFunc cancelFuncs map[string]context.CancelFunc

View File

@ -2059,7 +2059,7 @@ func TestListProcesses(t *testing.T) {
Queues: map[string]int{"default": 1}, Queues: map[string]int{"default": 1},
Host: "do.droplet1", Host: "do.droplet1",
PID: 1234, PID: 1234,
State: "running", Status: "running",
Started: time.Now().Add(-time.Hour), Started: time.Now().Add(-time.Hour),
ActiveWorkerCount: 5, ActiveWorkerCount: 5,
} }
@ -2069,7 +2069,7 @@ func TestListProcesses(t *testing.T) {
Queues: map[string]int{"email": 1}, Queues: map[string]int{"email": 1},
Host: "do.droplet2", Host: "do.droplet2",
PID: 9876, PID: 9876,
State: "stopped", Status: "stopped",
Started: time.Now().Add(-2 * time.Hour), Started: time.Now().Add(-2 * time.Hour),
ActiveWorkerCount: 20, ActiveWorkerCount: 20,
} }

View File

@ -748,7 +748,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
PID: 98765, PID: 98765,
Host: "localhost", Host: "localhost",
State: "running", Status: "running",
Started: time.Now(), Started: time.Now(),
ActiveWorkerCount: 1, ActiveWorkerCount: 1,
} }

View File

@ -20,6 +20,8 @@ import (
type processor struct { type processor struct {
rdb *rdb.RDB rdb *rdb.RDB
ps *base.ProcessState
handler Handler handler Handler
queueConfig map[string]int queueConfig map[string]int
@ -32,9 +34,6 @@ type processor struct {
// channel via which to send sync requests to syncer. // channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest syncRequestCh chan<- *syncRequest
// channel to send worker count updates.
workerCh chan<- int
// rate limiter to prevent spamming logs with a bunch of errors. // rate limiter to prevent spamming logs with a bunch of errors.
errLogLimiter *rate.Limiter errLogLimiter *rate.Limiter
@ -60,23 +59,23 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor. // newProcessor constructs a new processor.
func newProcessor(r *rdb.RDB, queues map[string]int, strict bool, concurrency int, fn retryDelayFunc, func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor {
syncRequestCh chan<- *syncRequest, workerCh chan<- int, cancelations *base.Cancelations) *processor { info := ps.Get()
qcfg := normalizeQueueCfg(queues) qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil) orderedQueues := []string(nil)
if strict { if info.StrictPriority {
orderedQueues = sortByPriority(qcfg) orderedQueues = sortByPriority(qcfg)
} }
return &processor{ return &processor{
rdb: r, rdb: r,
ps: ps,
queueConfig: qcfg, queueConfig: qcfg,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: fn, retryDelayFunc: fn,
syncRequestCh: syncRequestCh, syncRequestCh: syncCh,
workerCh: workerCh, cancelations: c,
cancelations: cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, concurrency), sema: make(chan struct{}, info.Concurrency),
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
@ -166,10 +165,10 @@ func (p *processor) exec() {
p.requeue(msg) p.requeue(msg)
return return
case p.sema <- struct{}{}: // acquire token case p.sema <- struct{}{}: // acquire token
p.workerCh <- 1 p.ps.IncrWorkerCount(1)
go func() { go func() {
defer func() { defer func() {
p.workerCh <- -1 p.ps.IncrWorkerCount(-1)
<-p.sema /* release token */ <-p.sema /* release token */
}() }()

View File

@ -68,8 +68,9 @@ func TestProcessorSuccess(t *testing.T) {
} }
workerCh := make(chan int) workerCh := make(chan int)
go fakeHeartbeater(workerCh) go fakeHeartbeater(workerCh)
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, defaultQueueConfig, false, 10, defaultDelayFunc, nil, workerCh, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -156,8 +157,9 @@ func TestProcessorRetry(t *testing.T) {
} }
workerCh := make(chan int) workerCh := make(chan int)
go fakeHeartbeater(workerCh) go fakeHeartbeater(workerCh)
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, defaultQueueConfig, false, 10, delayFunc, nil, workerCh, cancelations) p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -219,7 +221,8 @@ func TestProcessorQueues(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(nil, tc.queueCfg, false, 10, defaultDelayFunc, nil, nil, cancelations) ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations)
got := p.queues() got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@ -288,8 +291,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
workerCh := make(chan int) workerCh := make(chan int)
go fakeHeartbeater(workerCh) go fakeHeartbeater(workerCh)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, queueCfg, true /* strict */, 1, /* concurrency */ ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
defaultDelayFunc, nil, workerCh, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -73,7 +73,7 @@ func ps(cmd *cobra.Command, args []string) {
printRows := func(w io.Writer, tmpl string) { printRows := func(w io.Writer, tmpl string) {
for _, ps := range processes { for _, ps := range processes {
fmt.Fprintf(w, tmpl, fmt.Fprintf(w, tmpl,
ps.Host, ps.PID, ps.State, ps.Host, ps.PID, ps.Status,
fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency), fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency),
formatQueues(ps.Queues), timeAgo(ps.Started)) formatQueues(ps.Queues), timeAgo(ps.Started))
} }