diff --git a/server.go b/server.go index 4bf04e0..343b77e 100644 --- a/server.go +++ b/server.go @@ -220,6 +220,22 @@ type Config struct { // // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregator GroupAggregator + + // HeartbeatInterval specifies the interval between heartbeats. + // + // If unset or zero, the interval is set to 5 seconds. + HeartbeatInterval time.Duration + + // RecoverInterval specifies the interval between recovers. + // + // If unset or zero, the interval is set to 60 seconds. + RecoverInterval time.Duration + + // JanitorInterval specifies the interval between Janitors. + // + // If unset or zero, the interval is set to 8 seconds. + JanitorInterval time.Duration + } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. @@ -242,7 +258,7 @@ func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task { return fn(group, tasks) } -// An ErrorHandler handles an error occurred during task processing. +// An ErrorHandler handles an error occured during task processing. type ErrorHandler interface { HandleError(ctx context.Context, task *Task, err error) } @@ -387,6 +403,13 @@ const ( defaultDelayedTaskCheckInterval = 5 * time.Second defaultGroupGracePeriod = 1 * time.Minute + + defaultHeartbeatInterval = 5 * time.Second + + defaultRecoverInterval = 1 * time.Minute + + defaultJanitorInterval = 8 * time.Second + ) // NewServer returns a new Server given a redis connection option @@ -463,10 +486,14 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { requestsCh: syncCh, interval: 5 * time.Second, }) + heartbeatInterval := cfg.HeartbeatInterval + if heartbeatInterval == 0 { + heartbeatInterval = defaultHeartbeatInterval + } heartbeater := newHeartbeater(heartbeaterParams{ logger: logger, broker: rdb, - interval: 5 * time.Second, + interval: heartbeatInterval, concurrency: n, queues: queues, strictPriority: cfg.StrictPriority, @@ -505,13 +532,17 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { starting: starting, finished: finished, }) + recoverInterval := cfg.RecoverInterval + if recoverInterval == 0 { + recoverInterval = defaultRecoverInterval + } recoverer := newRecoverer(recovererParams{ logger: logger, broker: rdb, retryDelayFunc: delayFunc, isFailureFunc: isFailureFunc, queues: qnames, - interval: 1 * time.Minute, + interval: recoverInterval, }) healthchecker := newHealthChecker(healthcheckerParams{ logger: logger, @@ -519,11 +550,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { interval: healthcheckInterval, healthcheckFunc: cfg.HealthCheckFunc, }) + janitorInterval := cfg.JanitorInterval + if janitorInterval == 0 { + janitorInterval = defaultJanitorInterval + } janitor := newJanitor(janitorParams{ logger: logger, broker: rdb, queues: qnames, - interval: 8 * time.Second, + interval: janitorInterval, }) aggregator := newAggregator(aggregatorParams{ logger: logger, @@ -615,14 +650,31 @@ func (srv *Server) Start(handler Handler) error { } srv.logger.Info("Starting processing") + //周期向redis更新状态 srv.heartbeater.start(&srv.wg) + + //周期ping redis srv.healthchecker.start(&srv.wg) + + //监听取消命令? srv.subscriber.start(&srv.wg) + + //周期?清理过期请求? srv.syncer.start(&srv.wg) + + //active任务转换成retry srv.recoverer.start(&srv.wg) + + //retry和schedule任务转换成pending任务 srv.forwarder.start(&srv.wg) + + //开启一个worker处理任务 srv.processor.start(&srv.wg) + + //删除过期任务 srv.janitor.start(&srv.wg) + + //? srv.aggregator.start(&srv.wg) return nil }