Add recoverer

This commit is contained in:
Ken Hibino
2020-06-21 07:05:57 -07:00
parent 7657f560ec
commit feee87adda
7 changed files with 292 additions and 149 deletions

View File

@@ -46,6 +46,7 @@ type Server struct {
syncer *syncer
heartbeater *heartbeater
subscriber *subscriber
recoverer *recoverer
}
// Config specifies the server's background-task processing behavior.
@@ -329,6 +330,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
starting: starting,
finished: finished,
})
recoverer := newRecoverer(recovererParams{
logger: logger,
broker: rdb,
retryDelayFunc: delayFunc,
interval: 1 * time.Minute,
})
return &Server{
logger: logger,
broker: rdb,
@@ -338,6 +345,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
syncer: syncer,
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
}
}
@@ -407,6 +415,7 @@ func (srv *Server) Start(handler Handler) error {
srv.heartbeater.start(&srv.wg)
srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg)
srv.recoverer.start(&srv.wg)
srv.scheduler.start(&srv.wg)
srv.processor.start(&srv.wg)
return nil
@@ -430,6 +439,7 @@ func (srv *Server) Stop() {
// processor -> heartbeater (via starting, finished channels)
srv.scheduler.terminate()
srv.processor.terminate()
srv.recoverer.terminate()
srv.syncer.terminate()
srv.subscriber.terminate()
srv.heartbeater.terminate()