mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Add healthchecker to check broker connection
This commit is contained in:
parent
6978e93080
commit
a913e6d73f
80
healthcheck.go
Normal file
80
healthcheck.go
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
// healthchecker is responsible for pinging broker periodically
|
||||||
|
// and call user provided HeathCheckFunc with the ping result.
|
||||||
|
type healthchecker struct {
|
||||||
|
logger *log.Logger
|
||||||
|
broker base.Broker
|
||||||
|
|
||||||
|
// channel to communicate back to the long running "healthchecker" goroutine.
|
||||||
|
done chan struct{}
|
||||||
|
|
||||||
|
// interval between healthchecks.
|
||||||
|
interval time.Duration
|
||||||
|
|
||||||
|
// function to call periodically.
|
||||||
|
healthcheckFunc func(error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type healthcheckerParams struct {
|
||||||
|
logger *log.Logger
|
||||||
|
broker base.Broker
|
||||||
|
interval time.Duration
|
||||||
|
healthcheckFunc func(error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHealthChecker(params healthcheckerParams) *healthchecker {
|
||||||
|
return &healthchecker{
|
||||||
|
logger: params.logger,
|
||||||
|
broker: params.broker,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
interval: params.interval,
|
||||||
|
healthcheckFunc: params.healthcheckFunc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *healthchecker) terminate() {
|
||||||
|
if hc.healthcheckFunc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hc.logger.Debug("Healthchecker shutting down...")
|
||||||
|
// Signal the healthchecker goroutine to stop.
|
||||||
|
hc.done <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hc *healthchecker) start(wg *sync.WaitGroup) {
|
||||||
|
if hc.healthcheckFunc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
timer := time.NewTimer(hc.interval)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-hc.done:
|
||||||
|
hc.logger.Debug("Healthchecker done")
|
||||||
|
timer.Stop()
|
||||||
|
return
|
||||||
|
case <-timer.C:
|
||||||
|
err := hc.broker.Ping()
|
||||||
|
hc.healthcheckFunc(err)
|
||||||
|
timer.Reset(hc.interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
101
healthcheck_test.go
Normal file
101
healthcheck_test.go
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/hibiken/asynq/internal/testbroker"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHealthChecker(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
rdbClient := rdb.NewRDB(r)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// mu guards called and e variables.
|
||||||
|
mu sync.Mutex
|
||||||
|
called int
|
||||||
|
e error
|
||||||
|
)
|
||||||
|
checkFn := func(err error) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
called++
|
||||||
|
e = err
|
||||||
|
}
|
||||||
|
|
||||||
|
hc := newHealthChecker(healthcheckerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: rdbClient,
|
||||||
|
interval: 1 * time.Second,
|
||||||
|
healthcheckFunc: checkFn,
|
||||||
|
})
|
||||||
|
|
||||||
|
hc.start(&sync.WaitGroup{})
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
if called == 0 {
|
||||||
|
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
|
||||||
|
}
|
||||||
|
if e != nil {
|
||||||
|
t.Errorf("HealthCheckFunc was called with non-nil error: %v", e)
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
hc.terminate()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHealthCheckerWhenRedisDown(t *testing.T) {
|
||||||
|
// Make sure that healthchecker goroutine doesn't panic
|
||||||
|
// if it cannot connect to redis.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Errorf("panic occurred: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
r := rdb.NewRDB(setup(t))
|
||||||
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
|
var (
|
||||||
|
// mu guards called and e variables.
|
||||||
|
mu sync.Mutex
|
||||||
|
called int
|
||||||
|
e error
|
||||||
|
)
|
||||||
|
checkFn := func(err error) {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
called++
|
||||||
|
e = err
|
||||||
|
}
|
||||||
|
|
||||||
|
hc := newHealthChecker(healthcheckerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: testBroker,
|
||||||
|
interval: 1 * time.Second,
|
||||||
|
healthcheckFunc: checkFn,
|
||||||
|
})
|
||||||
|
|
||||||
|
testBroker.Sleep()
|
||||||
|
hc.start(&sync.WaitGroup{})
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
if called == 0 {
|
||||||
|
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
|
||||||
|
}
|
||||||
|
if e == nil {
|
||||||
|
t.Errorf("HealthCheckFunc was called with nil; want non-nil error")
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
hc.terminate()
|
||||||
|
}
|
@ -263,6 +263,7 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
|
|||||||
//
|
//
|
||||||
// See rdb.RDB as a reference implementation.
|
// See rdb.RDB as a reference implementation.
|
||||||
type Broker interface {
|
type Broker interface {
|
||||||
|
Ping() error
|
||||||
Enqueue(msg *TaskMessage) error
|
Enqueue(msg *TaskMessage) error
|
||||||
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
||||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
||||||
|
@ -45,6 +45,11 @@ func (r *RDB) Close() error {
|
|||||||
return r.client.Close()
|
return r.client.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping checks the connection with redis server.
|
||||||
|
func (r *RDB) Ping() error {
|
||||||
|
return r.client.Ping().Err()
|
||||||
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:queues:<qname>
|
// KEYS[1] -> asynq:queues:<qname>
|
||||||
// KEYS[2] -> asynq:queues
|
// KEYS[2] -> asynq:queues
|
||||||
// ARGV[1] -> task message data
|
// ARGV[1] -> task message data
|
||||||
|
@ -180,6 +180,15 @@ func (tb *TestBroker) PublishCancelation(id string) error {
|
|||||||
return tb.real.PublishCancelation(id)
|
return tb.real.PublishCancelation(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tb *TestBroker) Ping() error {
|
||||||
|
tb.mu.Lock()
|
||||||
|
defer tb.mu.Unlock()
|
||||||
|
if tb.sleeping {
|
||||||
|
return errRedisDown
|
||||||
|
}
|
||||||
|
return tb.real.Ping()
|
||||||
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) Close() error {
|
func (tb *TestBroker) Close() error {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
|
29
server.go
29
server.go
@ -47,6 +47,7 @@ type Server struct {
|
|||||||
heartbeater *heartbeater
|
heartbeater *heartbeater
|
||||||
subscriber *subscriber
|
subscriber *subscriber
|
||||||
recoverer *recoverer
|
recoverer *recoverer
|
||||||
|
healthchecker *healthchecker
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config specifies the server's background-task processing behavior.
|
// Config specifies the server's background-task processing behavior.
|
||||||
@ -123,6 +124,15 @@ type Config struct {
|
|||||||
//
|
//
|
||||||
// If unset or zero, default timeout of 8 seconds is used.
|
// If unset or zero, default timeout of 8 seconds is used.
|
||||||
ShutdownTimeout time.Duration
|
ShutdownTimeout time.Duration
|
||||||
|
|
||||||
|
// HealthCheckFunc is called periodically with any errors encountered during ping to the
|
||||||
|
// connected redis server.
|
||||||
|
HealthCheckFunc func(error)
|
||||||
|
|
||||||
|
// HealthCheckInterval specifies the interval between healthchecks.
|
||||||
|
//
|
||||||
|
// If unset or zero, the interval is set to 15 seconds.
|
||||||
|
HealthCheckInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// An ErrorHandler handles an error occured during task processing.
|
// An ErrorHandler handles an error occured during task processing.
|
||||||
@ -250,7 +260,11 @@ var defaultQueueConfig = map[string]int{
|
|||||||
base.DefaultQueueName: 1,
|
base.DefaultQueueName: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultShutdownTimeout = 8 * time.Second
|
const (
|
||||||
|
defaultShutdownTimeout = 8 * time.Second
|
||||||
|
|
||||||
|
defaultHealthCheckInterval = 15 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
// NewServer returns a new Server given a redis connection option
|
// NewServer returns a new Server given a redis connection option
|
||||||
// and background processing configuration.
|
// and background processing configuration.
|
||||||
@ -276,6 +290,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
if shutdownTimeout == 0 {
|
if shutdownTimeout == 0 {
|
||||||
shutdownTimeout = defaultShutdownTimeout
|
shutdownTimeout = defaultShutdownTimeout
|
||||||
}
|
}
|
||||||
|
healthcheckInterval := cfg.HealthCheckInterval
|
||||||
|
if healthcheckInterval == 0 {
|
||||||
|
healthcheckInterval = defaultHealthCheckInterval
|
||||||
|
}
|
||||||
logger := log.NewLogger(cfg.Logger)
|
logger := log.NewLogger(cfg.Logger)
|
||||||
loglevel := cfg.LogLevel
|
loglevel := cfg.LogLevel
|
||||||
if loglevel == level_unspecified {
|
if loglevel == level_unspecified {
|
||||||
@ -336,6 +354,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
retryDelayFunc: delayFunc,
|
retryDelayFunc: delayFunc,
|
||||||
interval: 1 * time.Minute,
|
interval: 1 * time.Minute,
|
||||||
})
|
})
|
||||||
|
healthchecker := newHealthChecker(healthcheckerParams{
|
||||||
|
logger: logger,
|
||||||
|
broker: rdb,
|
||||||
|
interval: healthcheckInterval,
|
||||||
|
healthcheckFunc: cfg.HealthCheckFunc,
|
||||||
|
})
|
||||||
return &Server{
|
return &Server{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
broker: rdb,
|
broker: rdb,
|
||||||
@ -346,6 +370,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
heartbeater: heartbeater,
|
heartbeater: heartbeater,
|
||||||
subscriber: subscriber,
|
subscriber: subscriber,
|
||||||
recoverer: recoverer,
|
recoverer: recoverer,
|
||||||
|
healthchecker: healthchecker,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,6 +438,7 @@ func (srv *Server) Start(handler Handler) error {
|
|||||||
srv.logger.Info("Starting processing")
|
srv.logger.Info("Starting processing")
|
||||||
|
|
||||||
srv.heartbeater.start(&srv.wg)
|
srv.heartbeater.start(&srv.wg)
|
||||||
|
srv.healthchecker.start(&srv.wg)
|
||||||
srv.subscriber.start(&srv.wg)
|
srv.subscriber.start(&srv.wg)
|
||||||
srv.syncer.start(&srv.wg)
|
srv.syncer.start(&srv.wg)
|
||||||
srv.recoverer.start(&srv.wg)
|
srv.recoverer.start(&srv.wg)
|
||||||
@ -442,6 +468,7 @@ func (srv *Server) Stop() {
|
|||||||
srv.recoverer.terminate()
|
srv.recoverer.terminate()
|
||||||
srv.syncer.terminate()
|
srv.syncer.terminate()
|
||||||
srv.subscriber.terminate()
|
srv.subscriber.terminate()
|
||||||
|
srv.healthchecker.terminate()
|
||||||
srv.heartbeater.terminate()
|
srv.heartbeater.terminate()
|
||||||
|
|
||||||
srv.wg.Wait()
|
srv.wg.Wait()
|
||||||
|
Loading…
Reference in New Issue
Block a user