From ab8a4f5b1ee8a8c4c25fd72f540ba80c3f148281 Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Fri, 28 Jan 2022 21:21:34 +0530 Subject: [PATCH] review corrections --- internal/context/context.go | 4 +--- processor.go | 10 +++++----- server.go | 10 +++++++--- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/context/context.go b/internal/context/context.go index 8f10c03..588ea89 100644 --- a/internal/context/context.go +++ b/internal/context/context.go @@ -11,8 +11,6 @@ import ( "github.com/hibiken/asynq/internal/base" ) -type BaseContext func() context.Context - // A taskMetadata holds task scoped data to put in context. type taskMetadata struct { id string @@ -37,7 +35,7 @@ func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (conte retryCount: msg.Retried, qname: msg.Queue, } - ctx := context.WithValue(context.Background(), metadataCtxKey, metadata) + ctx := context.WithValue(base, metadataCtxKey, metadata) return context.WithDeadline(ctx, deadline) } diff --git a/processor.go b/processor.go index a1aff93..b7d7824 100644 --- a/processor.go +++ b/processor.go @@ -26,8 +26,8 @@ type processor struct { logger *log.Logger broker base.Broker - handler Handler - baseContextFn asynqcontext.BaseContext + handler Handler + baseCtxFn BaseCtxFn queueConfig map[string]int @@ -72,7 +72,7 @@ type processor struct { type processorParams struct { logger *log.Logger broker base.Broker - baseCtxFn asynqcontext.BaseContext + baseCtxFn BaseCtxFn retryDelayFunc RetryDelayFunc isFailureFunc func(error) bool syncCh chan<- *syncRequest @@ -96,7 +96,7 @@ func newProcessor(params processorParams) *processor { return &processor{ logger: params.logger, broker: params.broker, - baseContextFn: params.baseCtxFn, + baseCtxFn: params.baseCtxFn, queueConfig: queues, orderedQueues: orderedQueues, retryDelayFunc: params.retryDelayFunc, @@ -193,7 +193,7 @@ func (p *processor) exec() { <-p.sema // release token }() - ctx, cancel := asynqcontext.New(p.baseContextFn(), msg, deadline) + ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline) p.cancelations.Add(msg.ID, cancel) defer func() { cancel() diff --git a/server.go b/server.go index 2a8930a..ca0b821 100644 --- a/server.go +++ b/server.go @@ -97,10 +97,11 @@ type Config struct { // to the number of CPUs usable by the current process. Concurrency int - // BaseContext optionally specifies a function that returns - // the base context for invocations on this server. + // BaseContext optionally specifies a function that returns the base context for Handler invocations on this server. + // // If BaseContext is nil, the default is context.Background(). - BaseContext func() context.Context + // If this is defined, then it MUST return a non-nil context + BaseContext BaseCtxFn // SleepOnEmptyQueue optionally specifies the amount of time to wait before polling again when there are no messages in the queue SleepOnEmptyQueue time.Duration @@ -211,6 +212,9 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro fn(ctx, task, err) } +// BaseCtxFn provides the root context from where the execution contexts of tasks are derived +type BaseCtxFn func() context.Context + // RetryDelayFunc calculates the retry delay duration for a failed task given // the retry count, error, and the task. //