diff --git a/aggregator.go b/aggregator.go index 2ad30ed..40a07db 100644 --- a/aggregator.go +++ b/aggregator.go @@ -82,12 +82,18 @@ func newAggregator(params aggregatorParams) *aggregator { } func (a *aggregator) shutdown() { + if a.aggregateFunc == nil { + return + } a.logger.Debug("Aggregator shutting down...") // Signal the aggregator goroutine to stop. a.done <- struct{}{} } func (a *aggregator) start(wg *sync.WaitGroup) { + if a.aggregateFunc == nil { + return + } wg.Add(1) go func() { defer wg.Done() diff --git a/server.go b/server.go index 491c53b..3b700a1 100644 --- a/server.go +++ b/server.go @@ -50,6 +50,7 @@ type Server struct { recoverer *recoverer healthchecker *healthchecker janitor *janitor + aggregator *aggregator } type serverState struct { @@ -198,25 +199,26 @@ type Config struct { // the tasks in a group. If an incoming task is received within this period, the server will wait for another // period of the same length, up to GroupMaxDelay. // - // TODO: Document the default value used if not specified + // If unset or zero, the grace period is set to 1 minute. + // Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to + // NewServer will panic. GroupGracePeriod time.Duration // GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating // the tasks in a group. // - // TODO: Document the default value used if not specified + // If unset or zero, the max delay is set to 10 minutes. GroupMaxDelay time.Duration // GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group. // If GroupMaxSize is reached, the server will aggregate the tasks into one immediately. // - // TODO: Document the default behavior if not specified. + // If unset or zero, the max size is set to 100. GroupMaxSize int - // GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into - // one task which will be passed to the Handler. + // GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task. // - // TODO: Docuent the default behavior if not specified. + // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregateFunc func(groupKey string, tasks []*Task) *Task } @@ -363,6 +365,12 @@ const ( defaultHealthCheckInterval = 15 * time.Second defaultDelayedTaskCheckInterval = 5 * time.Second + + defaultGroupGracePeriod = 1 * time.Minute + + defaultGroupMaxDelay = 10 * time.Minute + + defaultGroupMaxSize = 100 ) // NewServer returns a new Server given a redis connection option @@ -412,6 +420,22 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if healthcheckInterval == 0 { healthcheckInterval = defaultHealthCheckInterval } + // TODO: Create a helper to check for zero value and fall back to default (e.g. getDurationOrDefault()) + groupGracePeriod := cfg.GroupGracePeriod + if groupGracePeriod == 0 { + groupGracePeriod = defaultGroupGracePeriod + } + if groupGracePeriod < time.Second { + panic("GroupGracePeriod cannot be less than a second") + } + groupMaxDelay := cfg.GroupMaxDelay + if groupMaxDelay == 0 { + groupMaxDelay = defaultGroupMaxDelay + } + groupMaxSize := cfg.GroupMaxSize + if groupMaxSize == 0 { + groupMaxSize = defaultGroupMaxSize + } logger := log.NewLogger(cfg.Logger) loglevel := cfg.LogLevel if loglevel == level_unspecified { @@ -493,6 +517,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { queues: qnames, interval: 8 * time.Second, }) + aggregator := newAggregator(aggregatorParams{ + logger: logger, + broker: rdb, + queues: qnames, + gracePeriod: groupGracePeriod, + maxDelay: groupMaxDelay, + maxSize: groupMaxSize, + aggregateFunc: cfg.GroupAggregateFunc, + }) return &Server{ logger: logger, broker: rdb, @@ -505,6 +538,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { recoverer: recoverer, healthchecker: healthchecker, janitor: janitor, + aggregator: aggregator, } } @@ -581,6 +615,7 @@ func (srv *Server) Start(handler Handler) error { srv.forwarder.start(&srv.wg) srv.processor.start(&srv.wg) srv.janitor.start(&srv.wg) + srv.aggregator.start(&srv.wg) return nil } @@ -626,6 +661,7 @@ func (srv *Server) Shutdown() { srv.syncer.shutdown() srv.subscriber.shutdown() srv.janitor.shutdown() + srv.aggregator.shutdown() srv.healthchecker.shutdown() srv.heartbeater.shutdown() srv.wg.Wait()