mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Allow configuration of DelayedTaskCheckInterval
This commit is contained in:
		
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1,3 +1,4 @@ | |||||||
|  | vendor | ||||||
| # Binaries for programs and plugins | # Binaries for programs and plugins | ||||||
| *.exe | *.exe | ||||||
| *.exe~ | *.exe~ | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 | |||||||
| ### Added | ### Added | ||||||
|  |  | ||||||
| - The `asynq stats` command now supports a `--json` option, making its output a JSON object | - The `asynq stats` command now supports a `--json` option, making its output a JSON object | ||||||
|  | - Introduced new configuration for `DelayedTaskCheckInterval`. See [godoc](https://godoc.org/github.com/hibiken/asynq) for more details. | ||||||
|  |  | ||||||
| ## [0.20.0] - 2021-12-19 | ## [0.20.0] - 2021-12-19 | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										14
									
								
								server.go
									
									
									
									
									
								
							
							
						
						
									
										14
									
								
								server.go
									
									
									
									
									
								
							| @@ -144,6 +144,12 @@ type Config struct { | |||||||
| 	// | 	// | ||||||
| 	// If unset or zero, the interval is set to 15 seconds. | 	// If unset or zero, the interval is set to 15 seconds. | ||||||
| 	HealthCheckInterval time.Duration | 	HealthCheckInterval time.Duration | ||||||
|  |  | ||||||
|  | 	// DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry' | ||||||
|  | 	// tasks, and forwarding them to 'pending' state if they are ready to be processed. | ||||||
|  | 	// | ||||||
|  | 	// If unset or zero, the interval is set to 5 seconds. | ||||||
|  | 	DelayedTaskCheckInterval time.Duration | ||||||
| } | } | ||||||
|  |  | ||||||
| // An ErrorHandler handles an error occured during task processing. | // An ErrorHandler handles an error occured during task processing. | ||||||
| @@ -287,6 +293,8 @@ const ( | |||||||
| 	defaultShutdownTimeout = 8 * time.Second | 	defaultShutdownTimeout = 8 * time.Second | ||||||
|  |  | ||||||
| 	defaultHealthCheckInterval = 15 * time.Second | 	defaultHealthCheckInterval = 15 * time.Second | ||||||
|  |  | ||||||
|  | 	defaultDelayedTaskCheckInterval = 5 * time.Second | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // NewServer returns a new Server given a redis connection option | // NewServer returns a new Server given a redis connection option | ||||||
| @@ -362,11 +370,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { | |||||||
| 		starting:       starting, | 		starting:       starting, | ||||||
| 		finished:       finished, | 		finished:       finished, | ||||||
| 	}) | 	}) | ||||||
|  | 	delayedTaskCheckInterval := cfg.DelayedTaskCheckInterval | ||||||
|  | 	if delayedTaskCheckInterval == 0 { | ||||||
|  | 		delayedTaskCheckInterval = defaultDelayedTaskCheckInterval | ||||||
|  | 	} | ||||||
| 	forwarder := newForwarder(forwarderParams{ | 	forwarder := newForwarder(forwarderParams{ | ||||||
| 		logger:   logger, | 		logger:   logger, | ||||||
| 		broker:   rdb, | 		broker:   rdb, | ||||||
| 		queues:   qnames, | 		queues:   qnames, | ||||||
| 		interval: 5 * time.Second, | 		interval: delayedTaskCheckInterval, | ||||||
| 	}) | 	}) | ||||||
| 	subscriber := newSubscriber(subscriberParams{ | 	subscriber := newSubscriber(subscriberParams{ | ||||||
| 		logger:       logger, | 		logger:       logger, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user