From ccb682853ee956e42f04b5c7e45b8f039fa64667 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 12 Jan 2021 11:40:26 -0800 Subject: [PATCH] Export DefaultRetryDelayFunc --- CHANGELOG.md | 1 + processor.go | 6 ++---- processor_test.go | 10 +++++----- recoverer.go | 4 ++-- server.go | 24 +++++++++++++++--------- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3820240..e46b23e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `DefaultRetryDelayFunc` is now a public API, which can be used in the custom `RetryDelayFunc`. - `SkipRetry` error is added to be used as a return value from `Handler`. - `Servers` method is added to `Inspector` - `CancelActiveTask` method is added to `Inspector`. diff --git a/processor.go b/processor.go index 2cebfc5..5a949e6 100644 --- a/processor.go +++ b/processor.go @@ -33,7 +33,7 @@ type processor struct { // orderedQueues is set only in strict-priority mode. orderedQueues []string - retryDelayFunc retryDelayFunc + retryDelayFunc RetryDelayFunc errHandler ErrorHandler @@ -67,12 +67,10 @@ type processor struct { finished chan<- *base.TaskMessage } -type retryDelayFunc func(n int, err error, task *Task) time.Duration - type processorParams struct { logger *log.Logger broker base.Broker - retryDelayFunc retryDelayFunc + retryDelayFunc RetryDelayFunc syncCh chan<- *syncRequest cancelations *base.Cancelations concurrency int diff --git a/processor_test.go b/processor_test.go index 1b5a279..35a5e44 100644 --- a/processor_test.go +++ b/processor_test.go @@ -96,7 +96,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, - retryDelayFunc: defaultDelayFunc, + retryDelayFunc: DefaultRetryDelayFunc, syncCh: syncCh, cancelations: base.NewCancelations(), concurrency: 10, @@ -187,7 +187,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, - retryDelayFunc: defaultDelayFunc, + retryDelayFunc: DefaultRetryDelayFunc, syncCh: syncCh, cancelations: base.NewCancelations(), concurrency: 10, @@ -268,7 +268,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, - retryDelayFunc: defaultDelayFunc, + retryDelayFunc: DefaultRetryDelayFunc, syncCh: syncCh, cancelations: base.NewCancelations(), concurrency: 10, @@ -478,7 +478,7 @@ func TestProcessorQueues(t *testing.T) { p := newProcessor(processorParams{ logger: testLogger, broker: nil, - retryDelayFunc: defaultDelayFunc, + retryDelayFunc: DefaultRetryDelayFunc, syncCh: nil, cancelations: base.NewCancelations(), concurrency: 10, @@ -569,7 +569,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, - retryDelayFunc: defaultDelayFunc, + retryDelayFunc: DefaultRetryDelayFunc, syncCh: syncCh, cancelations: base.NewCancelations(), concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. diff --git a/recoverer.go b/recoverer.go index 43265b8..5a6da19 100644 --- a/recoverer.go +++ b/recoverer.go @@ -16,7 +16,7 @@ import ( type recoverer struct { logger *log.Logger broker base.Broker - retryDelayFunc retryDelayFunc + retryDelayFunc RetryDelayFunc // channel to communicate back to the long running "recoverer" goroutine. done chan struct{} @@ -33,7 +33,7 @@ type recovererParams struct { broker base.Broker queues []string interval time.Duration - retryDelayFunc retryDelayFunc + retryDelayFunc RetryDelayFunc } func newRecoverer(params recovererParams) *recoverer { diff --git a/server.go b/server.go index fad354f..2275b87 100644 --- a/server.go +++ b/server.go @@ -60,11 +60,7 @@ type Config struct { // Function to calculate retry delay for a failed task. // // By default, it uses exponential backoff algorithm to calculate the delay. - // - // n is the number of times the task has been retried. - // e is the error returned by the task handler. - // t is the task in question. - RetryDelayFunc func(n int, e error, t *Task) time.Duration + RetryDelayFunc RetryDelayFunc // List of queues to process with given priority value. Keys are the names of the // queues and values are associated priority value. @@ -153,6 +149,14 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro fn(ctx, task, err) } +// RetryDelayFunc calculates the retry delay duration for a failed task given +// the retry count, error, and the task. +// +// n is the number of times the task has been retried. +// e is the error returned by the task handler. +// t is the task in question. +type RetryDelayFunc func(n int, e error, t *Task) time.Duration + // Logger supports logging at various log levels. type Logger interface { // Debug logs a message at Debug level. @@ -253,9 +257,11 @@ func toInternalLogLevel(l LogLevel) log.Level { panic(fmt.Sprintf("asynq: unexpected log level: %v", l)) } -// Formula taken from https://github.com/mperham/sidekiq. -func defaultDelayFunc(n int, e error, t *Task) time.Duration { +// DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config. +// It uses exponential back-off strategy to calculate the retry delay. +func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) + // Formula taken from https://github.com/mperham/sidekiq. s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1)) return time.Duration(s) * time.Second } @@ -279,7 +285,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { } delayFunc := cfg.RetryDelayFunc if delayFunc == nil { - delayFunc = defaultDelayFunc + delayFunc = DefaultRetryDelayFunc } queues := make(map[string]int) for qname, p := range cfg.Queues { @@ -291,7 +297,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { queues = defaultQueueConfig } var qnames []string - for q, _ := range queues { + for q := range queues { qnames = append(qnames, q) } shutdownTimeout := cfg.ShutdownTimeout