
Record deadline within WorkerInfo

Ken Hibino 2021-01-27 15:55:43 -08:00
parent bfde0b6283
commit eba7c4e085
9 changed files with 40 additions and 30 deletions
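At a glance, the change widens what the processor reports to the heartbeater: instead of sending a bare *base.TaskMessage on the starting channel, it sends a workerInfo value that also carries the start time and the deadline, and the downstream consumers (heartbeater, RDB, Inspector) each gain a Deadline field. Below is a standalone, runnable sketch of that pattern using stand-in types; it is an illustration of the idea, not asynq's own code.

package main

import (
	"fmt"
	"time"
)

// Stand-in for base.TaskMessage with only the fields the illustration needs.
type taskMessage struct {
	ID   string
	Type string
}

// Mirrors the shape of the workerInfo type introduced in heartbeat.go below.
type workerInfo struct {
	msg      *taskMessage
	started  time.Time
	deadline time.Time
}

func main() {
	// The starting channel now carries *workerInfo instead of *taskMessage.
	starting := make(chan *workerInfo, 1)

	// Producer side (the processor's role): report task, start time, and deadline.
	msg := &taskMessage{ID: "abc123", Type: "email:send"}
	starting <- &workerInfo{msg: msg, started: time.Now(), deadline: time.Now().Add(30 * time.Minute)}

	// Consumer side (the heartbeater's role): record the deadline per active worker.
	w := <-starting
	fmt.Printf("worker on %q started %v, must finish by %v\n", w.msg.Type, w.started, w.deadline)
}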

View File

@@ -38,13 +38,13 @@ type heartbeater struct {
 	// heartbeater goroutine. In other words, confine these variables
 	// to this goroutine only.
 	started time.Time
-	workers map[string]workerStat
+	workers map[string]*workerInfo
 
 	// status is shared with other goroutine but is concurrency safe.
 	status *base.ServerStatus
 
 	// channels to receive updates on active workers.
-	starting <-chan *base.TaskMessage
+	starting <-chan *workerInfo
 	finished <-chan *base.TaskMessage
 }
@@ -56,7 +56,7 @@ type heartbeaterParams struct {
 	queues map[string]int
 	strictPriority bool
 	status *base.ServerStatus
-	starting <-chan *base.TaskMessage
+	starting <-chan *workerInfo
 	finished <-chan *base.TaskMessage
 }
@@ -80,7 +80,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
 		strictPriority: params.strictPriority,
 		status: params.status,
-		workers: make(map[string]workerStat),
+		workers: make(map[string]*workerInfo),
 		starting: params.starting,
 		finished: params.finished,
 	}
@@ -92,11 +92,14 @@ func (h *heartbeater) terminate() {
 	h.done <- struct{}{}
 }
 
-// A workerStat records the message a worker is working on
-// and the time the worker has started processing the message.
-type workerStat struct {
+// A workerInfo holds an active worker information.
+type workerInfo struct {
+	// the task message the worker is processing.
+	msg *base.TaskMessage
+	// the time the worker has started processing the message.
 	started time.Time
-	msg *base.TaskMessage
+	// deadline the worker has to finish processing the task by.
+	deadline time.Time
 }
 
 func (h *heartbeater) start(wg *sync.WaitGroup) {
@@ -121,8 +124,8 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
 				h.beat()
 				timer.Reset(h.interval)
 
-			case msg := <-h.starting:
-				h.workers[msg.ID.String()] = workerStat{time.Now(), msg}
+			case w := <-h.starting:
+				h.workers[w.msg.ID.String()] = w
 
 			case msg := <-h.finished:
 				delete(h.workers, msg.ID.String())
@@ -145,16 +148,17 @@ func (h *heartbeater) beat() {
 	}
 
 	var ws []*base.WorkerInfo
-	for id, stat := range h.workers {
+	for id, w := range h.workers {
 		ws = append(ws, &base.WorkerInfo{
 			Host: h.host,
 			PID: h.pid,
 			ServerID: h.serverID,
 			ID: id,
-			Type: stat.msg.Type,
-			Queue: stat.msg.Queue,
-			Payload: stat.msg.Payload,
-			Started: stat.started,
+			Type: w.msg.Type,
+			Queue: w.msg.Queue,
+			Payload: w.msg.Payload,
+			Started: w.started,
+			Deadline: w.deadline,
 		})
 	}

View File

@@ -47,7 +47,7 @@ func TestHeartbeater(t *testing.T) {
 			queues: tc.queues,
 			strictPriority: false,
 			status: status,
-			starting: make(chan *base.TaskMessage),
+			starting: make(chan *workerInfo),
 			finished: make(chan *base.TaskMessage),
 		})
@@ -139,7 +139,7 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
 		queues: map[string]int{"default": 1},
 		strictPriority: false,
 		status: base.NewServerStatus(base.StatusRunning),
-		starting: make(chan *base.TaskMessage),
+		starting: make(chan *workerInfo),
 		finished: make(chan *base.TaskMessage),
 	})

View File

@@ -725,7 +725,8 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
 				continue
 			}
 			wrkInfo := &WorkerInfo{
-				Started: w.Started,
+				Started:  w.Started,
+				Deadline: w.Deadline,
 				Task: &ActiveTask{
 					Task: NewTask(w.Type, w.Payload),
 					ID: w.ID,
@@ -771,6 +772,8 @@ type WorkerInfo struct {
 	Task *ActiveTask
 
 	// Time the worker started processing the task.
 	Started time.Time
+
+	// Time the worker needs to finish processing the task by.
+	Deadline time.Time
 }
 
 // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
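With Deadline now exposed on the public WorkerInfo, callers of Inspector.Servers can see how long each active worker has left. Below is a hedged usage sketch; it assumes the Inspector, RedisClientOpt, and ServerInfo.ActiveWorkers live in the asynq package as they do at this point in the repository's history, so adjust the import path and field names for other versions.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})

	servers, err := inspector.Servers()
	if err != nil {
		log.Fatal(err)
	}
	for _, srv := range servers {
		for _, w := range srv.ActiveWorkers {
			// Deadline is the new field; the remaining time is derived here only for display.
			fmt.Printf("task %s: started %v, %v left until deadline\n",
				w.Task.ID, w.Started, time.Until(w.Deadline).Round(time.Second))
		}
	}
}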

View File

@@ -283,6 +283,7 @@ type WorkerInfo struct {
 	Queue string
 	Payload map[string]interface{}
 	Started time.Time
+	Deadline time.Time
 }
 
 // SchedulerEntry holds information about a periodic task registered with a scheduler.
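Worker records are persisted to Redis in serialized form (the ListWorkers hunk below decodes them and skips bad data), so adding Deadline time.Time to base.WorkerInfo is enough for the value to travel with the rest of the record; no separate schema change is needed. The self-contained check below uses a stand-in struct and encoding/json, which is an assumption about the encoding that is worth verifying against internal/rdb rather than a statement of the library's code.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Stand-in with the same shape as the fields base.WorkerInfo gains here.
type workerInfo struct {
	Host     string    `json:"host"`
	ID       string    `json:"id"`
	Started  time.Time `json:"started"`
	Deadline time.Time `json:"deadline"`
}

func main() {
	in := workerInfo{
		Host:     "worker-1",
		ID:       "abc123",
		Started:  time.Now(),
		Deadline: time.Now().Add(30 * time.Minute),
	}
	b, err := json.Marshal(in)
	if err != nil {
		panic(err)
	}

	var out workerInfo
	if err := json.Unmarshal(b, &out); err != nil {
		panic(err)
	}
	// time.Time marshals as RFC 3339 with nanoseconds, so the deadline survives the round trip.
	fmt.Println(out.Deadline.Equal(in.Deadline)) // true
}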

View File

@@ -974,7 +974,6 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
 				continue // skip bad data
 			}
 			workers = append(workers, &w)
 		}
 	}
-
 	return workers, nil

View File

@@ -2966,6 +2966,7 @@ func TestListWorkers(t *testing.T) {
 			Queue: m1.Queue,
 			Payload: m1.Payload,
 			Started: time.Now().Add(-1 * time.Second),
+			Deadline: time.Now().Add(30 * time.Second),
 		},
 		{
 			Host: host,
@@ -2976,6 +2977,7 @@ func TestListWorkers(t *testing.T) {
 			Queue: m2.Queue,
 			Payload: m2.Payload,
 			Started: time.Now().Add(-5 * time.Second),
+			Deadline: time.Now().Add(10 * time.Minute),
 		},
 		{
 			Host: host,
@@ -2986,6 +2988,7 @@ func TestListWorkers(t *testing.T) {
 			Queue: m3.Queue,
 			Payload: m3.Payload,
 			Started: time.Now().Add(-30 * time.Second),
+			Deadline: time.Now().Add(30 * time.Minute),
 		},
 	},
 },

View File

@@ -63,7 +63,7 @@ type processor struct {
 	// cancelations is a set of cancel functions for all active tasks.
 	cancelations *base.Cancelations
 
-	starting chan<- *base.TaskMessage
+	starting chan<- *workerInfo
 	finished chan<- *base.TaskMessage
 }
@@ -78,7 +78,7 @@ type processorParams struct {
 	strictPriority bool
 	errHandler ErrorHandler
 	shutdownTimeout time.Duration
-	starting chan<- *base.TaskMessage
+	starting chan<- *workerInfo
 	finished chan<- *base.TaskMessage
 }
@@ -180,7 +180,7 @@ func (p *processor) exec() {
 			return
 		}
 
-		p.starting <- msg
+		p.starting <- &workerInfo{msg, time.Now(), deadline}
 		go func() {
 			defer func() {
 				p.finished <- msg
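The deadline handed to workerInfo here is produced by the dequeue step earlier in exec(); this hunk does not show how it is computed. As a rough sketch of the semantics, assuming TaskMessage carries a Timeout in seconds and a Deadline as a Unix timestamp with zero meaning unset, and that some default window applies when both are unset (the real computation lives in the broker, not in this function), a hypothetical helper might look like this:

package main

import (
	"fmt"
	"time"
)

// Hypothetical helper, not asynq's code: derive the single deadline a worker
// must respect from a task's optional timeout and optional absolute deadline.
func computeDeadline(now time.Time, timeoutSec, deadlineUnix int64, fallback time.Duration) time.Time {
	switch {
	case timeoutSec > 0 && deadlineUnix > 0:
		byTimeout := now.Add(time.Duration(timeoutSec) * time.Second)
		byDeadline := time.Unix(deadlineUnix, 0)
		if byTimeout.Before(byDeadline) {
			return byTimeout
		}
		return byDeadline
	case timeoutSec > 0:
		return now.Add(time.Duration(timeoutSec) * time.Second)
	case deadlineUnix > 0:
		return time.Unix(deadlineUnix, 0)
	default:
		return now.Add(fallback) // neither set: fall back to a default window
	}
}

func main() {
	now := time.Now()
	fmt.Println(computeDeadline(now, 60, 0, 30*time.Minute))                        // now + 60s
	fmt.Println(computeDeadline(now, 0, now.Add(time.Hour).Unix(), 30*time.Minute)) // absolute deadline wins
}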

View File

@@ -20,7 +20,7 @@ import (
 )
 
 // fakeHeartbeater receives from starting and finished channels and do nothing.
-func fakeHeartbeater(starting, finished <-chan *base.TaskMessage, done <-chan struct{}) {
+func fakeHeartbeater(starting <-chan *workerInfo, finished <-chan *base.TaskMessage, done <-chan struct{}) {
 	for {
 		select {
 		case <-starting:
@@ -86,7 +86,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
 	done := make(chan struct{})
@@ -177,7 +177,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
 	done := make(chan struct{})
@@ -258,7 +258,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
 	done := make(chan struct{})
@@ -389,7 +389,7 @@ func TestProcessorRetry(t *testing.T) {
 		defer mu.Unlock()
 		n++
 	}
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	done := make(chan struct{})
 	defer func() { close(done) }()
@@ -470,7 +470,7 @@ func TestProcessorQueues(t *testing.T) {
 	}
 
 	for _, tc := range tests {
-		starting := make(chan *base.TaskMessage)
+		starting := make(chan *workerInfo)
 		finished := make(chan *base.TaskMessage)
 		done := make(chan struct{})
 		defer func() { close(done) }()
@@ -559,7 +559,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 		"critical": 3,
 		"low": 1,
 	}
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
 	done := make(chan struct{})

View File

@@ -316,7 +316,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	logger.SetLevel(toInternalLogLevel(loglevel))
 
 	rdb := rdb.NewRDB(createRedisClient(r))
-	starting := make(chan *base.TaskMessage)
+	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
 	status := base.NewServerStatus(base.StatusIdle)
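NewServer creates the starting channel once and hands the same channel to both components; the processor's field type chan<- *workerInfo and the heartbeater's <-chan *workerInfo are simply the send-only and receive-only views of it. A tiny standalone illustration of that Go idiom, using a stand-in element type rather than the library's:

package main

import "fmt"

type workerInfo struct{ id string }

func main() {
	ch := make(chan *workerInfo, 1)

	// The same channel seen through directional types, as in processorParams
	// and heartbeaterParams above.
	var send chan<- *workerInfo = ch // processor's view: can only send
	var recv <-chan *workerInfo = ch // heartbeater's view: can only receive

	send <- &workerInfo{id: "abc123"}
	fmt.Println((<-recv).id)
}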