diff --git a/internal/context/context.go b/internal/context/context.go index 47257d6..8f10c03 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -11,6 +11,8 @@ import ( "github.com/hibiken/asynq/internal/base" ) +type BaseContext func() context.Context + // A taskMetadata holds task scoped data to put in context. type taskMetadata struct { id string @@ -28,7 +30,7 @@ type ctxKey int const metadataCtxKey ctxKey = 0 // New returns a context and cancel function for a given task message. -func New(msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) { +func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) { metadata := taskMetadata{ id: msg.ID, maxRetry: msg.Retry, diff --git a/processor.go b/processor.go index 478633b..a1aff93 100644 --- a/processor.go +++ b/processor.go @@ -26,7 +26,8 @@ type processor struct { logger *log.Logger broker base.Broker - handler Handler + handler Handler + baseContextFn asynqcontext.BaseContext queueConfig map[string]int @@ -71,6 +72,7 @@ type processor struct { type processorParams struct { logger *log.Logger broker base.Broker + baseCtxFn asynqcontext.BaseContext retryDelayFunc RetryDelayFunc isFailureFunc func(error) bool syncCh chan<- *syncRequest @@ -94,6 +96,7 @@ func newProcessor(params processorParams) *processor { return &processor{ logger: params.logger, broker: params.broker, + baseContextFn: params.baseCtxFn, queueConfig: queues, orderedQueues: orderedQueues, retryDelayFunc: params.retryDelayFunc, @@ -190,7 +193,7 @@ func (p *processor) exec() { <-p.sema // release token }() - ctx, cancel := asynqcontext.New(msg, deadline) + ctx, cancel := asynqcontext.New(p.baseContextFn(), msg, deadline) p.cancelations.Add(msg.ID, cancel) defer func() { cancel() diff --git a/server.go b/server.go index 1cf5bed..2a8930a 100644 --- a/server.go +++ b/server.go @@ -97,6 +97,14 @@ type Config struct { // to the number of CPUs usable by the current process. Concurrency int + // BaseContext optionally specifies a function that returns + // the base context for invocations on this server. + // If BaseContext is nil, the default is context.Background(). + BaseContext func() context.Context + + // SleepOnEmptyQueue optionally specifies the amount of time to wait before polling again when there are no messages in the queue + SleepOnEmptyQueue time.Duration + // Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. @@ -341,6 +349,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } + baseCtxFn := cfg.BaseContext + if baseCtxFn == nil { + baseCtxFn = context.Background + } n := cfg.Concurrency if n < 1 { n = runtime.NumCPU() @@ -426,6 +438,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger: logger, broker: rdb, retryDelayFunc: delayFunc, + baseCtxFn: baseCtxFn, isFailureFunc: isFailureFunc, syncCh: syncCh, cancelations: cancels,