diff --git a/background.go b/background.go index fa533a2..0d0cccf 100644 --- a/background.go +++ b/background.go @@ -34,7 +34,9 @@ type Background struct { mu sync.Mutex running bool - pinfo *base.ProcessInfo + // channel to send state updates. + stateCh chan<- string + rdb *rdb.RDB scheduler *scheduler processor *processor @@ -125,17 +127,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { } pid := os.Getpid() - pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority) rdb := rdb.NewRDB(createRedisClient(r)) syncRequestCh := make(chan *syncRequest) + stateCh := make(chan string) + workerCh := make(chan int) cancelations := base.NewCancelations() syncer := newSyncer(syncRequestCh, 5*time.Second) - heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second) + heartbeater := newHeartbeater(rdb, host, pid, n, queues, cfg.StrictPriority, 5*time.Second, stateCh, workerCh) scheduler := newScheduler(rdb, 5*time.Second, queues) - processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh, cancelations) + processor := newProcessor(rdb, queues, cfg.StrictPriority, n, delayFunc, syncRequestCh, workerCh, cancelations) subscriber := newSubscriber(rdb, cancelations) return &Background{ - pinfo: pinfo, + stateCh: stateCh, rdb: rdb, scheduler: scheduler, processor: processor, @@ -188,7 +191,7 @@ func (bg *Background) Run(handler Handler) { sig := <-sigs if sig == syscall.SIGTSTP { bg.processor.stop() - bg.pinfo.SetState("stopped") + bg.stateCh <- "stopped" continue } break @@ -231,7 +234,6 @@ func (bg *Background) stop() { bg.subscriber.terminate() bg.heartbeater.terminate() - bg.rdb.ClearProcessInfo(bg.pinfo) bg.rdb.Close() bg.processor.handler = nil bg.running = false diff --git a/heartbeat.go b/heartbeat.go index d85bb4d..f8fc326 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -21,15 +21,24 @@ type heartbeater struct { // channel to communicate back to the long running "heartbeater" goroutine. done chan struct{} + // channel to receive updates on process state. + stateCh <-chan string + + // channel to recieve updates on workers count. + workerCh <-chan int + // interval between heartbeats. interval time.Duration } -func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater { +func newHeartbeater(rdb *rdb.RDB, host string, pid, concurrency int, queues map[string]int, strict bool, + interval time.Duration, stateCh <-chan string, workerCh <-chan int) *heartbeater { return &heartbeater{ rdb: rdb, - pinfo: pinfo, + pinfo: base.NewProcessInfo(host, pid, concurrency, queues, strict), done: make(chan struct{}), + stateCh: stateCh, + workerCh: workerCh, interval: interval, } } @@ -41,17 +50,24 @@ func (h *heartbeater) terminate() { } func (h *heartbeater) start() { - h.pinfo.SetStarted(time.Now()) - h.pinfo.SetState("running") + h.pinfo.Started = time.Now() + h.pinfo.State = "running" go func() { h.beat() + timer := time.NewTimer(h.interval) for { select { case <-h.done: + h.rdb.ClearProcessInfo(h.pinfo) logger.info("Heartbeater done") return - case <-time.After(h.interval): + case state := <-h.stateCh: + h.pinfo.State = state + case delta := <-h.workerCh: + h.pinfo.ActiveWorkerCount += delta + case <-timer.C: h.beat() + timer.Reset(h.interval) } } }() diff --git a/heartbeat_test.go b/heartbeat_test.go index 268ea76..ccbd114 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -34,8 +34,9 @@ func TestHeartbeater(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) - pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false) - hb := newHeartbeater(rdbClient, pi, tc.interval) + stateCh := make(chan string) + workerCh := make(chan int) + hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh) want := &base.ProcessInfo{ Host: tc.host, @@ -64,7 +65,7 @@ func TestHeartbeater(t *testing.T) { } // state change - pi.SetState("stopped") + stateCh <- "stopped" // allow for heartbeater to write to redis time.Sleep(tc.interval * 2) diff --git a/internal/base/base.go b/internal/base/base.go index 12f01df..d616901 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -89,7 +89,6 @@ type TaskMessage struct { // ProcessInfo holds information about running background worker process. type ProcessInfo struct { - mu sync.Mutex Concurrency int Queues map[string]int StrictPriority bool @@ -111,27 +110,6 @@ func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, st } } -// SetState set the state field of the process info. -func (p *ProcessInfo) SetState(state string) { - p.mu.Lock() - defer p.mu.Unlock() - p.State = state -} - -// SetStarted set the started field of the process info. -func (p *ProcessInfo) SetStarted(t time.Time) { - p.mu.Lock() - defer p.mu.Unlock() - p.Started = t -} - -// IncrActiveWorkerCount increments active worker count by delta. -func (p *ProcessInfo) IncrActiveWorkerCount(delta int) { - p.mu.Lock() - defer p.mu.Unlock() - p.ActiveWorkerCount += delta -} - // Cancelations is a collection that holds cancel functions for all in-progress tasks. // // Its methods are safe to be used in multiple goroutines. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 46050c0..225dcc6 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -5,7 +5,6 @@ package base import ( - "sync" "testing" "time" ) @@ -79,30 +78,3 @@ func TestProcessInfoKey(t *testing.T) { } } } - -// Note: Run this test with -race flag to check for data race. -func TestProcessInfoSetter(t *testing.T) { - pi := NewProcessInfo("localhost", 1234, 8, map[string]int{"default": 1}, false) - - var wg sync.WaitGroup - - wg.Add(3) - - go func() { - pi.SetState("runnning") - wg.Done() - }() - - go func() { - pi.SetStarted(time.Now()) - pi.IncrActiveWorkerCount(1) - wg.Done() - }() - - go func() { - pi.SetState("stopped") - wg.Done() - }() - - wg.Wait() -} diff --git a/processor.go b/processor.go index 6bc4b1c..dfdd6ae 100644 --- a/processor.go +++ b/processor.go @@ -20,8 +20,6 @@ import ( type processor struct { rdb *rdb.RDB - pinfo *base.ProcessInfo - handler Handler queueConfig map[string]int @@ -34,6 +32,9 @@ type processor struct { // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest + // channel to send worker count updates. + workerCh chan<- int + // rate limiter to prevent spamming logs with a bunch of errors. errLogLimiter *rate.Limiter @@ -59,22 +60,23 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. -func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest, cancelations *base.Cancelations) *processor { - qcfg := normalizeQueueCfg(pinfo.Queues) +func newProcessor(r *rdb.RDB, queues map[string]int, strict bool, concurrency int, fn retryDelayFunc, + syncRequestCh chan<- *syncRequest, workerCh chan<- int, cancelations *base.Cancelations) *processor { + qcfg := normalizeQueueCfg(queues) orderedQueues := []string(nil) - if pinfo.StrictPriority { + if strict { orderedQueues = sortByPriority(qcfg) } return &processor{ rdb: r, - pinfo: pinfo, queueConfig: qcfg, orderedQueues: orderedQueues, retryDelayFunc: fn, syncRequestCh: syncRequestCh, + workerCh: workerCh, cancelations: cancelations, errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), - sema: make(chan struct{}, pinfo.Concurrency), + sema: make(chan struct{}, concurrency), done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), @@ -162,11 +164,11 @@ func (p *processor) exec() { p.requeue(msg) return case p.sema <- struct{}{}: // acquire token - p.pinfo.IncrActiveWorkerCount(1) + p.workerCh <- 1 go func() { defer func() { + p.workerCh <- -1 <-p.sema /* release token */ - p.pinfo.IncrActiveWorkerCount(-1) }() resCh := make(chan error, 1) diff --git a/processor_test.go b/processor_test.go index 079bf6e..1615571 100644 --- a/processor_test.go +++ b/processor_test.go @@ -66,9 +66,10 @@ func TestProcessorSuccess(t *testing.T) { processed = append(processed, task) return nil } - pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) + workerCh := make(chan int) + go fakeHeartbeater(workerCh) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, pi, defaultDelayFunc, nil, cancelations) + p := newProcessor(rdbClient, defaultQueueConfig, false, 10, defaultDelayFunc, nil, workerCh, cancelations) p.handler = HandlerFunc(handler) p.start() @@ -81,6 +82,7 @@ func TestProcessorSuccess(t *testing.T) { } time.Sleep(tc.wait) p.terminate() + close(workerCh) if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) @@ -151,9 +153,10 @@ func TestProcessorRetry(t *testing.T) { handler := func(ctx context.Context, task *Task) error { return fmt.Errorf(errMsg) } - pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) + workerCh := make(chan int) + go fakeHeartbeater(workerCh) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, pi, delayFunc, nil, cancelations) + p := newProcessor(rdbClient, defaultQueueConfig, false, 10, delayFunc, nil, workerCh, cancelations) p.handler = HandlerFunc(handler) p.start() @@ -166,6 +169,7 @@ func TestProcessorRetry(t *testing.T) { } time.Sleep(tc.wait) p.terminate() + close(workerCh) cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score gotRetry := h.GetRetryEntries(t, r) @@ -212,9 +216,8 @@ func TestProcessorQueues(t *testing.T) { } for _, tc := range tests { - pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false) cancelations := base.NewCancelations() - p := newProcessor(nil, pi, defaultDelayFunc, nil, cancelations) + p := newProcessor(nil, tc.queueCfg, false, 10, defaultDelayFunc, nil, nil, cancelations) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -280,14 +283,17 @@ func TestProcessorWithStrictPriority(t *testing.T) { "low": 1, } // Note: Set concurrency to 1 to make sure tasks are processed one at a time. - pi := base.NewProcessInfo("localhost", 1234, 1 /*concurrency */, queueCfg, true /* strict */) + workerCh := make(chan int) + go fakeHeartbeater(workerCh) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, pi, defaultDelayFunc, nil, cancelations) + p := newProcessor(rdbClient, queueCfg, true /* strict */, 1, /* concurrency */ + defaultDelayFunc, nil, workerCh, cancelations) p.handler = HandlerFunc(handler) p.start() time.Sleep(tc.wait) p.terminate() + close(workerCh) if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) @@ -344,3 +350,9 @@ func TestPerform(t *testing.T) { } } } + +// fake heartbeater to receive sends from the worker channel. +func fakeHeartbeater(ch <-chan int) { + for range ch { + } +}