2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Create server.go

This commit is contained in:
SaltySooda
2022-08-19 09:39:51 +08:00
committed by GitHub
parent c70ff6a335
commit 11988757a4

View File

@@ -220,6 +220,22 @@ type Config struct {
// //
// If unset or nil, the group aggregation feature will be disabled on the server. // If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregator GroupAggregator GroupAggregator GroupAggregator
// HeartbeatInterval specifies the interval between heartbeats.
//
// If unset or zero, the interval is set to 5 seconds.
HeartbeatInterval time.Duration
// RecoverInterval specifies the interval between recovers.
//
// If unset or zero, the interval is set to 60 seconds.
RecoverInterval time.Duration
// JanitorInterval specifies the interval between Janitors.
//
// If unset or zero, the interval is set to 8 seconds.
JanitorInterval time.Duration
} }
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
@@ -242,7 +258,7 @@ func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
return fn(group, tasks) return fn(group, tasks)
} }
// An ErrorHandler handles an error occurred during task processing. // An ErrorHandler handles an error occured during task processing.
type ErrorHandler interface { type ErrorHandler interface {
HandleError(ctx context.Context, task *Task, err error) HandleError(ctx context.Context, task *Task, err error)
} }
@@ -387,6 +403,13 @@ const (
defaultDelayedTaskCheckInterval = 5 * time.Second defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute defaultGroupGracePeriod = 1 * time.Minute
defaultHeartbeatInterval = 5 * time.Second
defaultRecoverInterval = 1 * time.Minute
defaultJanitorInterval = 8 * time.Second
) )
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
@@ -463,10 +486,14 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
requestsCh: syncCh, requestsCh: syncCh,
interval: 5 * time.Second, interval: 5 * time.Second,
}) })
heartbeatInterval := cfg.HeartbeatInterval
if heartbeatInterval == 0 {
heartbeatInterval = defaultHeartbeatInterval
}
heartbeater := newHeartbeater(heartbeaterParams{ heartbeater := newHeartbeater(heartbeaterParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
interval: 5 * time.Second, interval: heartbeatInterval,
concurrency: n, concurrency: n,
queues: queues, queues: queues,
strictPriority: cfg.StrictPriority, strictPriority: cfg.StrictPriority,
@@ -505,13 +532,17 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
starting: starting, starting: starting,
finished: finished, finished: finished,
}) })
recoverInterval := cfg.RecoverInterval
if recoverInterval == 0 {
recoverInterval = defaultRecoverInterval
}
recoverer := newRecoverer(recovererParams{ recoverer := newRecoverer(recovererParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
isFailureFunc: isFailureFunc, isFailureFunc: isFailureFunc,
queues: qnames, queues: qnames,
interval: 1 * time.Minute, interval: recoverInterval,
}) })
healthchecker := newHealthChecker(healthcheckerParams{ healthchecker := newHealthChecker(healthcheckerParams{
logger: logger, logger: logger,
@@ -519,11 +550,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
interval: healthcheckInterval, interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc, healthcheckFunc: cfg.HealthCheckFunc,
}) })
janitorInterval := cfg.JanitorInterval
if janitorInterval == 0 {
janitorInterval = defaultJanitorInterval
}
janitor := newJanitor(janitorParams{ janitor := newJanitor(janitorParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
queues: qnames, queues: qnames,
interval: 8 * time.Second, interval: janitorInterval,
}) })
aggregator := newAggregator(aggregatorParams{ aggregator := newAggregator(aggregatorParams{
logger: logger, logger: logger,
@@ -615,14 +650,31 @@ func (srv *Server) Start(handler Handler) error {
} }
srv.logger.Info("Starting processing") srv.logger.Info("Starting processing")
//周期向redis更新状态
srv.heartbeater.start(&srv.wg) srv.heartbeater.start(&srv.wg)
//周期ping redis
srv.healthchecker.start(&srv.wg) srv.healthchecker.start(&srv.wg)
//监听取消命令?
srv.subscriber.start(&srv.wg) srv.subscriber.start(&srv.wg)
//周期?清理过期请求?
srv.syncer.start(&srv.wg) srv.syncer.start(&srv.wg)
//active任务转换成retry
srv.recoverer.start(&srv.wg) srv.recoverer.start(&srv.wg)
//retry和schedule任务转换成pending任务
srv.forwarder.start(&srv.wg) srv.forwarder.start(&srv.wg)
//开启一个worker处理任务
srv.processor.start(&srv.wg) srv.processor.start(&srv.wg)
//删除过期任务
srv.janitor.start(&srv.wg) srv.janitor.start(&srv.wg)
//
srv.aggregator.start(&srv.wg) srv.aggregator.start(&srv.wg)
return nil return nil
} }