2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00
asynq/heartbeat.go

201 lines
5.1 KiB
Go
Raw Permalink Normal View History

2020-01-31 22:48:58 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
2020-05-19 11:47:35 +08:00
"os"
2020-02-16 15:14:30 +08:00
"sync"
2020-01-31 22:48:58 +08:00
"time"
"github.com/google/uuid"
2020-01-31 22:48:58 +08:00
"github.com/hibiken/asynq/internal/base"
2020-05-06 13:10:11 +08:00
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
2020-01-31 22:48:58 +08:00
)
2020-02-02 14:22:48 +08:00
// heartbeater is responsible for writing process info to redis periodically to
2020-01-31 22:48:58 +08:00
// indicate that the background worker process is up.
type heartbeater struct {
2020-05-06 13:10:11 +08:00
logger *log.Logger
2020-04-18 22:55:10 +08:00
broker base.Broker
clock timeutil.Clock
2020-01-31 22:48:58 +08:00
// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}
// interval between heartbeats.
interval time.Duration
2020-05-19 11:47:35 +08:00
// following fields are initialized at construction time and are immutable.
host string
pid int
serverID string
concurrency int
queues map[string]int
strictPriority bool
// following fields are mutable and should be accessed only by the
// heartbeater goroutine. In other words, confine these variables
// to this goroutine only.
started time.Time
2021-01-28 07:55:43 +08:00
workers map[string]*workerInfo
2020-05-19 11:47:35 +08:00
// state is shared with other goroutine but is concurrency safe.
state *serverState
2020-05-19 11:47:35 +08:00
// channels to receive updates on active workers.
2021-01-28 07:55:43 +08:00
starting <-chan *workerInfo
2020-05-19 11:47:35 +08:00
finished <-chan *base.TaskMessage
2020-01-31 22:48:58 +08:00
}
type heartbeaterParams struct {
2020-05-19 11:47:35 +08:00
logger *log.Logger
broker base.Broker
interval time.Duration
concurrency int
queues map[string]int
strictPriority bool
state *serverState
2021-01-28 07:55:43 +08:00
starting <-chan *workerInfo
2020-05-19 11:47:35 +08:00
finished <-chan *base.TaskMessage
}
func newHeartbeater(params heartbeaterParams) *heartbeater {
2020-05-19 11:47:35 +08:00
host, err := os.Hostname()
if err != nil {
host = "unknown-host"
}
2020-01-31 22:48:58 +08:00
return &heartbeater{
logger: params.logger,
broker: params.broker,
clock: timeutil.NewRealClock(),
2020-01-31 22:48:58 +08:00
done: make(chan struct{}),
interval: params.interval,
2020-05-19 11:47:35 +08:00
host: host,
pid: os.Getpid(),
serverID: uuid.New().String(),
2020-05-19 11:47:35 +08:00
concurrency: params.concurrency,
queues: params.queues,
strictPriority: params.strictPriority,
state: params.state,
2021-01-28 07:55:43 +08:00
workers: make(map[string]*workerInfo),
2020-05-19 11:47:35 +08:00
starting: params.starting,
finished: params.finished,
2020-01-31 22:48:58 +08:00
}
}
func (h *heartbeater) shutdown() {
h.logger.Debug("Heartbeater shutting down...")
2020-01-31 22:48:58 +08:00
// Signal the heartbeater goroutine to stop.
h.done <- struct{}{}
}
2021-01-28 07:55:43 +08:00
// A workerInfo holds an active worker information.
type workerInfo struct {
// the task message the worker is processing.
msg *base.TaskMessage
// the time the worker has started processing the message.
2020-05-19 11:47:35 +08:00
started time.Time
2021-01-28 07:55:43 +08:00
// deadline the worker has to finish processing the task by.
deadline time.Time
// lease the worker holds for the task.
lease *base.Lease
2020-05-19 11:47:35 +08:00
}
2020-02-16 15:14:30 +08:00
func (h *heartbeater) start(wg *sync.WaitGroup) {
wg.Add(1)
2020-01-31 22:48:58 +08:00
go func() {
2020-02-16 15:14:30 +08:00
defer wg.Done()
2020-05-19 11:47:35 +08:00
h.started = h.clock.Now()
2020-05-19 11:47:35 +08:00
2020-02-02 14:22:48 +08:00
h.beat()
2020-05-19 11:47:35 +08:00
timer := time.NewTimer(h.interval)
2020-01-31 22:48:58 +08:00
for {
select {
case <-h.done:
2020-05-19 11:47:35 +08:00
h.broker.ClearServerState(h.host, h.pid, h.serverID)
h.logger.Debug("Heartbeater done")
2020-05-19 11:47:35 +08:00
timer.Stop()
2020-01-31 22:48:58 +08:00
return
2020-05-19 11:47:35 +08:00
case <-timer.C:
2020-02-02 14:22:48 +08:00
h.beat()
2020-05-19 11:47:35 +08:00
timer.Reset(h.interval)
2021-01-28 07:55:43 +08:00
case w := <-h.starting:
h.workers[w.msg.ID] = w
2020-05-19 11:47:35 +08:00
case msg := <-h.finished:
delete(h.workers, msg.ID)
2020-01-31 22:48:58 +08:00
}
}
}()
}
2020-02-02 14:22:48 +08:00
// beat extends lease for workers and writes server/worker info to redis.
2020-02-02 14:22:48 +08:00
func (h *heartbeater) beat() {
h.state.mu.Lock()
srvStatus := h.state.value.String()
h.state.mu.Unlock()
2020-05-19 11:47:35 +08:00
info := base.ServerInfo{
Host: h.host,
PID: h.pid,
ServerID: h.serverID,
Concurrency: h.concurrency,
Queues: h.queues,
StrictPriority: h.strictPriority,
Status: srvStatus,
2020-05-19 11:47:35 +08:00
Started: h.started,
ActiveWorkerCount: len(h.workers),
}
var ws []*base.WorkerInfo
idsByQueue := make(map[string][]string)
2021-01-28 07:55:43 +08:00
for id, w := range h.workers {
2020-05-19 11:47:35 +08:00
ws = append(ws, &base.WorkerInfo{
Host: h.host,
PID: h.pid,
ServerID: h.serverID,
ID: id,
2021-01-28 07:55:43 +08:00
Type: w.msg.Type,
Queue: w.msg.Queue,
Payload: w.msg.Payload,
Started: w.started,
Deadline: w.deadline,
2020-05-19 11:47:35 +08:00
})
// Check lease before adding to the set to make sure not to extend the lease if the lease is already expired.
if w.lease.IsValid() {
idsByQueue[w.msg.Queue] = append(idsByQueue[w.msg.Queue], id)
} else {
w.lease.NotifyExpiration() // notify processor if the lease is expired
}
2020-05-19 11:47:35 +08:00
}
2020-02-02 14:22:48 +08:00
// Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed.
2020-05-19 11:47:35 +08:00
if err := h.broker.WriteServerState(&info, ws, h.interval*2); err != nil {
2022-02-17 22:21:26 +08:00
h.logger.Errorf("Failed to write server state data: %v", err)
2020-02-02 14:22:48 +08:00
}
for qname, ids := range idsByQueue {
expirationTime, err := h.broker.ExtendLease(qname, ids...)
if err != nil {
2022-02-17 22:21:26 +08:00
h.logger.Errorf("Failed to extend lease for tasks %v: %v", ids, err)
continue
}
for _, id := range ids {
if l := h.workers[id].lease; !l.Reset(expirationTime) {
h.logger.Warnf("Lease reset failed for %s; lease deadline: %v", id, l.Deadline())
}
}
}
2020-02-02 14:22:48 +08:00
}