diff --git a/heartbeat.go b/heartbeat.go index eeb08b5..348d834 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -38,13 +38,13 @@ type heartbeater struct { // heartbeater goroutine. In other words, confine these variables // to this goroutine only. started time.Time - workers map[string]workerStat + workers map[string]*workerInfo // status is shared with other goroutine but is concurrency safe. status *base.ServerStatus // channels to receive updates on active workers. - starting <-chan *base.TaskMessage + starting <-chan *workerInfo finished <-chan *base.TaskMessage } @@ -56,7 +56,7 @@ type heartbeaterParams struct { queues map[string]int strictPriority bool status *base.ServerStatus - starting <-chan *base.TaskMessage + starting <-chan *workerInfo finished <-chan *base.TaskMessage } @@ -80,7 +80,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater { strictPriority: params.strictPriority, status: params.status, - workers: make(map[string]workerStat), + workers: make(map[string]*workerInfo), starting: params.starting, finished: params.finished, } @@ -92,11 +92,14 @@ func (h *heartbeater) terminate() { h.done <- struct{}{} } -// A workerStat records the message a worker is working on -// and the time the worker has started processing the message. -type workerStat struct { +// A workerInfo holds an active worker information. +type workerInfo struct { + // the task message the worker is processing. + msg *base.TaskMessage + // the time the worker has started processing the message. started time.Time - msg *base.TaskMessage + // deadline the worker has to finish processing the task by. + deadline time.Time } func (h *heartbeater) start(wg *sync.WaitGroup) { @@ -121,8 +124,8 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { h.beat() timer.Reset(h.interval) - case msg := <-h.starting: - h.workers[msg.ID.String()] = workerStat{time.Now(), msg} + case w := <-h.starting: + h.workers[w.msg.ID.String()] = w case msg := <-h.finished: delete(h.workers, msg.ID.String()) @@ -145,16 +148,17 @@ func (h *heartbeater) beat() { } var ws []*base.WorkerInfo - for id, stat := range h.workers { + for id, w := range h.workers { ws = append(ws, &base.WorkerInfo{ Host: h.host, PID: h.pid, ServerID: h.serverID, ID: id, - Type: stat.msg.Type, - Queue: stat.msg.Queue, - Payload: stat.msg.Payload, - Started: stat.started, + Type: w.msg.Type, + Queue: w.msg.Queue, + Payload: w.msg.Payload, + Started: w.started, + Deadline: w.deadline, }) } diff --git a/heartbeat_test.go b/heartbeat_test.go index 16cc9bf..c517519 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -47,7 +47,7 @@ func TestHeartbeater(t *testing.T) { queues: tc.queues, strictPriority: false, status: status, - starting: make(chan *base.TaskMessage), + starting: make(chan *workerInfo), finished: make(chan *base.TaskMessage), }) @@ -139,7 +139,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) { queues: map[string]int{"default": 1}, strictPriority: false, status: base.NewServerStatus(base.StatusRunning), - starting: make(chan *base.TaskMessage), + starting: make(chan *workerInfo), finished: make(chan *base.TaskMessage), }) diff --git a/inspector.go b/inspector.go index 1ff029b..3cef876 100644 --- a/inspector.go +++ b/inspector.go @@ -725,7 +725,8 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) { continue } wrkInfo := &WorkerInfo{ - Started: w.Started, + Started: w.Started, + Deadline: w.Deadline, Task: &ActiveTask{ Task: NewTask(w.Type, w.Payload), ID: w.ID, @@ -771,6 +772,8 @@ type WorkerInfo struct { Task *ActiveTask // Time the worker started processing the task. Started time.Time + // Time the worker needs to finish processing the task by. + Deadline time.Time } // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. diff --git a/internal/base/base.go b/internal/base/base.go index 8663b13..2d1bca0 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -283,6 +283,7 @@ type WorkerInfo struct { Queue string Payload map[string]interface{} Started time.Time + Deadline time.Time } // SchedulerEntry holds information about a periodic task registered with a scheduler. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index bd2e89c..7f1d9ce 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -974,7 +974,6 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { continue // skip bad data } workers = append(workers, &w) - } } return workers, nil diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 6283db8..0e278ef 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2966,6 +2966,7 @@ func TestListWorkers(t *testing.T) { Queue: m1.Queue, Payload: m1.Payload, Started: time.Now().Add(-1 * time.Second), + Deadline: time.Now().Add(30 * time.Second), }, { Host: host, @@ -2976,6 +2977,7 @@ func TestListWorkers(t *testing.T) { Queue: m2.Queue, Payload: m2.Payload, Started: time.Now().Add(-5 * time.Second), + Deadline: time.Now().Add(10 * time.Minute), }, { Host: host, @@ -2986,6 +2988,7 @@ func TestListWorkers(t *testing.T) { Queue: m3.Queue, Payload: m3.Payload, Started: time.Now().Add(-30 * time.Second), + Deadline: time.Now().Add(30 * time.Minute), }, }, }, diff --git a/processor.go b/processor.go index 5a949e6..e480650 100644 --- a/processor.go +++ b/processor.go @@ -63,7 +63,7 @@ type processor struct { // cancelations is a set of cancel functions for all active tasks. cancelations *base.Cancelations - starting chan<- *base.TaskMessage + starting chan<- *workerInfo finished chan<- *base.TaskMessage } @@ -78,7 +78,7 @@ type processorParams struct { strictPriority bool errHandler ErrorHandler shutdownTimeout time.Duration - starting chan<- *base.TaskMessage + starting chan<- *workerInfo finished chan<- *base.TaskMessage } @@ -180,7 +180,7 @@ func (p *processor) exec() { return } - p.starting <- msg + p.starting <- &workerInfo{msg, time.Now(), deadline} go func() { defer func() { p.finished <- msg diff --git a/processor_test.go b/processor_test.go index 35a5e44..7475889 100644 --- a/processor_test.go +++ b/processor_test.go @@ -20,7 +20,7 @@ import ( ) // fakeHeartbeater receives from starting and finished channels and do nothing. -func fakeHeartbeater(starting, finished <-chan *base.TaskMessage, done <-chan struct{}) { +func fakeHeartbeater(starting <-chan *workerInfo, finished <-chan *base.TaskMessage, done <-chan struct{}) { for { select { case <-starting: @@ -86,7 +86,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { processed = append(processed, task) return nil } - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) done := make(chan struct{}) @@ -177,7 +177,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { processed = append(processed, task) return nil } - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) done := make(chan struct{}) @@ -258,7 +258,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { processed = append(processed, task) return nil } - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) done := make(chan struct{}) @@ -389,7 +389,7 @@ func TestProcessorRetry(t *testing.T) { defer mu.Unlock() n++ } - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) done := make(chan struct{}) defer func() { close(done) }() @@ -470,7 +470,7 @@ func TestProcessorQueues(t *testing.T) { } for _, tc := range tests { - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) done := make(chan struct{}) defer func() { close(done) }() @@ -559,7 +559,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { "critical": 3, "low": 1, } - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) done := make(chan struct{}) diff --git a/server.go b/server.go index 2275b87..859fe5d 100644 --- a/server.go +++ b/server.go @@ -316,7 +316,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger.SetLevel(toInternalLogLevel(loglevel)) rdb := rdb.NewRDB(createRedisClient(r)) - starting := make(chan *base.TaskMessage) + starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) status := base.NewServerStatus(base.StatusIdle)