2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-09 19:11:58 +08:00
asynq/healthcheck.go
Ken Hibino 9c95c41651 Change Server API
* Rename ServerStatus to ServerState internally

* Rename terminate to shutdown internally

* Update Scheduler API to match Server API
2021-06-29 16:34:21 -07:00

81 lines
1.7 KiB
Go

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
// healthchecker is responsible for pinging the broker periodically
// and calling the user-provided healthcheckFunc with the ping result.
//
// When healthcheckFunc is nil both start and shutdown return immediately,
// so no goroutine is started and no ping is performed.
type healthchecker struct {
	logger *log.Logger
	broker base.Broker

	// done is the channel used to communicate back to the long running
	// "healthchecker" goroutine: shutdown sends on it and the goroutine
	// launched by start returns once it receives.
	done chan struct{}

	// interval is the delay between healthchecks. The timer is re-armed
	// with this value after each call to healthcheckFunc returns.
	interval time.Duration

	// healthcheckFunc is the function to call periodically. It receives
	// the error value returned by broker.Ping.
	healthcheckFunc func(error)
}
// healthcheckerParams holds the arguments accepted by newHealthChecker.
// Each field is copied into the healthchecker field of the same name.
type healthcheckerParams struct {
	logger          *log.Logger
	broker          base.Broker
	interval        time.Duration
	healthcheckFunc func(error)
}
// newHealthChecker returns a healthchecker configured from params.
// Besides copying every field of params, it allocates the unbuffered
// done channel that shutdown uses to signal the background goroutine.
func newHealthChecker(params healthcheckerParams) *healthchecker {
	hc := new(healthchecker)
	hc.logger = params.logger
	hc.broker = params.broker
	hc.interval = params.interval
	hc.healthcheckFunc = params.healthcheckFunc
	hc.done = make(chan struct{})
	return hc
}
// shutdown asks the goroutine launched by start to exit.
//
// It does nothing when no healthcheckFunc is configured, mirroring start,
// which launches no goroutine in that case. Otherwise it sends on the done
// channel, so the call blocks until the goroutine receives the signal.
func (hc *healthchecker) shutdown() {
	if hc.healthcheckFunc != nil {
		hc.logger.Debug("Healthchecker shutting down...")
		// Hand the stop signal to the healthchecker goroutine.
		hc.done <- struct{}{}
	}
}
// start launches the background goroutine that performs healthchecks.
//
// It returns immediately without starting anything when healthcheckFunc is
// nil. Otherwise it registers the goroutine with wg, and the goroutine marks
// itself done on wg when it exits. Each time the timer fires, the goroutine
// pings the broker, passes the resulting error to healthcheckFunc, and then
// re-arms the timer, so interval is measured from the end of one check to
// the start of the next. The goroutine exits when a value is received on
// the done channel.
func (hc *healthchecker) start(wg *sync.WaitGroup) {
	if hc.healthcheckFunc == nil {
		return
	}

	wg.Add(1)
	go func() {
		defer wg.Done()

		t := time.NewTimer(hc.interval)
		// Release the timer on the way out, before wg.Done runs.
		defer t.Stop()

		for {
			select {
			case <-t.C:
				hc.healthcheckFunc(hc.broker.Ping())
				// The channel was just drained, so Reset is safe here.
				t.Reset(hc.interval)
			case <-hc.done:
				hc.logger.Debug("Healthchecker done")
				return
			}
		}
	}()
}