2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Add option to configure task check interval

This commit is contained in:
Hubert Krauze 2023-06-02 01:58:54 +02:00 committed by Ken Hibino
parent 1e0bf88bf3
commit 16ec43cbca
3 changed files with 118 additions and 94 deletions

View File

@ -37,8 +37,9 @@ type processor struct {
// orderedQueues is set only in strict-priority mode. // orderedQueues is set only in strict-priority mode.
orderedQueues []string orderedQueues []string
retryDelayFunc RetryDelayFunc taskCheckInterval time.Duration
isFailureFunc func(error) bool retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool
errHandler ErrorHandler errHandler ErrorHandler
shutdownTimeout time.Duration shutdownTimeout time.Duration
@ -72,20 +73,21 @@ type processor struct {
} }
type processorParams struct { type processorParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
baseCtxFn func() context.Context baseCtxFn func() context.Context
retryDelayFunc RetryDelayFunc retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool taskCheckInterval time.Duration
syncCh chan<- *syncRequest isFailureFunc func(error) bool
cancelations *base.Cancelations syncCh chan<- *syncRequest
concurrency int cancelations *base.Cancelations
queues map[string]int concurrency int
strictPriority bool queues map[string]int
errHandler ErrorHandler strictPriority bool
shutdownTimeout time.Duration errHandler ErrorHandler
starting chan<- *workerInfo shutdownTimeout time.Duration
finished chan<- *base.TaskMessage starting chan<- *workerInfo
finished chan<- *base.TaskMessage
} }
// newProcessor constructs a new processor. // newProcessor constructs a new processor.
@ -96,26 +98,27 @@ func newProcessor(params processorParams) *processor {
orderedQueues = sortByPriority(queues) orderedQueues = sortByPriority(queues)
} }
return &processor{ return &processor{
logger: params.logger, logger: params.logger,
broker: params.broker, broker: params.broker,
baseCtxFn: params.baseCtxFn, baseCtxFn: params.baseCtxFn,
clock: timeutil.NewRealClock(), clock: timeutil.NewRealClock(),
queueConfig: queues, queueConfig: queues,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc, taskCheckInterval: params.taskCheckInterval,
isFailureFunc: params.isFailureFunc, retryDelayFunc: params.retryDelayFunc,
syncRequestCh: params.syncCh, isFailureFunc: params.isFailureFunc,
cancelations: params.cancelations, syncRequestCh: params.syncCh,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), cancelations: params.cancelations,
sema: make(chan struct{}, params.concurrency), errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
done: make(chan struct{}), sema: make(chan struct{}, params.concurrency),
quit: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), quit: make(chan struct{}),
errHandler: params.errHandler, abort: make(chan struct{}),
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), errHandler: params.errHandler,
shutdownTimeout: params.shutdownTimeout, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
starting: params.starting, shutdownTimeout: params.shutdownTimeout,
finished: params.finished, starting: params.starting,
finished: params.finished,
} }
} }
@ -178,7 +181,7 @@ func (p *processor) exec() {
// Sleep to avoid slamming redis and let scheduler move tasks into queues. // Sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: We are not using blocking pop operation and polling queues instead. // Note: We are not using blocking pop operation and polling queues instead.
// This adds significant load to redis. // This adds significant load to redis.
time.Sleep(time.Second) time.Sleep(p.taskCheckInterval)
<-p.sema // release token <-p.sema // release token
return return
case err != nil: case err != nil:

View File

@ -63,20 +63,21 @@ func newProcessorForTest(t *testing.T, r *rdb.RDB, h Handler) *processor {
go fakeHeartbeater(starting, finished, done) go fakeHeartbeater(starting, finished, done)
go fakeSyncer(syncCh, done) go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: r, broker: r,
baseCtxFn: context.Background, baseCtxFn: context.Background,
retryDelayFunc: DefaultRetryDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
isFailureFunc: defaultIsFailureFunc, taskCheckInterval: defaultTaskCheckInterval,
syncCh: syncCh, isFailureFunc: defaultIsFailureFunc,
cancelations: base.NewCancelations(), syncCh: syncCh,
concurrency: 10, cancelations: base.NewCancelations(),
queues: defaultQueueConfig, concurrency: 10,
strictPriority: false, queues: defaultQueueConfig,
errHandler: nil, strictPriority: false,
shutdownTimeout: defaultShutdownTimeout, errHandler: nil,
starting: starting, shutdownTimeout: defaultShutdownTimeout,
finished: finished, starting: starting,
finished: finished,
}) })
p.handler = h p.handler = h
return p return p
@ -539,20 +540,21 @@ func TestProcessorWithExpiredLease(t *testing.T) {
}() }()
go fakeSyncer(syncCh, done) go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
baseCtxFn: context.Background, baseCtxFn: context.Background,
retryDelayFunc: DefaultRetryDelayFunc, taskCheckInterval: defaultTaskCheckInterval,
isFailureFunc: defaultIsFailureFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, isFailureFunc: defaultIsFailureFunc,
cancelations: base.NewCancelations(), syncCh: syncCh,
concurrency: 10, cancelations: base.NewCancelations(),
queues: defaultQueueConfig, concurrency: 10,
strictPriority: false, queues: defaultQueueConfig,
errHandler: nil, strictPriority: false,
shutdownTimeout: defaultShutdownTimeout, errHandler: nil,
starting: starting, shutdownTimeout: defaultShutdownTimeout,
finished: finished, starting: starting,
finished: finished,
}) })
p.handler = tc.handler p.handler = tc.handler
var ( var (
@ -693,20 +695,21 @@ func TestProcessorWithStrictPriority(t *testing.T) {
go fakeHeartbeater(starting, finished, done) go fakeHeartbeater(starting, finished, done)
go fakeSyncer(syncCh, done) go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
baseCtxFn: context.Background, baseCtxFn: context.Background,
retryDelayFunc: DefaultRetryDelayFunc, taskCheckInterval: defaultTaskCheckInterval,
isFailureFunc: defaultIsFailureFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, isFailureFunc: defaultIsFailureFunc,
cancelations: base.NewCancelations(), syncCh: syncCh,
concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. cancelations: base.NewCancelations(),
queues: queueCfg, concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time.
strictPriority: true, queues: queueCfg,
errHandler: nil, strictPriority: true,
shutdownTimeout: defaultShutdownTimeout, errHandler: nil,
starting: starting, shutdownTimeout: defaultShutdownTimeout,
finished: finished, starting: starting,
finished: finished,
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)

View File

@ -15,10 +15,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
) )
// Server is responsible for task processing and task lifecycle management. // Server is responsible for task processing and task lifecycle management.
@ -104,6 +104,15 @@ type Config struct {
// If this is defined, then it MUST return a non-nil context // If this is defined, then it MUST return a non-nil context
BaseContext func() context.Context BaseContext func() context.Context
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
//
// Be careful not to set this value too low because it adds significant load to redis.
//
// If set to a zero or negative value, NewServer will overwrite the value with default value.
//
// By default, TaskCheckInterval is set to 1 seconds.
TaskCheckInterval time.Duration
// Function to calculate retry delay for a failed task. // Function to calculate retry delay for a failed task.
// //
// By default, it uses exponential backoff algorithm to calculate the delay. // By default, it uses exponential backoff algorithm to calculate the delay.
@ -390,6 +399,8 @@ var defaultQueueConfig = map[string]int{
} }
const ( const (
defaultTaskCheckInterval = 1 * time.Second
defaultShutdownTimeout = 8 * time.Second defaultShutdownTimeout = 8 * time.Second
defaultHealthCheckInterval = 15 * time.Second defaultHealthCheckInterval = 15 * time.Second
@ -414,6 +425,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if n < 1 { if n < 1 {
n = runtime.NumCPU() n = runtime.NumCPU()
} }
taskCheckInterval := cfg.TaskCheckInterval
if taskCheckInterval <= 0 {
taskCheckInterval = defaultTaskCheckInterval
}
delayFunc := cfg.RetryDelayFunc delayFunc := cfg.RetryDelayFunc
if delayFunc == nil { if delayFunc == nil {
delayFunc = DefaultRetryDelayFunc delayFunc = DefaultRetryDelayFunc
@ -500,20 +517,21 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
cancelations: cancels, cancelations: cancels,
}) })
processor := newProcessor(processorParams{ processor := newProcessor(processorParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
baseCtxFn: baseCtxFn, taskCheckInterval: taskCheckInterval,
isFailureFunc: isFailureFunc, baseCtxFn: baseCtxFn,
syncCh: syncCh, isFailureFunc: isFailureFunc,
cancelations: cancels, syncCh: syncCh,
concurrency: n, cancelations: cancels,
queues: queues, concurrency: n,
strictPriority: cfg.StrictPriority, queues: queues,
errHandler: cfg.ErrorHandler, strictPriority: cfg.StrictPriority,
shutdownTimeout: shutdownTimeout, errHandler: cfg.ErrorHandler,
starting: starting, shutdownTimeout: shutdownTimeout,
finished: finished, starting: starting,
finished: finished,
}) })
recoverer := newRecoverer(recovererParams{ recoverer := newRecoverer(recovererParams{
logger: logger, logger: logger,