diff --git a/CHANGELOG.md b/CHANGELOG.md index aeaf4c7..1dc7424 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed processor to wait for specified time duration before forcefully shutdown workers. + + ## [0.13.0] - 2020-10-13 ### Added diff --git a/processor.go b/processor.go index faf509d..3969051 100644 --- a/processor.go +++ b/processor.go @@ -88,22 +88,23 @@ func newProcessor(params processorParams) *processor { orderedQueues = sortByPriority(queues) } return &processor{ - logger: params.logger, - broker: params.broker, - queueConfig: queues, - orderedQueues: orderedQueues, - retryDelayFunc: params.retryDelayFunc, - syncRequestCh: params.syncCh, - cancelations: params.cancelations, - errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), - sema: make(chan struct{}, params.concurrency), - done: make(chan struct{}), - quit: make(chan struct{}), - abort: make(chan struct{}), - errHandler: params.errHandler, - handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), - starting: params.starting, - finished: params.finished, + logger: params.logger, + broker: params.broker, + queueConfig: queues, + orderedQueues: orderedQueues, + retryDelayFunc: params.retryDelayFunc, + syncRequestCh: params.syncCh, + cancelations: params.cancelations, + errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), + sema: make(chan struct{}, params.concurrency), + done: make(chan struct{}), + quit: make(chan struct{}), + abort: make(chan struct{}), + errHandler: params.errHandler, + handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), + shutdownTimeout: params.shutdownTimeout, + starting: params.starting, + finished: params.finished, } }