2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

review corrections

This commit is contained in:
Binaek Sarkar 2022-01-28 21:21:34 +05:30 committed by Ken Hibino
parent d7ceb0c090
commit ab8a4f5b1e
3 changed files with 13 additions and 11 deletions

View File

@ -11,8 +11,6 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
type BaseContext func() context.Context
// A taskMetadata holds task scoped data to put in context. // A taskMetadata holds task scoped data to put in context.
type taskMetadata struct { type taskMetadata struct {
id string id string
@ -37,7 +35,7 @@ func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (conte
retryCount: msg.Retried, retryCount: msg.Retried,
qname: msg.Queue, qname: msg.Queue,
} }
ctx := context.WithValue(context.Background(), metadataCtxKey, metadata) ctx := context.WithValue(base, metadataCtxKey, metadata)
return context.WithDeadline(ctx, deadline) return context.WithDeadline(ctx, deadline)
} }

View File

@ -27,7 +27,7 @@ type processor struct {
broker base.Broker broker base.Broker
handler Handler handler Handler
baseContextFn asynqcontext.BaseContext baseCtxFn BaseCtxFn
queueConfig map[string]int queueConfig map[string]int
@ -72,7 +72,7 @@ type processor struct {
type processorParams struct { type processorParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
baseCtxFn asynqcontext.BaseContext baseCtxFn BaseCtxFn
retryDelayFunc RetryDelayFunc retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool isFailureFunc func(error) bool
syncCh chan<- *syncRequest syncCh chan<- *syncRequest
@ -96,7 +96,7 @@ func newProcessor(params processorParams) *processor {
return &processor{ return &processor{
logger: params.logger, logger: params.logger,
broker: params.broker, broker: params.broker,
baseContextFn: params.baseCtxFn, baseCtxFn: params.baseCtxFn,
queueConfig: queues, queueConfig: queues,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc, retryDelayFunc: params.retryDelayFunc,
@ -193,7 +193,7 @@ func (p *processor) exec() {
<-p.sema // release token <-p.sema // release token
}() }()
ctx, cancel := asynqcontext.New(p.baseContextFn(), msg, deadline) ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
p.cancelations.Add(msg.ID, cancel) p.cancelations.Add(msg.ID, cancel)
defer func() { defer func() {
cancel() cancel()

View File

@ -97,10 +97,11 @@ type Config struct {
// to the number of CPUs usable by the current process. // to the number of CPUs usable by the current process.
Concurrency int Concurrency int
// BaseContext optionally specifies a function that returns // BaseContext optionally specifies a function that returns the base context for Handler invocations on this server.
// the base context for invocations on this server. //
// If BaseContext is nil, the default is context.Background(). // If BaseContext is nil, the default is context.Background().
BaseContext func() context.Context // If this is defined, then it MUST return a non-nil context
BaseContext BaseCtxFn
// SleepOnEmptyQueue optionally specifies the amount of time to wait before polling again when there are no messages in the queue // SleepOnEmptyQueue optionally specifies the amount of time to wait before polling again when there are no messages in the queue
SleepOnEmptyQueue time.Duration SleepOnEmptyQueue time.Duration
@ -211,6 +212,9 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err) fn(ctx, task, err)
} }
// BaseCtxFn provides the root context from where the execution contexts of tasks are derived
type BaseCtxFn func() context.Context
// RetryDelayFunc calculates the retry delay duration for a failed task given // RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task. // the retry count, error, and the task.
// //