From ce46b076525b53232c1d1267e36accf8f237d0fd Mon Sep 17 00:00:00 2001 From: Mahdi Dibaiee Date: Mon, 3 Jan 2022 22:44:00 +0000 Subject: [PATCH] Allow configuration of DelayedTaskCheckInterval --- .gitignore | 1 + CHANGELOG.md | 1 + server.go | 14 +++++++++++++- 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 7fca36c..98b2487 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +vendor # Binaries for programs and plugins *.exe *.exe~ diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c67590..0187efa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - The `asynq stats` command now supports a `--json` option, making its output a JSON object +- Introduced new configuration for `DelayedTaskCheckInterval`. See [godoc](https://godoc.org/github.com/hibiken/asynq) for more details. ## [0.20.0] - 2021-12-19 diff --git a/server.go b/server.go index 78a9955..cdc14c7 100644 --- a/server.go +++ b/server.go @@ -144,6 +144,12 @@ type Config struct { // // If unset or zero, the interval is set to 15 seconds. HealthCheckInterval time.Duration + + // DelayedTaskCheckInterval specifies the interval between checks run on 'scheduled' and 'retry' + // tasks, and forwarding them to 'pending' state if they are ready to be processed. + // + // If unset or zero, the interval is set to 5 seconds. + DelayedTaskCheckInterval time.Duration } // An ErrorHandler handles an error occured during task processing. @@ -287,6 +293,8 @@ const ( defaultShutdownTimeout = 8 * time.Second defaultHealthCheckInterval = 15 * time.Second + + defaultDelayedTaskCheckInterval = 5 * time.Second ) // NewServer returns a new Server given a redis connection option @@ -362,11 +370,15 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { starting: starting, finished: finished, }) + delayedTaskCheckInterval := cfg.DelayedTaskCheckInterval + if delayedTaskCheckInterval == 0 { + delayedTaskCheckInterval = defaultDelayedTaskCheckInterval + } forwarder := newForwarder(forwarderParams{ logger: logger, broker: rdb, queues: qnames, - interval: 5 * time.Second, + interval: delayedTaskCheckInterval, }) subscriber := newSubscriber(subscriberParams{ logger: logger,