Update heartbeat goroutine to call ExtendLease on active tasks

This commit is contained in:
Ken Hibino
2022-02-12 09:48:07 -08:00
parent 87dc392c7f
commit 871474f220
6 changed files with 215 additions and 26 deletions

View File

@@ -134,6 +134,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
}()
}
// beat extends lease for workers and writes server/worker info to redis.
func (h *heartbeater) beat() {
h.state.mu.Lock()
srvStatus := h.state.value.String()
@@ -152,6 +153,7 @@ func (h *heartbeater) beat() {
}
var ws []*base.WorkerInfo
idsByQueue := make(map[string][]string)
for id, w := range h.workers {
ws = append(ws, &base.WorkerInfo{
Host: h.host,
@@ -164,6 +166,7 @@ func (h *heartbeater) beat() {
Started: w.started,
Deadline: w.deadline,
})
idsByQueue[w.msg.Queue] = append(idsByQueue[w.msg.Queue], id)
}
// Note: Set TTL to be long enough so that it won't expire before we write again
@@ -171,4 +174,10 @@ func (h *heartbeater) beat() {
if err := h.broker.WriteServerState(&info, ws, h.interval*2); err != nil {
h.logger.Errorf("could not write server state data: %v", err)
}
for qname, ids := range idsByQueue {
if err := h.broker.ExtendLease(qname, ids...); err != nil {
h.logger.Errorf("could not extend lease for tasks %v: %v", ids, err)
}
}
}