diff --git a/processor.go b/processor.go index b177f33..0ba9890 100644 --- a/processor.go +++ b/processor.go @@ -37,8 +37,9 @@ type processor struct { // orderedQueues is set only in strict-priority mode. orderedQueues []string - retryDelayFunc RetryDelayFunc - isFailureFunc func(error) bool + taskCheckInterval time.Duration + retryDelayFunc RetryDelayFunc + isFailureFunc func(error) bool errHandler ErrorHandler shutdownTimeout time.Duration @@ -72,20 +73,21 @@ type processor struct { } type processorParams struct { - logger *log.Logger - broker base.Broker - baseCtxFn func() context.Context - retryDelayFunc RetryDelayFunc - isFailureFunc func(error) bool - syncCh chan<- *syncRequest - cancelations *base.Cancelations - concurrency int - queues map[string]int - strictPriority bool - errHandler ErrorHandler - shutdownTimeout time.Duration - starting chan<- *workerInfo - finished chan<- *base.TaskMessage + logger *log.Logger + broker base.Broker + baseCtxFn func() context.Context + retryDelayFunc RetryDelayFunc + taskCheckInterval time.Duration + isFailureFunc func(error) bool + syncCh chan<- *syncRequest + cancelations *base.Cancelations + concurrency int + queues map[string]int + strictPriority bool + errHandler ErrorHandler + shutdownTimeout time.Duration + starting chan<- *workerInfo + finished chan<- *base.TaskMessage } // newProcessor constructs a new processor. @@ -96,26 +98,27 @@ func newProcessor(params processorParams) *processor { orderedQueues = sortByPriority(queues) } return &processor{ - logger: params.logger, - broker: params.broker, - baseCtxFn: params.baseCtxFn, - clock: timeutil.NewRealClock(), - queueConfig: queues, - orderedQueues: orderedQueues, - retryDelayFunc: params.retryDelayFunc, - isFailureFunc: params.isFailureFunc, - syncRequestCh: params.syncCh, - cancelations: params.cancelations, - errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), - sema: make(chan struct{}, params.concurrency), - done: make(chan struct{}), - quit: make(chan struct{}), - abort: make(chan struct{}), - errHandler: params.errHandler, - handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), - shutdownTimeout: params.shutdownTimeout, - starting: params.starting, - finished: params.finished, + logger: params.logger, + broker: params.broker, + baseCtxFn: params.baseCtxFn, + clock: timeutil.NewRealClock(), + queueConfig: queues, + orderedQueues: orderedQueues, + taskCheckInterval: params.taskCheckInterval, + retryDelayFunc: params.retryDelayFunc, + isFailureFunc: params.isFailureFunc, + syncRequestCh: params.syncCh, + cancelations: params.cancelations, + errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), + sema: make(chan struct{}, params.concurrency), + done: make(chan struct{}), + quit: make(chan struct{}), + abort: make(chan struct{}), + errHandler: params.errHandler, + handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), + shutdownTimeout: params.shutdownTimeout, + starting: params.starting, + finished: params.finished, } } @@ -178,7 +181,7 @@ func (p *processor) exec() { // Sleep to avoid slamming redis and let scheduler move tasks into queues. // Note: We are not using blocking pop operation and polling queues instead. // This adds significant load to redis. - time.Sleep(time.Second) + time.Sleep(p.taskCheckInterval) <-p.sema // release token return case err != nil: diff --git a/processor_test.go b/processor_test.go index 4112181..9be4729 100644 --- a/processor_test.go +++ b/processor_test.go @@ -63,20 +63,21 @@ func newProcessorForTest(t *testing.T, r *rdb.RDB, h Handler) *processor { go fakeHeartbeater(starting, finished, done) go fakeSyncer(syncCh, done) p := newProcessor(processorParams{ - logger: testLogger, - broker: r, - baseCtxFn: context.Background, - retryDelayFunc: DefaultRetryDelayFunc, - isFailureFunc: defaultIsFailureFunc, - syncCh: syncCh, - cancelations: base.NewCancelations(), - concurrency: 10, - queues: defaultQueueConfig, - strictPriority: false, - errHandler: nil, - shutdownTimeout: defaultShutdownTimeout, - starting: starting, - finished: finished, + logger: testLogger, + broker: r, + baseCtxFn: context.Background, + retryDelayFunc: DefaultRetryDelayFunc, + taskCheckInterval: defaultTaskCheckInterval, + isFailureFunc: defaultIsFailureFunc, + syncCh: syncCh, + cancelations: base.NewCancelations(), + concurrency: 10, + queues: defaultQueueConfig, + strictPriority: false, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + starting: starting, + finished: finished, }) p.handler = h return p @@ -539,20 +540,21 @@ func TestProcessorWithExpiredLease(t *testing.T) { }() go fakeSyncer(syncCh, done) p := newProcessor(processorParams{ - logger: testLogger, - broker: rdbClient, - baseCtxFn: context.Background, - retryDelayFunc: DefaultRetryDelayFunc, - isFailureFunc: defaultIsFailureFunc, - syncCh: syncCh, - cancelations: base.NewCancelations(), - concurrency: 10, - queues: defaultQueueConfig, - strictPriority: false, - errHandler: nil, - shutdownTimeout: defaultShutdownTimeout, - starting: starting, - finished: finished, + logger: testLogger, + broker: rdbClient, + baseCtxFn: context.Background, + taskCheckInterval: defaultTaskCheckInterval, + retryDelayFunc: DefaultRetryDelayFunc, + isFailureFunc: defaultIsFailureFunc, + syncCh: syncCh, + cancelations: base.NewCancelations(), + concurrency: 10, + queues: defaultQueueConfig, + strictPriority: false, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + starting: starting, + finished: finished, }) p.handler = tc.handler var ( @@ -693,20 +695,21 @@ func TestProcessorWithStrictPriority(t *testing.T) { go fakeHeartbeater(starting, finished, done) go fakeSyncer(syncCh, done) p := newProcessor(processorParams{ - logger: testLogger, - broker: rdbClient, - baseCtxFn: context.Background, - retryDelayFunc: DefaultRetryDelayFunc, - isFailureFunc: defaultIsFailureFunc, - syncCh: syncCh, - cancelations: base.NewCancelations(), - concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. - queues: queueCfg, - strictPriority: true, - errHandler: nil, - shutdownTimeout: defaultShutdownTimeout, - starting: starting, - finished: finished, + logger: testLogger, + broker: rdbClient, + baseCtxFn: context.Background, + taskCheckInterval: defaultTaskCheckInterval, + retryDelayFunc: DefaultRetryDelayFunc, + isFailureFunc: defaultIsFailureFunc, + syncCh: syncCh, + cancelations: base.NewCancelations(), + concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. + queues: queueCfg, + strictPriority: true, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + starting: starting, + finished: finished, }) p.handler = HandlerFunc(handler) diff --git a/server.go b/server.go index e31ce2f..b029421 100644 --- a/server.go +++ b/server.go @@ -15,10 +15,10 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // Server is responsible for task processing and task lifecycle management. @@ -104,6 +104,15 @@ type Config struct { // If this is defined, then it MUST return a non-nil context BaseContext func() context.Context + // TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty. + // + // Be careful not to set this value too low because it adds significant load to redis. + // + // If set to a zero or negative value, NewServer will overwrite the value with default value. + // + // By default, TaskCheckInterval is set to 1 seconds. + TaskCheckInterval time.Duration + // Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. @@ -390,6 +399,8 @@ var defaultQueueConfig = map[string]int{ } const ( + defaultTaskCheckInterval = 1 * time.Second + defaultShutdownTimeout = 8 * time.Second defaultHealthCheckInterval = 15 * time.Second @@ -414,6 +425,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if n < 1 { n = runtime.NumCPU() } + + taskCheckInterval := cfg.TaskCheckInterval + if taskCheckInterval <= 0 { + taskCheckInterval = defaultTaskCheckInterval + } + delayFunc := cfg.RetryDelayFunc if delayFunc == nil { delayFunc = DefaultRetryDelayFunc @@ -500,20 +517,21 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { cancelations: cancels, }) processor := newProcessor(processorParams{ - logger: logger, - broker: rdb, - retryDelayFunc: delayFunc, - baseCtxFn: baseCtxFn, - isFailureFunc: isFailureFunc, - syncCh: syncCh, - cancelations: cancels, - concurrency: n, - queues: queues, - strictPriority: cfg.StrictPriority, - errHandler: cfg.ErrorHandler, - shutdownTimeout: shutdownTimeout, - starting: starting, - finished: finished, + logger: logger, + broker: rdb, + retryDelayFunc: delayFunc, + taskCheckInterval: taskCheckInterval, + baseCtxFn: baseCtxFn, + isFailureFunc: isFailureFunc, + syncCh: syncCh, + cancelations: cancels, + concurrency: n, + queues: queues, + strictPriority: cfg.StrictPriority, + errHandler: cfg.ErrorHandler, + shutdownTimeout: shutdownTimeout, + starting: starting, + finished: finished, }) recoverer := newRecoverer(recovererParams{ logger: logger,