diff --git a/healthcheck.go b/healthcheck.go
new file mode 100644
index 0000000..84526a0
--- /dev/null
+++ b/healthcheck.go
@@ -0,0 +1,80 @@
+// Copyright 2020 Kentaro Hibino. All rights reserved.
+// Use of this source code is governed by a MIT license
+// that can be found in the LICENSE file.
+
+package asynq
+
+import (
+	"sync"
+	"time"
+
+	"github.com/hibiken/asynq/internal/base"
+	"github.com/hibiken/asynq/internal/log"
+)
+
+// healthchecker is responsible for pinging the broker periodically
+// and calling the user-provided HealthCheckFunc with the ping result.
+type healthchecker struct {
+	logger *log.Logger
+	broker base.Broker
+
+	// channel to communicate back to the long-running "healthchecker" goroutine.
+	done chan struct{}
+
+	// interval between healthchecks.
+	interval time.Duration
+
+	// function to call periodically.
+	healthcheckFunc func(error)
+}
+
+type healthcheckerParams struct {
+	logger          *log.Logger
+	broker          base.Broker
+	interval        time.Duration
+	healthcheckFunc func(error)
+}
+
+func newHealthChecker(params healthcheckerParams) *healthchecker {
+	return &healthchecker{
+		logger:          params.logger,
+		broker:          params.broker,
+		done:            make(chan struct{}),
+		interval:        params.interval,
+		healthcheckFunc: params.healthcheckFunc,
+	}
+}
+
+func (hc *healthchecker) terminate() {
+	if hc.healthcheckFunc == nil {
+		return
+	}
+
+	hc.logger.Debug("Healthchecker shutting down...")
+	// Signal the healthchecker goroutine to stop.
+	hc.done <- struct{}{}
+}
+
+func (hc *healthchecker) start(wg *sync.WaitGroup) {
+	if hc.healthcheckFunc == nil {
+		return
+	}
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		timer := time.NewTimer(hc.interval)
+		for {
+			select {
+			case <-hc.done:
+				hc.logger.Debug("Healthchecker done")
+				timer.Stop()
+				return
+			case <-timer.C:
+				err := hc.broker.Ping()
+				hc.healthcheckFunc(err)
+				timer.Reset(hc.interval)
+			}
+		}
+	}()
+}
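For orientation, here is a minimal sketch of the healthchecker lifecycle as the server and the tests below use it; the logger and broker values are assumed to be constructed elsewhere (e.g. via the internal log and rdb packages), and the interval is arbitrary:

	var wg sync.WaitGroup
	hc := newHealthChecker(healthcheckerParams{
		logger:   logger, // *log.Logger, assumed to exist
		broker:   broker, // base.Broker, e.g. an *rdb.RDB
		interval: 15 * time.Second,
		healthcheckFunc: func(err error) {
			// Receives the result of broker.Ping(); nil means healthy.
		},
	})
	hc.start(&wg) // no-op when healthcheckFunc is nil
	// ... later, during shutdown:
	hc.terminate() // likewise a no-op when healthcheckFunc is nil
	wg.Wait()

Because done is unbuffered, terminate blocks until the goroutine receives the signal, which guarantees the ping loop has exited before wg.Wait() returns.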
diff --git a/healthcheck_test.go b/healthcheck_test.go
new file mode 100644
index 0000000..c077271
--- /dev/null
+++ b/healthcheck_test.go
@@ -0,0 +1,101 @@
+// Copyright 2020 Kentaro Hibino. All rights reserved.
+// Use of this source code is governed by a MIT license
+// that can be found in the LICENSE file.
+
+package asynq
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hibiken/asynq/internal/rdb"
+	"github.com/hibiken/asynq/internal/testbroker"
+)
+
+func TestHealthChecker(t *testing.T) {
+	r := setup(t)
+	rdbClient := rdb.NewRDB(r)
+
+	var (
+		// mu guards called and e variables.
+		mu     sync.Mutex
+		called int
+		e      error
+	)
+	checkFn := func(err error) {
+		mu.Lock()
+		defer mu.Unlock()
+		called++
+		e = err
+	}
+
+	hc := newHealthChecker(healthcheckerParams{
+		logger:          testLogger,
+		broker:          rdbClient,
+		interval:        1 * time.Second,
+		healthcheckFunc: checkFn,
+	})
+
+	hc.start(&sync.WaitGroup{})
+
+	time.Sleep(2 * time.Second)
+
+	mu.Lock()
+	if called == 0 {
+		t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
+	}
+	if e != nil {
+		t.Errorf("HealthCheckFunc was called with non-nil error: %v", e)
+	}
+	mu.Unlock()
+
+	hc.terminate()
+}
+
+func TestHealthCheckerWhenRedisDown(t *testing.T) {
+	// Make sure that the healthchecker goroutine doesn't panic
+	// if it cannot connect to redis.
+	defer func() {
+		if r := recover(); r != nil {
+			t.Errorf("panic occurred: %v", r)
+		}
+	}()
+	r := rdb.NewRDB(setup(t))
+	testBroker := testbroker.NewTestBroker(r)
+	var (
+		// mu guards called and e variables.
+		mu     sync.Mutex
+		called int
+		e      error
+	)
+	checkFn := func(err error) {
+		mu.Lock()
+		defer mu.Unlock()
+		called++
+		e = err
+	}
+
+	hc := newHealthChecker(healthcheckerParams{
+		logger:          testLogger,
+		broker:          testBroker,
+		interval:        1 * time.Second,
+		healthcheckFunc: checkFn,
+	})
+
+	testBroker.Sleep()
+	hc.start(&sync.WaitGroup{})
+
+	time.Sleep(2 * time.Second)
+
+	mu.Lock()
+	if called == 0 {
+		t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
+	}
+	if e == nil {
+		t.Errorf("HealthCheckFunc was called with nil; want non-nil error")
+	}
+	mu.Unlock()
+
+	hc.terminate()
+}
diff --git a/internal/base/base.go b/internal/base/base.go
index 7277051..4b216ab 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -263,6 +263,7 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
 //
 // See rdb.RDB as a reference implementation.
 type Broker interface {
+	Ping() error
 	Enqueue(msg *TaskMessage) error
 	EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
 	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index e33f11e..69a90dd 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -45,6 +45,11 @@ func (r *RDB) Close() error {
 	return r.client.Close()
 }
 
+// Ping checks the connection with the redis server.
+func (r *RDB) Ping() error {
+	return r.client.Ping().Err()
+}
+
 // KEYS[1] -> asynq:queues:<qname>
 // KEYS[2] -> asynq:queues
 // ARGV[1] -> task message data
diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go
index 85d388b..e6bc725 100644
--- a/internal/testbroker/testbroker.go
+++ b/internal/testbroker/testbroker.go
@@ -180,6 +180,15 @@ func (tb *TestBroker) PublishCancelation(id string) error {
 	return tb.real.PublishCancelation(id)
 }
 
+func (tb *TestBroker) Ping() error {
+	tb.mu.Lock()
+	defer tb.mu.Unlock()
+	if tb.sleeping {
+		return errRedisDown
+	}
+	return tb.real.Ping()
+}
+
 func (tb *TestBroker) Close() error {
 	tb.mu.Lock()
 	defer tb.mu.Unlock()
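Ping() error is now part of the base.Broker interface, so every implementation must provide it; RDB and TestBroker above do. For illustration only, a hypothetical in-memory fake (fakeBroker is not part of this change; it assumes the "errors" and "sync" packages are imported) could satisfy the new method like this:

	// fakeBroker simulates a broker whose connection can be toggled down.
	type fakeBroker struct {
		mu   sync.Mutex
		down bool
	}

	// Ping mirrors TestBroker.Ping: report an error while marked down.
	func (b *fakeBroker) Ping() error {
		b.mu.Lock()
		defer b.mu.Unlock()
		if b.down {
			return errors.New("broker is down")
		}
		return nil
	}

The same mutex-guarded flag pattern is what lets TestBroker.Sleep() force the error path exercised in TestHealthCheckerWhenRedisDown.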
diff --git a/server.go b/server.go
index 3fc8998..c86e88f 100644
--- a/server.go
+++ b/server.go
@@ -40,13 +40,14 @@ type Server struct {
 	status *base.ServerStatus
 
 	// wait group to wait for all goroutines to finish.
-	wg          sync.WaitGroup
-	scheduler   *scheduler
-	processor   *processor
-	syncer      *syncer
-	heartbeater *heartbeater
-	subscriber  *subscriber
-	recoverer   *recoverer
+	wg            sync.WaitGroup
+	scheduler     *scheduler
+	processor     *processor
+	syncer        *syncer
+	heartbeater   *heartbeater
+	subscriber    *subscriber
+	recoverer     *recoverer
+	healthchecker *healthchecker
 }
 
 // Config specifies the server's background-task processing behavior.
@@ -123,6 +124,15 @@ type Config struct {
 	//
 	// If unset or zero, default timeout of 8 seconds is used.
 	ShutdownTimeout time.Duration
+
+	// HealthCheckFunc is called periodically with any errors encountered
+	// while pinging the connected redis server.
+	HealthCheckFunc func(error)
+
+	// HealthCheckInterval specifies the interval between healthchecks.
+	//
+	// If unset or zero, the interval is set to 15 seconds.
+	HealthCheckInterval time.Duration
 }
 
 // An ErrorHandler handles an error that occurred during task processing.
@@ -250,7 +260,11 @@ var defaultQueueConfig = map[string]int{
 	base.DefaultQueueName: 1,
 }
 
-const defaultShutdownTimeout = 8 * time.Second
+const (
+	defaultShutdownTimeout = 8 * time.Second
+
+	defaultHealthCheckInterval = 15 * time.Second
+)
 
 // NewServer returns a new Server given a redis connection option
 // and background processing configuration.
@@ -276,6 +290,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	if shutdownTimeout == 0 {
 		shutdownTimeout = defaultShutdownTimeout
 	}
+	healthcheckInterval := cfg.HealthCheckInterval
+	if healthcheckInterval == 0 {
+		healthcheckInterval = defaultHealthCheckInterval
+	}
 	logger := log.NewLogger(cfg.Logger)
 	loglevel := cfg.LogLevel
 	if loglevel == level_unspecified {
@@ -336,16 +354,23 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 		retryDelayFunc: delayFunc,
 		interval:       1 * time.Minute,
 	})
+	healthchecker := newHealthChecker(healthcheckerParams{
+		logger:          logger,
+		broker:          rdb,
+		interval:        healthcheckInterval,
+		healthcheckFunc: cfg.HealthCheckFunc,
+	})
 	return &Server{
-		logger:      logger,
-		broker:      rdb,
-		status:      status,
-		scheduler:   scheduler,
-		processor:   processor,
-		syncer:      syncer,
-		heartbeater: heartbeater,
-		subscriber:  subscriber,
-		recoverer:   recoverer,
+		logger:        logger,
+		broker:        rdb,
+		status:        status,
+		scheduler:     scheduler,
+		processor:     processor,
+		syncer:        syncer,
+		heartbeater:   heartbeater,
+		subscriber:    subscriber,
+		recoverer:     recoverer,
+		healthchecker: healthchecker,
 	}
 }
 
@@ -413,6 +438,7 @@ func (srv *Server) Start(handler Handler) error {
 	srv.logger.Info("Starting processing")
 
 	srv.heartbeater.start(&srv.wg)
+	srv.healthchecker.start(&srv.wg)
 	srv.subscriber.start(&srv.wg)
 	srv.syncer.start(&srv.wg)
 	srv.recoverer.start(&srv.wg)
@@ -442,6 +468,7 @@ func (srv *Server) Stop() {
 	srv.recoverer.terminate()
 	srv.syncer.terminate()
 	srv.subscriber.terminate()
+	srv.healthchecker.terminate()
 	srv.heartbeater.terminate()
 
 	srv.wg.Wait()
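From the user's side, the feature is driven entirely by the two new Config fields added above. A minimal usage sketch, with placeholder redis address, concurrency, and log destination:

	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{
		Concurrency: 10,
		// Invoked every HealthCheckInterval with the result of the redis ping.
		HealthCheckFunc: func(err error) {
			if err != nil {
				log.Printf("asynq: healthcheck failed: %v", err)
			}
		},
		HealthCheckInterval: 30 * time.Second, // falls back to 15s when zero
	})

Leaving HealthCheckFunc unset disables the healthchecker goroutine entirely, since both start and terminate return early when the function is nil.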