2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Run aggregator on the server

This commit is contained in:
Ken Hibino 2022-03-10 12:15:28 -08:00
parent 4b35eb0e1a
commit 196db64d4d
2 changed files with 48 additions and 6 deletions

View File

@ -82,12 +82,18 @@ func newAggregator(params aggregatorParams) *aggregator {
}
func (a *aggregator) shutdown() {
if a.aggregateFunc == nil {
return
}
a.logger.Debug("Aggregator shutting down...")
// Signal the aggregator goroutine to stop.
a.done <- struct{}{}
}
func (a *aggregator) start(wg *sync.WaitGroup) {
if a.aggregateFunc == nil {
return
}
wg.Add(1)
go func() {
defer wg.Done()

View File

@ -50,6 +50,7 @@ type Server struct {
recoverer *recoverer
healthchecker *healthchecker
janitor *janitor
aggregator *aggregator
}
type serverState struct {
@ -198,25 +199,26 @@ type Config struct {
// the tasks in a group. If an incoming task is received within this period, the server will wait for another
// period of the same length, up to GroupMaxDelay.
//
// TODO: Document the default value used if not specified
// If unset or zero, the grace period is set to 1 minute.
// Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to
// NewServer will panic.
GroupGracePeriod time.Duration
// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating
// the tasks in a group.
//
// TODO: Document the default value used if not specified
// If unset or zero, the max delay is set to 10 minutes.
GroupMaxDelay time.Duration
// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group.
// If GroupMaxSize is reached, the server will aggregate the tasks into one immediately.
//
// TODO: Document the default behavior if not specified.
// If unset or zero, the max size is set to 100.
GroupMaxSize int
// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into
// one task which will be passed to the Handler.
// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task.
//
// TODO: Docuent the default behavior if not specified.
// If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregateFunc func(groupKey string, tasks []*Task) *Task
}
@ -363,6 +365,12 @@ const (
defaultHealthCheckInterval = 15 * time.Second
defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute
defaultGroupMaxDelay = 10 * time.Minute
defaultGroupMaxSize = 100
)
// NewServer returns a new Server given a redis connection option
@ -412,6 +420,22 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if healthcheckInterval == 0 {
healthcheckInterval = defaultHealthCheckInterval
}
// TODO: Create a helper to check for zero value and fall back to default (e.g. getDurationOrDefault())
groupGracePeriod := cfg.GroupGracePeriod
if groupGracePeriod == 0 {
groupGracePeriod = defaultGroupGracePeriod
}
if groupGracePeriod < time.Second {
panic("GroupGracePeriod cannot be less than a second")
}
groupMaxDelay := cfg.GroupMaxDelay
if groupMaxDelay == 0 {
groupMaxDelay = defaultGroupMaxDelay
}
groupMaxSize := cfg.GroupMaxSize
if groupMaxSize == 0 {
groupMaxSize = defaultGroupMaxSize
}
logger := log.NewLogger(cfg.Logger)
loglevel := cfg.LogLevel
if loglevel == level_unspecified {
@ -493,6 +517,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
queues: qnames,
interval: 8 * time.Second,
})
aggregator := newAggregator(aggregatorParams{
logger: logger,
broker: rdb,
queues: qnames,
gracePeriod: groupGracePeriod,
maxDelay: groupMaxDelay,
maxSize: groupMaxSize,
aggregateFunc: cfg.GroupAggregateFunc,
})
return &Server{
logger: logger,
broker: rdb,
@ -505,6 +538,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
recoverer: recoverer,
healthchecker: healthchecker,
janitor: janitor,
aggregator: aggregator,
}
}
@ -581,6 +615,7 @@ func (srv *Server) Start(handler Handler) error {
srv.forwarder.start(&srv.wg)
srv.processor.start(&srv.wg)
srv.janitor.start(&srv.wg)
srv.aggregator.start(&srv.wg)
return nil
}
@ -626,6 +661,7 @@ func (srv *Server) Shutdown() {
srv.syncer.shutdown()
srv.subscriber.shutdown()
srv.janitor.shutdown()
srv.aggregator.shutdown()
srv.healthchecker.shutdown()
srv.heartbeater.shutdown()
srv.wg.Wait()