diff --git a/background.go b/background.go index e2e27eb..1a39598 100644 --- a/background.go +++ b/background.go @@ -34,8 +34,7 @@ type Background struct { mu sync.Mutex running bool - // channel to send state updates. - stateCh chan<- string + ps *base.ProcessState // wait group to wait for all goroutines to finish. wg sync.WaitGroup @@ -131,18 +130,17 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { pid := os.Getpid() rdb := rdb.NewRDB(createRedisClient(r)) - syncRequestCh := make(chan *syncRequest) - stateCh := make(chan string) - workerCh := make(chan int) - cancelations := base.NewCancelations() - syncer := newSyncer(syncRequestCh, 5*time.Second) - heartbeater := newHeartbeater(rdb, host, pid, n, queues, cfg.StrictPriority, 5*time.Second, stateCh, workerCh) + ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority) + syncCh := make(chan *syncRequest) + cancels := base.NewCancelations() + syncer := newSyncer(syncCh, 5*time.Second) + heartbeater := newHeartbeater(rdb, ps, 5*time.Second) scheduler := newScheduler(rdb, 5*time.Second, queues) - processor := newProcessor(rdb, queues, cfg.StrictPriority, n, delayFunc, syncRequestCh, workerCh, cancelations) - subscriber := newSubscriber(rdb, cancelations) + processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels) + subscriber := newSubscriber(rdb, cancels) return &Background{ - stateCh: stateCh, rdb: rdb, + ps: ps, scheduler: scheduler, processor: processor, syncer: syncer, @@ -194,7 +192,7 @@ func (bg *Background) Run(handler Handler) { sig := <-sigs if sig == syscall.SIGTSTP { bg.processor.stop() - bg.stateCh <- "stopped" + bg.ps.SetStatus(base.StatusStopped) continue } break @@ -232,8 +230,7 @@ func (bg *Background) stop() { // Note: The order of termination is important. // Sender goroutines should be terminated before the receiver goroutines. 
// - // processor -> syncer (via syncRequestCh) - // processor -> heartbeater (via workerCh) + // processor -> syncer (via syncCh) bg.scheduler.terminate() bg.processor.terminate() bg.syncer.terminate() diff --git a/heartbeat.go b/heartbeat.go index 9c78b74..4e13faf 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -17,29 +17,20 @@ import ( type heartbeater struct { rdb *rdb.RDB - pinfo *base.ProcessInfo + ps *base.ProcessState // channel to communicate back to the long running "heartbeater" goroutine. done chan struct{} - // channel to receive updates on process state. - stateCh <-chan string - - // channel to recieve updates on workers count. - workerCh <-chan int - // interval between heartbeats. interval time.Duration } -func newHeartbeater(rdb *rdb.RDB, host string, pid, concurrency int, queues map[string]int, strict bool, - interval time.Duration, stateCh <-chan string, workerCh <-chan int) *heartbeater { +func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { return &heartbeater{ rdb: rdb, - pinfo: base.NewProcessInfo(host, pid, concurrency, queues, strict), + ps: ps, done: make(chan struct{}), - stateCh: stateCh, - workerCh: workerCh, interval: interval, } } @@ -51,26 +42,20 @@ func (h *heartbeater) terminate() { } func (h *heartbeater) start(wg *sync.WaitGroup) { - h.pinfo.Started = time.Now() - h.pinfo.State = "running" + h.ps.SetStarted(time.Now()) + h.ps.SetStatus(base.StatusRunning) wg.Add(1) go func() { defer wg.Done() h.beat() - timer := time.NewTimer(h.interval) for { select { case <-h.done: - h.rdb.ClearProcessInfo(h.pinfo) + h.rdb.ClearProcessInfo(h.ps.Get()) logger.info("Heartbeater done") return - case state := <-h.stateCh: - h.pinfo.State = state - case delta := <-h.workerCh: - h.pinfo.ActiveWorkerCount += delta - case <-timer.C: + case <-time.After(h.interval): h.beat() - timer.Reset(h.interval) } } }() @@ -79,7 +64,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { func (h *heartbeater) beat() { // 
Note: Set TTL to be long enough so that it won't expire before we write again // and short enough to expire quickly once the process is shut down or killed. - err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2) + err := h.rdb.WriteProcessInfo(h.ps.Get(), h.interval*2) if err != nil { logger.error("could not write heartbeat data: %v", err) } diff --git a/heartbeat_test.go b/heartbeat_test.go index ef5defc..c7b1089 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -35,9 +35,8 @@ func TestHeartbeater(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) - stateCh := make(chan string) - workerCh := make(chan int) - hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh) + state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false) + hb := newHeartbeater(rdbClient, state, tc.interval) var wg sync.WaitGroup hb.start(&wg) @@ -48,7 +47,7 @@ func TestHeartbeater(t *testing.T) { Queues: tc.queues, Concurrency: tc.concurrency, Started: time.Now(), - State: "running", + Status: "running", } // allow for heartbeater to write to redis @@ -73,13 +72,13 @@ func TestHeartbeater(t *testing.T) { continue } - // state change - stateCh <- "stopped" + // status change + state.SetStatus(base.StatusStopped) // allow for heartbeater to write to redis time.Sleep(tc.interval * 2) - want.State = "stopped" + want.Status = "stopped" ps, err = rdbClient.ListProcesses() if err != nil { t.Errorf("could not read process status from redis: %v", err) diff --git a/internal/base/base.go b/internal/base/base.go index d616901..8c04c84 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -87,6 +87,105 @@ type TaskMessage struct { Timeout string } +// ProcessState holds process level information. +// +// ProcessStates are safe for concurrent use by multiple goroutines. 
+type ProcessState struct { + mu sync.Mutex // guards all data fields + concurrency int + queues map[string]int + strictPriority bool + pid int + host string + status PStatus + started time.Time + activeWorkerCount int +} + +// PStatus represents status of a process. +type PStatus int + +const ( + // StatusIdle indicates process is in idle state. + StatusIdle PStatus = iota + + // StatusRunning indicates process is up and processing tasks. + StatusRunning + + // StatusStopped indicates process is up but not processing new tasks. + StatusStopped +) + +var statuses = []string{ + "idle", + "running", + "stopped", +} + +func (s PStatus) String() string { + if StatusIdle <= s && s <= StatusStopped { + return statuses[s] + } + return "unknown status" +} + +// NewProcessState returns a new instance of ProcessState. +func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState { + return &ProcessState{ + host: host, + pid: pid, + concurrency: concurrency, + queues: cloneQueueConfig(queues), + strictPriority: strict, + status: StatusIdle, + } +} + +// SetStatus updates the state of process. +func (ps *ProcessState) SetStatus(status PStatus) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.status = status +} + +// SetStarted records when the process started processing. +func (ps *ProcessState) SetStarted(t time.Time) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.started = t +} + +// IncrWorkerCount increments the worker count by delta. +func (ps *ProcessState) IncrWorkerCount(delta int) { + ps.mu.Lock() + defer ps.mu.Unlock() + ps.activeWorkerCount += delta +} + +// Get returns current state of process as a ProcessInfo. 
+func (ps *ProcessState) Get() *ProcessInfo { + ps.mu.Lock() + defer ps.mu.Unlock() + return &ProcessInfo{ + Host: ps.host, + PID: ps.pid, + Concurrency: ps.concurrency, + Queues: cloneQueueConfig(ps.queues), + StrictPriority: ps.strictPriority, + Status: ps.status.String(), + Started: ps.started, + ActiveWorkerCount: ps.activeWorkerCount, + } +} + +func cloneQueueConfig(qcfg map[string]int) map[string]int { + res := make(map[string]int) + for qname, n := range qcfg { + res[qname] = n + } + return res +} + // ProcessInfo holds information about running background worker process. type ProcessInfo struct { Concurrency int @@ -94,25 +193,14 @@ type ProcessInfo struct { StrictPriority bool PID int Host string - State string + Status string Started time.Time ActiveWorkerCount int } -// NewProcessInfo returns a new instance of ProcessInfo. -func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo { - return &ProcessInfo{ - Host: host, - PID: pid, - Concurrency: concurrency, - Queues: queues, - StrictPriority: strict, - } -} - // Cancelations is a collection that holds cancel functions for all in-progress tasks. // -// Its methods are safe to be used in multiple goroutines. +// Cancelations are safe for concurrent use by multiple goroutines. 
type Cancelations struct { mu sync.Mutex cancelFuncs map[string]context.CancelFunc diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index cd671c2..8f5292f 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2059,7 +2059,7 @@ func TestListProcesses(t *testing.T) { Queues: map[string]int{"default": 1}, Host: "do.droplet1", PID: 1234, - State: "running", + Status: "running", Started: time.Now().Add(-time.Hour), ActiveWorkerCount: 5, } @@ -2069,7 +2069,7 @@ func TestListProcesses(t *testing.T) { Queues: map[string]int{"email": 1}, Host: "do.droplet2", PID: 9876, - State: "stopped", + Status: "stopped", Started: time.Now().Add(-2 * time.Hour), ActiveWorkerCount: 20, } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index accc52d..6dcdc93 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -748,7 +748,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) { Queues: map[string]int{"default": 2, "email": 5, "low": 1}, PID: 98765, Host: "localhost", - State: "running", + Status: "running", Started: time.Now(), ActiveWorkerCount: 1, } diff --git a/processor.go b/processor.go index edf8de6..b5248c8 100644 --- a/processor.go +++ b/processor.go @@ -20,6 +20,8 @@ import ( type processor struct { rdb *rdb.RDB + ps *base.ProcessState + handler Handler queueConfig map[string]int @@ -32,9 +34,6 @@ type processor struct { // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest - // channel to send worker count updates. - workerCh chan<- int - // rate limiter to prevent spamming logs with a bunch of errors. errLogLimiter *rate.Limiter @@ -60,23 +59,23 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. 
-func newProcessor(r *rdb.RDB, queues map[string]int, strict bool, concurrency int, fn retryDelayFunc, - syncRequestCh chan<- *syncRequest, workerCh chan<- int, cancelations *base.Cancelations) *processor { - qcfg := normalizeQueueCfg(queues) +func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor { + info := ps.Get() + qcfg := normalizeQueueCfg(info.Queues) orderedQueues := []string(nil) - if strict { + if info.StrictPriority { orderedQueues = sortByPriority(qcfg) } return &processor{ rdb: r, + ps: ps, queueConfig: qcfg, orderedQueues: orderedQueues, retryDelayFunc: fn, - syncRequestCh: syncRequestCh, - workerCh: workerCh, - cancelations: cancelations, + syncRequestCh: syncCh, + cancelations: c, errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), - sema: make(chan struct{}, concurrency), + sema: make(chan struct{}, info.Concurrency), done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), @@ -166,10 +165,10 @@ func (p *processor) exec() { p.requeue(msg) return case p.sema <- struct{}{}: // acquire token - p.workerCh <- 1 + p.ps.IncrWorkerCount(1) go func() { defer func() { - p.workerCh <- -1 + p.ps.IncrWorkerCount(-1) <-p.sema /* release token */ }() diff --git a/processor_test.go b/processor_test.go index 77224c0..177547b 100644 --- a/processor_test.go +++ b/processor_test.go @@ -68,8 +68,9 @@ func TestProcessorSuccess(t *testing.T) { } workerCh := make(chan int) go fakeHeartbeater(workerCh) + ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, defaultQueueConfig, false, 10, defaultDelayFunc, nil, workerCh, cancelations) + p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p.handler = HandlerFunc(handler) var wg sync.WaitGroup @@ -156,8 +157,9 @@ func TestProcessorRetry(t *testing.T) { } workerCh := make(chan int) go 
fakeHeartbeater(workerCh) + ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, defaultQueueConfig, false, 10, delayFunc, nil, workerCh, cancelations) + p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations) p.handler = HandlerFunc(handler) var wg sync.WaitGroup @@ -219,7 +221,8 @@ func TestProcessorQueues(t *testing.T) { for _, tc := range tests { cancelations := base.NewCancelations() - p := newProcessor(nil, tc.queueCfg, false, 10, defaultDelayFunc, nil, nil, cancelations) + ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false) + p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -288,8 +291,8 @@ func TestProcessorWithStrictPriority(t *testing.T) { workerCh := make(chan int) go fakeHeartbeater(workerCh) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, queueCfg, true /* strict */, 1, /* concurrency */ - defaultDelayFunc, nil, workerCh, cancelations) + ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) + p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p.handler = HandlerFunc(handler) var wg sync.WaitGroup diff --git a/tools/asynqmon/cmd/ps.go b/tools/asynqmon/cmd/ps.go index 7761886..c4c3867 100644 --- a/tools/asynqmon/cmd/ps.go +++ b/tools/asynqmon/cmd/ps.go @@ -73,7 +73,7 @@ func ps(cmd *cobra.Command, args []string) { printRows := func(w io.Writer, tmpl string) { for _, ps := range processes { fmt.Fprintf(w, tmpl, - ps.Host, ps.PID, ps.State, + ps.Host, ps.PID, ps.Status, fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency), formatQueues(ps.Queues), timeAgo(ps.Started)) }