
Make heartbeater goroutine a monitor goroutine for process info.

commit 2bcaea52ce (parent 0856ef32e0)
Author: Ken Hibino
Date:   2020-02-15 21:59:06 -08:00

7 changed files with 65 additions and 82 deletions
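
In short: base.ProcessInfo was previously shared across goroutines and guarded by a mutex with setter methods; after this commit the heartbeater goroutine is the sole owner of the struct, and other goroutines submit updates over two channels (stateCh for state strings, workerCh for worker-count deltas). A minimal standalone sketch of this monitor-goroutine pattern (all names below are illustrative, not asynq's API):

package main

import (
	"fmt"
	"time"
)

// info is owned exclusively by the monitor goroutine below; no mutex is
// needed because no other goroutine ever reads or writes it directly.
type info struct {
	state   string
	workers int
}

func main() {
	stateCh := make(chan string)
	workerCh := make(chan int) // +1 when a worker starts, -1 when it exits
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() { // monitor goroutine: sole owner of pi
		defer close(finished)
		pi := info{state: "running"}
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case s := <-stateCh:
				pi.state = s
			case d := <-workerCh:
				pi.workers += d
			case <-ticker.C:
				fmt.Printf("beat: state=%s workers=%d\n", pi.state, pi.workers)
			}
		}
	}()

	workerCh <- 1        // a worker started
	stateCh <- "stopped" // e.g. a signal handler changed the state
	workerCh <- -1       // the worker finished
	time.Sleep(120 * time.Millisecond)
	close(done)
	<-finished
}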

background.go

@@ -34,7 +34,9 @@ type Background struct {
 	mu      sync.Mutex
 	running bool

-	pinfo *base.ProcessInfo
+	// channel to send state updates.
+	stateCh chan<- string

 	rdb       *rdb.RDB
 	scheduler *scheduler
 	processor *processor
@@ -125,17 +127,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
 	}
 	pid := os.Getpid()

-	pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority)
 	rdb := rdb.NewRDB(createRedisClient(r))
 	syncRequestCh := make(chan *syncRequest)
+	stateCh := make(chan string)
+	workerCh := make(chan int)
 	cancelations := base.NewCancelations()
 	syncer := newSyncer(syncRequestCh, 5*time.Second)
-	heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second)
+	heartbeater := newHeartbeater(rdb, host, pid, n, queues, cfg.StrictPriority, 5*time.Second, stateCh, workerCh)
 	scheduler := newScheduler(rdb, 5*time.Second, queues)
-	processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh, cancelations)
+	processor := newProcessor(rdb, queues, cfg.StrictPriority, n, delayFunc, syncRequestCh, workerCh, cancelations)
 	subscriber := newSubscriber(rdb, cancelations)
 	return &Background{
-		pinfo:     pinfo,
+		stateCh:   stateCh,
 		rdb:       rdb,
 		scheduler: scheduler,
 		processor: processor,
@@ -188,7 +191,7 @@ func (bg *Background) Run(handler Handler) {
 		sig := <-sigs
 		if sig == syscall.SIGTSTP {
 			bg.processor.stop()
-			bg.pinfo.SetState("stopped")
+			bg.stateCh <- "stopped"
 			continue
 		}
 		break
@@ -231,7 +234,6 @@ func (bg *Background) stop() {
 	bg.subscriber.terminate()
 	bg.heartbeater.terminate()

-	bg.rdb.ClearProcessInfo(bg.pinfo)
 	bg.rdb.Close()
 	bg.processor.handler = nil
 	bg.running = false

heartbeat.go

@@ -21,15 +21,24 @@ type heartbeater struct {
 	// channel to communicate back to the long running "heartbeater" goroutine.
 	done chan struct{}

+	// channel to receive updates on process state.
+	stateCh <-chan string
+
+	// channel to receive updates on workers count.
+	workerCh <-chan int
+
 	// interval between heartbeats.
 	interval time.Duration
 }

-func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater {
+func newHeartbeater(rdb *rdb.RDB, host string, pid, concurrency int, queues map[string]int, strict bool,
+	interval time.Duration, stateCh <-chan string, workerCh <-chan int) *heartbeater {
 	return &heartbeater{
 		rdb:      rdb,
-		pinfo:    pinfo,
+		pinfo:    base.NewProcessInfo(host, pid, concurrency, queues, strict),
 		done:     make(chan struct{}),
+		stateCh:  stateCh,
+		workerCh: workerCh,
 		interval: interval,
 	}
 }
@@ -41,17 +50,24 @@ func (h *heartbeater) terminate() {
 }

 func (h *heartbeater) start() {
-	h.pinfo.SetStarted(time.Now())
-	h.pinfo.SetState("running")
+	h.pinfo.Started = time.Now()
+	h.pinfo.State = "running"
 	go func() {
 		h.beat()
+		timer := time.NewTimer(h.interval)
 		for {
 			select {
 			case <-h.done:
+				h.rdb.ClearProcessInfo(h.pinfo)
 				logger.info("Heartbeater done")
 				return
-			case <-time.After(h.interval):
+			case state := <-h.stateCh:
+				h.pinfo.State = state
+			case delta := <-h.workerCh:
+				h.pinfo.ActiveWorkerCount += delta
+			case <-timer.C:
 				h.beat()
+				timer.Reset(h.interval)
 			}
 		}
 	}()
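
A detail worth calling out in the loop above: the old code waited with time.After(h.interval), which allocates a fresh timer on every pass through the loop. Now that the same select also services stateCh and workerCh, that style would re-arm the delay on every update, so a steady stream of worker deltas could postpone beats indefinitely. Creating the Timer once and calling Reset only after a beat fires keeps the heartbeat cadence independent of update traffic. A small self-contained demo of the fixed pattern (hypothetical names; updates arrive faster than the beat interval, yet beats still fire):

package main

import (
	"fmt"
	"time"
)

func main() {
	updates := make(chan int)
	go func() { // update source ticking faster than the beat interval
		for {
			updates <- 1
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// One timer, re-armed only after it fires. Swapping the timer.C case
	// for `case <-time.After(50 * time.Millisecond)` would starve the
	// beat, since each received update restarts the countdown.
	timer := time.NewTimer(50 * time.Millisecond)
	stop := time.After(300 * time.Millisecond)
	beats := 0
	for {
		select {
		case <-stop:
			fmt.Println("total beats:", beats) // ~5 here; 0 with time.After
			return
		case <-updates:
			// handle the update; the pending timer keeps counting down
		case <-timer.C:
			beats++
			timer.Reset(50 * time.Millisecond)
		}
	}
}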

heartbeat_test.go

@@ -34,8 +34,9 @@ func TestHeartbeater(t *testing.T) {
 	for _, tc := range tests {
 		h.FlushDB(t, r)

-		pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false)
-		hb := newHeartbeater(rdbClient, pi, tc.interval)
+		stateCh := make(chan string)
+		workerCh := make(chan int)
+		hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh)

 		want := &base.ProcessInfo{
 			Host: tc.host,
@@ -64,7 +65,7 @@ func TestHeartbeater(t *testing.T) {
 		}

 		// state change
-		pi.SetState("stopped")
+		stateCh <- "stopped"

 		// allow for heartbeater to write to redis
 		time.Sleep(tc.interval * 2)

internal/base/base.go

@@ -89,7 +89,6 @@ type TaskMessage struct {

 // ProcessInfo holds information about running background worker process.
 type ProcessInfo struct {
-	mu             sync.Mutex
 	Concurrency    int
 	Queues         map[string]int
 	StrictPriority bool
@@ -111,27 +110,6 @@ func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, st
 	}
 }

-// SetState set the state field of the process info.
-func (p *ProcessInfo) SetState(state string) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	p.State = state
-}
-
-// SetStarted set the started field of the process info.
-func (p *ProcessInfo) SetStarted(t time.Time) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	p.Started = t
-}
-
-// IncrActiveWorkerCount increments active worker count by delta.
-func (p *ProcessInfo) IncrActiveWorkerCount(delta int) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	p.ActiveWorkerCount += delta
-}
-
 // Cancelations is a collection that holds cancel functions for all in-progress tasks.
 //
 // Its methods are safe to be used in multiple goroutines.

internal/base/base_test.go

@@ -5,7 +5,6 @@
 package base

 import (
-	"sync"
 	"testing"
 	"time"
 )
@@ -79,30 +78,3 @@ func TestProcessInfoKey(t *testing.T) {
 		}
 	}
 }
-
-// Note: Run this test with -race flag to check for data race.
-func TestProcessInfoSetter(t *testing.T) {
-	pi := NewProcessInfo("localhost", 1234, 8, map[string]int{"default": 1}, false)
-
-	var wg sync.WaitGroup
-	wg.Add(3)
-
-	go func() {
-		pi.SetState("runnning")
-		wg.Done()
-	}()
-
-	go func() {
-		pi.SetStarted(time.Now())
-		pi.IncrActiveWorkerCount(1)
-		wg.Done()
-	}()
-
-	go func() {
-		pi.SetState("stopped")
-		wg.Done()
-	}()
-
-	wg.Wait()
-}

processor.go

@@ -20,8 +20,6 @@ import (
 type processor struct {
 	rdb *rdb.RDB

-	pinfo *base.ProcessInfo
-
 	handler Handler

 	queueConfig map[string]int
@@ -34,6 +32,9 @@ type processor struct {
 	// channel via which to send sync requests to syncer.
 	syncRequestCh chan<- *syncRequest

+	// channel to send worker count updates.
+	workerCh chan<- int
+
 	// rate limiter to prevent spamming logs with a bunch of errors.
 	errLogLimiter *rate.Limiter
@@ -59,22 +60,23 @@ type processor struct {
 type retryDelayFunc func(n int, err error, task *Task) time.Duration

 // newProcessor constructs a new processor.
-func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest, cancelations *base.Cancelations) *processor {
-	qcfg := normalizeQueueCfg(pinfo.Queues)
+func newProcessor(r *rdb.RDB, queues map[string]int, strict bool, concurrency int, fn retryDelayFunc,
+	syncRequestCh chan<- *syncRequest, workerCh chan<- int, cancelations *base.Cancelations) *processor {
+	qcfg := normalizeQueueCfg(queues)
 	orderedQueues := []string(nil)
-	if pinfo.StrictPriority {
+	if strict {
 		orderedQueues = sortByPriority(qcfg)
 	}
 	return &processor{
 		rdb:            r,
-		pinfo:          pinfo,
 		queueConfig:    qcfg,
 		orderedQueues:  orderedQueues,
 		retryDelayFunc: fn,
 		syncRequestCh:  syncRequestCh,
+		workerCh:       workerCh,
 		cancelations:   cancelations,
 		errLogLimiter:  rate.NewLimiter(rate.Every(3*time.Second), 1),
-		sema:           make(chan struct{}, pinfo.Concurrency),
+		sema:           make(chan struct{}, concurrency),
 		done:           make(chan struct{}),
 		abort:          make(chan struct{}),
 		quit:           make(chan struct{}),
@@ -162,11 +164,11 @@ func (p *processor) exec() {
 		p.requeue(msg)
 		return
 	case p.sema <- struct{}{}: // acquire token
-		p.pinfo.IncrActiveWorkerCount(1)
+		p.workerCh <- 1
 		go func() {
 			defer func() {
+				p.workerCh <- -1
 				<-p.sema /* release token */
-				p.pinfo.IncrActiveWorkerCount(-1)
 			}()

 			resCh := make(chan error, 1)
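
Note that workerCh is unbuffered, so the sends in exec block until the heartbeater receives them; anything that constructs a processor must keep a consumer running on the other end (the tests below spin up a fakeHeartbeater for exactly this reason). A toy version of the token-plus-delta bookkeeping used here (illustrative names only, not asynq code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const concurrency = 2
	sema := make(chan struct{}, concurrency) // counting semaphore, as in processor
	workerCh := make(chan int)               // unbuffered: every send needs a receiver

	// Stand-in for the heartbeater: the only goroutine that owns the count.
	var monitor sync.WaitGroup
	monitor.Add(1)
	go func() {
		defer monitor.Done()
		active := 0
		for delta := range workerCh {
			active += delta
			fmt.Println("active workers:", active)
		}
	}()

	var tasks sync.WaitGroup
	for i := 0; i < 5; i++ {
		sema <- struct{}{} // acquire token; blocks once `concurrency` workers run
		workerCh <- 1      // report worker start
		tasks.Add(1)
		go func(n int) {
			defer func() {
				workerCh <- -1 // report exit before releasing the token
				<-sema         // release token
				tasks.Done()
			}()
			fmt.Println("processing task", n)
		}(i)
	}
	tasks.Wait()
	close(workerCh) // safe: every worker has already sent its -1
	monitor.Wait()
}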

processor_test.go

@@ -66,9 +66,10 @@ func TestProcessorSuccess(t *testing.T) {
 			processed = append(processed, task)
 			return nil
 		}
-		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
+		workerCh := make(chan int)
+		go fakeHeartbeater(workerCh)
 		cancelations := base.NewCancelations()
-		p := newProcessor(rdbClient, pi, defaultDelayFunc, nil, cancelations)
+		p := newProcessor(rdbClient, defaultQueueConfig, false, 10, defaultDelayFunc, nil, workerCh, cancelations)
 		p.handler = HandlerFunc(handler)

 		p.start()
@@ -81,6 +82,7 @@ func TestProcessorSuccess(t *testing.T) {
 		}

 		time.Sleep(tc.wait)
 		p.terminate()
+		close(workerCh)

 		if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
 			t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
@@ -151,9 +153,10 @@ func TestProcessorRetry(t *testing.T) {
 		handler := func(ctx context.Context, task *Task) error {
 			return fmt.Errorf(errMsg)
 		}
-		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
+		workerCh := make(chan int)
+		go fakeHeartbeater(workerCh)
 		cancelations := base.NewCancelations()
-		p := newProcessor(rdbClient, pi, delayFunc, nil, cancelations)
+		p := newProcessor(rdbClient, defaultQueueConfig, false, 10, delayFunc, nil, workerCh, cancelations)
 		p.handler = HandlerFunc(handler)

 		p.start()
@@ -166,6 +169,7 @@ func TestProcessorRetry(t *testing.T) {
 		}

 		time.Sleep(tc.wait)
 		p.terminate()
+		close(workerCh)

 		cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score
 		gotRetry := h.GetRetryEntries(t, r)
@@ -212,9 +216,8 @@ func TestProcessorQueues(t *testing.T) {
 	}

 	for _, tc := range tests {
-		pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false)
 		cancelations := base.NewCancelations()
-		p := newProcessor(nil, pi, defaultDelayFunc, nil, cancelations)
+		p := newProcessor(nil, tc.queueCfg, false, 10, defaultDelayFunc, nil, nil, cancelations)
 		got := p.queues()
 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -280,14 +283,17 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 		"low":      1,
 	}

 	// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
-	pi := base.NewProcessInfo("localhost", 1234, 1 /*concurrency */, queueCfg, true /* strict */)
+	workerCh := make(chan int)
+	go fakeHeartbeater(workerCh)
 	cancelations := base.NewCancelations()
-	p := newProcessor(rdbClient, pi, defaultDelayFunc, nil, cancelations)
+	p := newProcessor(rdbClient, queueCfg, true /* strict */, 1, /* concurrency */
+		defaultDelayFunc, nil, workerCh, cancelations)
 	p.handler = HandlerFunc(handler)
 	p.start()

 	time.Sleep(tc.wait)
 	p.terminate()
+	close(workerCh)

 	if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
 		t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
@@ -344,3 +350,9 @@ func TestPerform(t *testing.T) {
 		}
 	}
 }
+
+// fake heartbeater to receive sends from the worker channel.
+func fakeHeartbeater(ch <-chan int) {
+	for range ch {
+	}
+}
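
One ordering subtlety the tests above respect: close(workerCh) happens only after p.terminate() returns. terminate waits for in-flight workers, and each of those still has a deferred workerCh <- -1 to deliver; closing the channel any earlier would turn that send into a panic, while simply stopping the consumer would make it block forever. Once the channel is closed, the for range loop in fakeHeartbeater ends and the drain goroutine exits cleanly.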