2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

first cut

This commit is contained in:
Binaek Sarkar 2022-01-25 14:42:02 +05:30 committed by Ken Hibino
parent 8bd70c6f84
commit d7ceb0c090
3 changed files with 21 additions and 3 deletions

View File

@ -11,6 +11,8 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
type BaseContext func() context.Context
// A taskMetadata holds task scoped data to put in context. // A taskMetadata holds task scoped data to put in context.
type taskMetadata struct { type taskMetadata struct {
id string id string
@ -28,7 +30,7 @@ type ctxKey int
const metadataCtxKey ctxKey = 0 const metadataCtxKey ctxKey = 0
// New returns a context and cancel function for a given task message. // New returns a context and cancel function for a given task message.
func New(msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) { func New(base context.Context, msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) {
metadata := taskMetadata{ metadata := taskMetadata{
id: msg.ID, id: msg.ID,
maxRetry: msg.Retry, maxRetry: msg.Retry,

View File

@ -26,7 +26,8 @@ type processor struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
handler Handler handler Handler
baseContextFn asynqcontext.BaseContext
queueConfig map[string]int queueConfig map[string]int
@ -71,6 +72,7 @@ type processor struct {
type processorParams struct { type processorParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
baseCtxFn asynqcontext.BaseContext
retryDelayFunc RetryDelayFunc retryDelayFunc RetryDelayFunc
isFailureFunc func(error) bool isFailureFunc func(error) bool
syncCh chan<- *syncRequest syncCh chan<- *syncRequest
@ -94,6 +96,7 @@ func newProcessor(params processorParams) *processor {
return &processor{ return &processor{
logger: params.logger, logger: params.logger,
broker: params.broker, broker: params.broker,
baseContextFn: params.baseCtxFn,
queueConfig: queues, queueConfig: queues,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc, retryDelayFunc: params.retryDelayFunc,
@ -190,7 +193,7 @@ func (p *processor) exec() {
<-p.sema // release token <-p.sema // release token
}() }()
ctx, cancel := asynqcontext.New(msg, deadline) ctx, cancel := asynqcontext.New(p.baseContextFn(), msg, deadline)
p.cancelations.Add(msg.ID, cancel) p.cancelations.Add(msg.ID, cancel)
defer func() { defer func() {
cancel() cancel()

View File

@ -97,6 +97,14 @@ type Config struct {
// to the number of CPUs usable by the current process. // to the number of CPUs usable by the current process.
Concurrency int Concurrency int
// BaseContext optionally specifies a function that returns
// the base context for invocations on this server.
// If BaseContext is nil, the default is context.Background().
BaseContext func() context.Context
// SleepOnEmptyQueue optionally specifies the amount of time to wait before polling again when there are no messages in the queue
SleepOnEmptyQueue time.Duration
// Function to calculate retry delay for a failed task. // Function to calculate retry delay for a failed task.
// //
// By default, it uses exponential backoff algorithm to calculate the delay. // By default, it uses exponential backoff algorithm to calculate the delay.
@ -341,6 +349,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if !ok { if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
} }
baseCtxFn := cfg.BaseContext
if baseCtxFn == nil {
baseCtxFn = context.Background
}
n := cfg.Concurrency n := cfg.Concurrency
if n < 1 { if n < 1 {
n = runtime.NumCPU() n = runtime.NumCPU()
@ -426,6 +438,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
logger: logger, logger: logger,
broker: rdb, broker: rdb,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
baseCtxFn: baseCtxFn,
isFailureFunc: isFailureFunc, isFailureFunc: isFailureFunc,
syncCh: syncCh, syncCh: syncCh,
cancelations: cancels, cancelations: cancels,