diff --git a/processor.go b/processor.go index 7ce8766..0de9301 100644 --- a/processor.go +++ b/processor.go @@ -34,6 +34,8 @@ type processor struct { errHandler ErrorHandler + shutdownTimeout time.Duration + // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest @@ -61,30 +63,40 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration +type newProcessorParams struct { + logger Logger + rdb *rdb.RDB + ss *base.ServerState + retryDelayFunc retryDelayFunc + syncCh chan<- *syncRequest + cancelations *base.Cancelations + errHandler ErrorHandler + shutdownTimeout time.Duration +} + // newProcessor constructs a new processor. -func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc, - syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { - info := ss.GetInfo() +func newProcessor(params newProcessorParams) *processor { + info := params.ss.GetInfo() qcfg := normalizeQueueCfg(info.Queues) orderedQueues := []string(nil) if info.StrictPriority { orderedQueues = sortByPriority(qcfg) } return &processor{ - logger: l, - rdb: r, - ss: ss, + logger: params.logger, + rdb: params.rdb, + ss: params.ss, queueConfig: qcfg, orderedQueues: orderedQueues, - retryDelayFunc: fn, - syncRequestCh: syncCh, - cancelations: c, + retryDelayFunc: params.retryDelayFunc, + syncRequestCh: params.syncCh, + cancelations: params.cancelations, errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), sema: make(chan struct{}, info.Concurrency), done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), - errHandler: errHandler, + errHandler: params.errHandler, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), } } @@ -106,9 +118,7 @@ func (p *processor) stop() { func (p *processor) terminate() { p.stop() - // IDEA: Allow user to customize this timeout value. - const timeout = 8 * time.Second - time.AfterFunc(timeout, func() { close(p.quit) }) + time.AfterFunc(p.shutdownTimeout, func() { close(p.quit) }) p.logger.Info("Waiting for all workers to finish...") // send cancellation signal to all in-progress task handlers diff --git a/processor_test.go b/processor_test.go index 99c39a8..6ba48df 100644 --- a/processor_test.go +++ b/processor_test.go @@ -69,7 +69,16 @@ func TestProcessorSuccess(t *testing.T) { } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(newProcessorParams{ + logger: testLogger, + rdb: rdbClient, + ss: ss, + retryDelayFunc: defaultDelayFunc, + syncCh: nil, + cancelations: cancelations, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + }) p.handler = HandlerFunc(handler) var wg sync.WaitGroup @@ -167,7 +176,16 @@ func TestProcessorRetry(t *testing.T) { } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(testLogger, rdbClient, ss, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler)) + p := newProcessor(newProcessorParams{ + logger: testLogger, + rdb: rdbClient, + ss: ss, + retryDelayFunc: delayFunc, + syncCh: nil, + cancelations: cancelations, + errHandler: ErrorHandlerFunc(errHandler), + shutdownTimeout: defaultShutdownTimeout, + }) p.handler = tc.handler var wg sync.WaitGroup @@ -233,7 +251,16 @@ func TestProcessorQueues(t *testing.T) { for _, tc := range tests { cancelations := base.NewCancelations() ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) - p := newProcessor(testLogger, nil, ss, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(newProcessorParams{ + logger: testLogger, + rdb: nil, + ss: ss, + retryDelayFunc: defaultDelayFunc, + syncCh: nil, + cancelations: cancelations, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + }) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -301,7 +328,16 @@ func TestProcessorWithStrictPriority(t *testing.T) { // Note: Set concurrency to 1 to make sure tasks are processed one at a time. cancelations := base.NewCancelations() ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) - p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(newProcessorParams{ + logger: testLogger, + rdb: rdbClient, + ss: ss, + retryDelayFunc: defaultDelayFunc, + syncCh: nil, + cancelations: cancelations, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + }) p.handler = HandlerFunc(handler) var wg sync.WaitGroup diff --git a/server.go b/server.go index 2d6d181..8f5a8ed 100644 --- a/server.go +++ b/server.go @@ -109,6 +109,12 @@ type Config struct { // // If unset, default logger is used. Logger Logger + + // ShutdownTimeout specifies the duration to wait to let workers finish their tasks + // before forcing them to abort when stopping the server. + // + // If unset or zero, default timeout of 8 seconds is used. + ShutdownTimeout time.Duration } // An ErrorHandler handles errors returned by the task handler. @@ -155,6 +161,8 @@ var defaultQueueConfig = map[string]int{ base.DefaultQueueName: 1, } +const defaultShutdownTimeout = 8 * time.Second + // NewServer returns a new Server given a redis connection option // and background processing configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { @@ -179,6 +187,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if logger == nil { logger = log.NewLogger(os.Stderr) } + shutdownTimeout := cfg.ShutdownTimeout + if shutdownTimeout == 0 { + shutdownTimeout = defaultShutdownTimeout + } host, err := os.Hostname() if err != nil { @@ -193,8 +205,17 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { syncer := newSyncer(logger, syncCh, 5*time.Second) heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second) scheduler := newScheduler(logger, rdb, 5*time.Second, queues) - processor := newProcessor(logger, rdb, ss, delayFunc, syncCh, cancels, cfg.ErrorHandler) subscriber := newSubscriber(logger, rdb, cancels) + processor := newProcessor(newProcessorParams{ + logger: logger, + rdb: rdb, + ss: ss, + retryDelayFunc: delayFunc, + syncCh: syncCh, + cancelations: cancels, + errHandler: cfg.ErrorHandler, + shutdownTimeout: shutdownTimeout, + }) return &Server{ ss: ss, logger: logger, @@ -287,9 +308,8 @@ func (srv *Server) Start(handler Handler) error { // Stop stops the worker server. // It gracefully closes all active workers. The server will wait for -// active workers to finish processing task for 8 seconds(TODO: Add ShutdownTimeout to Config). -// If worker didn't finish processing a task during the timeout, the -// task will be pushed back to Redis. +// active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. +// If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis. func (srv *Server) Stop() { switch srv.ss.Status() { case base.StatusIdle, base.StatusStopped: