mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 23:06:12 +08:00 
			
		
		
		
	Run aggregator on the server
This commit is contained in:
		| @@ -82,12 +82,18 @@ func newAggregator(params aggregatorParams) *aggregator { | ||||
| } | ||||
|  | ||||
| func (a *aggregator) shutdown() { | ||||
| 	if a.aggregateFunc == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	a.logger.Debug("Aggregator shutting down...") | ||||
| 	// Signal the aggregator goroutine to stop. | ||||
| 	a.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (a *aggregator) start(wg *sync.WaitGroup) { | ||||
| 	if a.aggregateFunc == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	wg.Add(1) | ||||
| 	go func() { | ||||
| 		defer wg.Done() | ||||
|   | ||||
							
								
								
									
										48
									
								
								server.go
									
									
									
									
									
								
							
							
						
						
									
										48
									
								
								server.go
									
									
									
									
									
								
							| @@ -50,6 +50,7 @@ type Server struct { | ||||
| 	recoverer     *recoverer | ||||
| 	healthchecker *healthchecker | ||||
| 	janitor       *janitor | ||||
| 	aggregator    *aggregator | ||||
| } | ||||
|  | ||||
| type serverState struct { | ||||
| @@ -198,25 +199,26 @@ type Config struct { | ||||
| 	// the tasks in a group. If an incoming task is received within this period, the server will wait for another | ||||
| 	// period of the same length, up to GroupMaxDelay. | ||||
| 	// | ||||
| 	// TODO: Document the default value used if not specified | ||||
| 	// If unset or zero, the grace period is set to 1 minute. | ||||
| 	// Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to | ||||
| 	// NewServer will panic. | ||||
| 	GroupGracePeriod time.Duration | ||||
|  | ||||
| 	// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating | ||||
| 	// the tasks in a group. | ||||
| 	// | ||||
| 	// TODO: Document the default value used if not specified | ||||
| 	// If unset or zero, the max delay is set to 10 minutes. | ||||
| 	GroupMaxDelay time.Duration | ||||
|  | ||||
| 	// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group. | ||||
| 	// If GroupMaxSize is reached, the server will aggregate the tasks into one immediately. | ||||
| 	// | ||||
| 	// TODO: Document the default behavior if not specified. | ||||
| 	// If unset or zero, the max size is set to 100. | ||||
| 	GroupMaxSize int | ||||
|  | ||||
| 	// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into | ||||
| 	// one task which will be passed to the Handler. | ||||
| 	// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task. | ||||
| 	// | ||||
| 	// TODO: Docuent the default behavior if not specified. | ||||
| 	// If unset or nil, the group aggregation feature will be disabled on the server. | ||||
| 	GroupAggregateFunc func(groupKey string, tasks []*Task) *Task | ||||
| } | ||||
|  | ||||
| @@ -363,6 +365,12 @@ const ( | ||||
| 	defaultHealthCheckInterval = 15 * time.Second | ||||
|  | ||||
| 	defaultDelayedTaskCheckInterval = 5 * time.Second | ||||
|  | ||||
| 	defaultGroupGracePeriod = 1 * time.Minute | ||||
|  | ||||
| 	defaultGroupMaxDelay = 10 * time.Minute | ||||
|  | ||||
| 	defaultGroupMaxSize = 100 | ||||
| ) | ||||
|  | ||||
| // NewServer returns a new Server given a redis connection option | ||||
| @@ -412,6 +420,22 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { | ||||
| 	if healthcheckInterval == 0 { | ||||
| 		healthcheckInterval = defaultHealthCheckInterval | ||||
| 	} | ||||
| 	// TODO: Create a helper to check for zero value and fall back to default (e.g. getDurationOrDefault()) | ||||
| 	groupGracePeriod := cfg.GroupGracePeriod | ||||
| 	if groupGracePeriod == 0 { | ||||
| 		groupGracePeriod = defaultGroupGracePeriod | ||||
| 	} | ||||
| 	if groupGracePeriod < time.Second { | ||||
| 		panic("GroupGracePeriod cannot be less than a second") | ||||
| 	} | ||||
| 	groupMaxDelay := cfg.GroupMaxDelay | ||||
| 	if groupMaxDelay == 0 { | ||||
| 		groupMaxDelay = defaultGroupMaxDelay | ||||
| 	} | ||||
| 	groupMaxSize := cfg.GroupMaxSize | ||||
| 	if groupMaxSize == 0 { | ||||
| 		groupMaxSize = defaultGroupMaxSize | ||||
| 	} | ||||
| 	logger := log.NewLogger(cfg.Logger) | ||||
| 	loglevel := cfg.LogLevel | ||||
| 	if loglevel == level_unspecified { | ||||
| @@ -493,6 +517,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { | ||||
| 		queues:   qnames, | ||||
| 		interval: 8 * time.Second, | ||||
| 	}) | ||||
| 	aggregator := newAggregator(aggregatorParams{ | ||||
| 		logger:        logger, | ||||
| 		broker:        rdb, | ||||
| 		queues:        qnames, | ||||
| 		gracePeriod:   groupGracePeriod, | ||||
| 		maxDelay:      groupMaxDelay, | ||||
| 		maxSize:       groupMaxSize, | ||||
| 		aggregateFunc: cfg.GroupAggregateFunc, | ||||
| 	}) | ||||
| 	return &Server{ | ||||
| 		logger:        logger, | ||||
| 		broker:        rdb, | ||||
| @@ -505,6 +538,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { | ||||
| 		recoverer:     recoverer, | ||||
| 		healthchecker: healthchecker, | ||||
| 		janitor:       janitor, | ||||
| 		aggregator:    aggregator, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -581,6 +615,7 @@ func (srv *Server) Start(handler Handler) error { | ||||
| 	srv.forwarder.start(&srv.wg) | ||||
| 	srv.processor.start(&srv.wg) | ||||
| 	srv.janitor.start(&srv.wg) | ||||
| 	srv.aggregator.start(&srv.wg) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -626,6 +661,7 @@ func (srv *Server) Shutdown() { | ||||
| 	srv.syncer.shutdown() | ||||
| 	srv.subscriber.shutdown() | ||||
| 	srv.janitor.shutdown() | ||||
| 	srv.aggregator.shutdown() | ||||
| 	srv.healthchecker.shutdown() | ||||
| 	srv.heartbeater.shutdown() | ||||
| 	srv.wg.Wait() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user