mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 13:21:58 +08:00
Add hearbeater
This commit is contained in:
78
heartbeat.go
Normal file
78
heartbeat.go
Normal file
@@ -0,0 +1,78 @@
|
||||
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// heartbeater is responsible for writing process status to redis periodically to
|
||||
// indicate that the background worker process is up.
|
||||
type heartbeater struct {
|
||||
rdb *rdb.RDB
|
||||
|
||||
mu sync.Mutex
|
||||
ps *base.ProcessStatus
|
||||
|
||||
// channel to communicate back to the long running "heartbeater" goroutine.
|
||||
done chan struct{}
|
||||
|
||||
// interval between heartbeats.
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func newHeartbeater(rdb *rdb.RDB, interval time.Duration, host string, pid int, queues map[string]uint, n int) *heartbeater {
|
||||
ps := &base.ProcessStatus{
|
||||
Concurrency: n,
|
||||
Queues: queues,
|
||||
Host: host,
|
||||
PID: pid,
|
||||
}
|
||||
return &heartbeater{
|
||||
rdb: rdb,
|
||||
ps: ps,
|
||||
done: make(chan struct{}),
|
||||
interval: interval,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *heartbeater) terminate() {
|
||||
logger.info("Heartbeater shutting down...")
|
||||
// Signal the heartbeater goroutine to stop.
|
||||
h.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (h *heartbeater) setState(state string) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.ps.State = state
|
||||
}
|
||||
|
||||
func (h *heartbeater) start() {
|
||||
h.ps.Started = time.Now()
|
||||
h.ps.State = "running"
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-h.done:
|
||||
logger.info("Heartbeater done")
|
||||
return
|
||||
case <-time.After(h.interval):
|
||||
// Note: Set TTL to be long enough value so that it won't expire before we write again
|
||||
// and short enough to expire quickly once process is shut down.
|
||||
h.mu.Lock()
|
||||
err := h.rdb.WriteProcessStatus(h.ps, h.interval*2)
|
||||
h.mu.Unlock()
|
||||
if err != nil {
|
||||
logger.error("could not write heartbeat data: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
Reference in New Issue
Block a user