2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Update heartbeat to extend lease of active workers

This commit is contained in:
Ken Hibino
2022-02-14 07:17:51 -08:00
parent dfae8638e1
commit d7169cd445
8 changed files with 148 additions and 53 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
)
// heartbeater is responsible for writing process info to redis periodically to
@@ -19,6 +20,7 @@ import (
type heartbeater struct {
logger *log.Logger
broker base.Broker
clock timeutil.Clock
// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}
@@ -69,6 +71,7 @@ func newHeartbeater(params heartbeaterParams) *heartbeater {
return &heartbeater{
logger: params.logger,
broker: params.broker,
clock: timeutil.NewRealClock(),
done: make(chan struct{}),
interval: params.interval,
@@ -100,6 +103,8 @@ type workerInfo struct {
started time.Time
// deadline the worker has to finish processing the task by.
deadline time.Time
// lease the worker holds for the task.
lease *base.Lease
}
func (h *heartbeater) start(wg *sync.WaitGroup) {
@@ -107,7 +112,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
h.started = time.Now()
h.started = h.clock.Now()
h.beat()
@@ -166,7 +171,12 @@ func (h *heartbeater) beat() {
Started: w.started,
Deadline: w.deadline,
})
idsByQueue[w.msg.Queue] = append(idsByQueue[w.msg.Queue], id)
// Check lease before adding to the set to make sure not to extend the lease if the lease is already expired.
if w.lease.IsValid() {
idsByQueue[w.msg.Queue] = append(idsByQueue[w.msg.Queue], id)
} else {
w.lease.NotifyExpiration() // notify processor if the lease is expired
}
}
// Note: Set TTL to be long enough so that it won't expire before we write again
@@ -176,8 +186,15 @@ func (h *heartbeater) beat() {
}
for qname, ids := range idsByQueue {
if err := h.broker.ExtendLease(qname, ids...); err != nil {
expirationTime, err := h.broker.ExtendLease(qname, ids...)
if err != nil {
h.logger.Errorf("could not extend lease for tasks %v: %v", ids, err)
continue
}
for _, id := range ids {
if l := h.workers[id].lease; !l.Reset(expirationTime) {
h.logger.Warnf("Lease reset failed for %s; lease deadline: %v", id, l.Deadline())
}
}
}
}