2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Allow user to configure shutdown timeout

This commit is contained in:
Ken Hibino 2020-04-15 07:15:01 -07:00
parent c688b8f4f9
commit 4df372b369
3 changed files with 87 additions and 21 deletions

View File

@ -34,6 +34,8 @@ type processor struct {
errHandler ErrorHandler errHandler ErrorHandler
shutdownTimeout time.Duration
// channel via which to send sync requests to syncer. // channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest syncRequestCh chan<- *syncRequest
@ -61,30 +63,40 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration type retryDelayFunc func(n int, err error, task *Task) time.Duration
type newProcessorParams struct {
logger Logger
rdb *rdb.RDB
ss *base.ServerState
retryDelayFunc retryDelayFunc
syncCh chan<- *syncRequest
cancelations *base.Cancelations
errHandler ErrorHandler
shutdownTimeout time.Duration
}
// newProcessor constructs a new processor. // newProcessor constructs a new processor.
func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc, func newProcessor(params newProcessorParams) *processor {
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { info := params.ss.GetInfo()
info := ss.GetInfo()
qcfg := normalizeQueueCfg(info.Queues) qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil) orderedQueues := []string(nil)
if info.StrictPriority { if info.StrictPriority {
orderedQueues = sortByPriority(qcfg) orderedQueues = sortByPriority(qcfg)
} }
return &processor{ return &processor{
logger: l, logger: params.logger,
rdb: r, rdb: params.rdb,
ss: ss, ss: params.ss,
queueConfig: qcfg, queueConfig: qcfg,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: fn, retryDelayFunc: params.retryDelayFunc,
syncRequestCh: syncCh, syncRequestCh: params.syncCh,
cancelations: c, cancelations: params.cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, info.Concurrency), sema: make(chan struct{}, info.Concurrency),
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
errHandler: errHandler, errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
} }
} }
@ -106,9 +118,7 @@ func (p *processor) stop() {
func (p *processor) terminate() { func (p *processor) terminate() {
p.stop() p.stop()
// IDEA: Allow user to customize this timeout value. time.AfterFunc(p.shutdownTimeout, func() { close(p.quit) })
const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) })
p.logger.Info("Waiting for all workers to finish...") p.logger.Info("Waiting for all workers to finish...")
// send cancellation signal to all in-progress task handlers // send cancellation signal to all in-progress task handlers

View File

@ -69,7 +69,16 @@ func TestProcessorSuccess(t *testing.T) {
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil) p := newProcessor(newProcessorParams{
logger: testLogger,
rdb: rdbClient,
ss: ss,
retryDelayFunc: defaultDelayFunc,
syncCh: nil,
cancelations: cancelations,
errHandler: nil,
shutdownTimeout: defaultShutdownTimeout,
})
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -167,7 +176,16 @@ func TestProcessorRetry(t *testing.T) {
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(testLogger, rdbClient, ss, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler)) p := newProcessor(newProcessorParams{
logger: testLogger,
rdb: rdbClient,
ss: ss,
retryDelayFunc: delayFunc,
syncCh: nil,
cancelations: cancelations,
errHandler: ErrorHandlerFunc(errHandler),
shutdownTimeout: defaultShutdownTimeout,
})
p.handler = tc.handler p.handler = tc.handler
var wg sync.WaitGroup var wg sync.WaitGroup
@ -233,7 +251,16 @@ func TestProcessorQueues(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(testLogger, nil, ss, defaultDelayFunc, nil, cancelations, nil) p := newProcessor(newProcessorParams{
logger: testLogger,
rdb: nil,
ss: ss,
retryDelayFunc: defaultDelayFunc,
syncCh: nil,
cancelations: cancelations,
errHandler: nil,
shutdownTimeout: defaultShutdownTimeout,
})
got := p.queues() got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@ -301,7 +328,16 @@ func TestProcessorWithStrictPriority(t *testing.T) {
// Note: Set concurrency to 1 to make sure tasks are processed one at a time. // Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil) p := newProcessor(newProcessorParams{
logger: testLogger,
rdb: rdbClient,
ss: ss,
retryDelayFunc: defaultDelayFunc,
syncCh: nil,
cancelations: cancelations,
errHandler: nil,
shutdownTimeout: defaultShutdownTimeout,
})
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -109,6 +109,12 @@ type Config struct {
// //
// If unset, default logger is used. // If unset, default logger is used.
Logger Logger Logger Logger
// ShutdownTimeout specifies the duration to wait to let workers finish their tasks
// before forcing them to abort when stopping the server.
//
// If unset or zero, default timeout of 8 seconds is used.
ShutdownTimeout time.Duration
} }
// An ErrorHandler handles errors returned by the task handler. // An ErrorHandler handles errors returned by the task handler.
@ -155,6 +161,8 @@ var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1, base.DefaultQueueName: 1,
} }
const defaultShutdownTimeout = 8 * time.Second
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
// and background processing configuration. // and background processing configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server { func NewServer(r RedisConnOpt, cfg Config) *Server {
@ -179,6 +187,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if logger == nil { if logger == nil {
logger = log.NewLogger(os.Stderr) logger = log.NewLogger(os.Stderr)
} }
shutdownTimeout := cfg.ShutdownTimeout
if shutdownTimeout == 0 {
shutdownTimeout = defaultShutdownTimeout
}
host, err := os.Hostname() host, err := os.Hostname()
if err != nil { if err != nil {
@ -193,8 +205,17 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
syncer := newSyncer(logger, syncCh, 5*time.Second) syncer := newSyncer(logger, syncCh, 5*time.Second)
heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second) heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second)
scheduler := newScheduler(logger, rdb, 5*time.Second, queues) scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
processor := newProcessor(logger, rdb, ss, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(logger, rdb, cancels) subscriber := newSubscriber(logger, rdb, cancels)
processor := newProcessor(newProcessorParams{
logger: logger,
rdb: rdb,
ss: ss,
retryDelayFunc: delayFunc,
syncCh: syncCh,
cancelations: cancels,
errHandler: cfg.ErrorHandler,
shutdownTimeout: shutdownTimeout,
})
return &Server{ return &Server{
ss: ss, ss: ss,
logger: logger, logger: logger,
@ -287,9 +308,8 @@ func (srv *Server) Start(handler Handler) error {
// Stop stops the worker server. // Stop stops the worker server.
// It gracefully closes all active workers. The server will wait for // It gracefully closes all active workers. The server will wait for
// active workers to finish processing task for 8 seconds(TODO: Add ShutdownTimeout to Config). // active workers to finish processing tasks for duration specified in Config.ShutdownTimeout.
// If worker didn't finish processing a task during the timeout, the // If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
// task will be pushed back to Redis.
func (srv *Server) Stop() { func (srv *Server) Stop() {
switch srv.ss.Status() { switch srv.ss.Status() {
case base.StatusIdle, base.StatusStopped: case base.StatusIdle, base.StatusStopped: