2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Export DefaultRetryDelayFunc

This commit is contained in:
Ken Hibino 2021-01-12 11:40:26 -08:00
parent 7c3ad9e45c
commit ccb682853e
5 changed files with 25 additions and 20 deletions

View File

@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- `DefaultRetryDelayFunc` is now a public API, which can be used in the custom `RetryDelayFunc`.
- `SkipRetry` error is added to be used as a return value from `Handler`. - `SkipRetry` error is added to be used as a return value from `Handler`.
- `Servers` method is added to `Inspector` - `Servers` method is added to `Inspector`
- `CancelActiveTask` method is added to `Inspector`. - `CancelActiveTask` method is added to `Inspector`.

View File

@ -33,7 +33,7 @@ type processor struct {
// orderedQueues is set only in strict-priority mode. // orderedQueues is set only in strict-priority mode.
orderedQueues []string orderedQueues []string
retryDelayFunc retryDelayFunc retryDelayFunc RetryDelayFunc
errHandler ErrorHandler errHandler ErrorHandler
@ -67,12 +67,10 @@ type processor struct {
finished chan<- *base.TaskMessage finished chan<- *base.TaskMessage
} }
type retryDelayFunc func(n int, err error, task *Task) time.Duration
type processorParams struct { type processorParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
retryDelayFunc retryDelayFunc retryDelayFunc RetryDelayFunc
syncCh chan<- *syncRequest syncCh chan<- *syncRequest
cancelations *base.Cancelations cancelations *base.Cancelations
concurrency int concurrency int

View File

@ -96,7 +96,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, syncCh: syncCh,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 10, concurrency: 10,
@ -187,7 +187,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, syncCh: syncCh,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 10, concurrency: 10,
@ -268,7 +268,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, syncCh: syncCh,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 10, concurrency: 10,
@ -478,7 +478,7 @@ func TestProcessorQueues(t *testing.T) {
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: nil, broker: nil,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 10, concurrency: 10,
@ -569,7 +569,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: DefaultRetryDelayFunc,
syncCh: syncCh, syncCh: syncCh,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time.

View File

@ -16,7 +16,7 @@ import (
type recoverer struct { type recoverer struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
retryDelayFunc retryDelayFunc retryDelayFunc RetryDelayFunc
// channel to communicate back to the long running "recoverer" goroutine. // channel to communicate back to the long running "recoverer" goroutine.
done chan struct{} done chan struct{}
@ -33,7 +33,7 @@ type recovererParams struct {
broker base.Broker broker base.Broker
queues []string queues []string
interval time.Duration interval time.Duration
retryDelayFunc retryDelayFunc retryDelayFunc RetryDelayFunc
} }
func newRecoverer(params recovererParams) *recoverer { func newRecoverer(params recovererParams) *recoverer {

View File

@ -60,11 +60,7 @@ type Config struct {
// Function to calculate retry delay for a failed task. // Function to calculate retry delay for a failed task.
// //
// By default, it uses exponential backoff algorithm to calculate the delay. // By default, it uses exponential backoff algorithm to calculate the delay.
// RetryDelayFunc RetryDelayFunc
// n is the number of times the task has been retried.
// e is the error returned by the task handler.
// t is the task in question.
RetryDelayFunc func(n int, e error, t *Task) time.Duration
// List of queues to process with given priority value. Keys are the names of the // List of queues to process with given priority value. Keys are the names of the
// queues and values are associated priority value. // queues and values are associated priority value.
@ -153,6 +149,14 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err) fn(ctx, task, err)
} }
// RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task.
//
// n is the number of times the task has been retried.
// e is the error returned by the task handler.
// t is the task in question.
type RetryDelayFunc func(n int, e error, t *Task) time.Duration
// Logger supports logging at various log levels. // Logger supports logging at various log levels.
type Logger interface { type Logger interface {
// Debug logs a message at Debug level. // Debug logs a message at Debug level.
@ -253,9 +257,11 @@ func toInternalLogLevel(l LogLevel) log.Level {
panic(fmt.Sprintf("asynq: unexpected log level: %v", l)) panic(fmt.Sprintf("asynq: unexpected log level: %v", l))
} }
// Formula taken from https://github.com/mperham/sidekiq. // DefaultRetryDelayFunc is the default RetryDelayFunc used if one is not specified in Config.
func defaultDelayFunc(n int, e error, t *Task) time.Duration { // It uses exponential back-off strategy to calculate the retry delay.
func DefaultRetryDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
// Formula taken from https://github.com/mperham/sidekiq.
s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1)) s := int(math.Pow(float64(n), 4)) + 15 + (r.Intn(30) * (n + 1))
return time.Duration(s) * time.Second return time.Duration(s) * time.Second
} }
@ -279,7 +285,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
} }
delayFunc := cfg.RetryDelayFunc delayFunc := cfg.RetryDelayFunc
if delayFunc == nil { if delayFunc == nil {
delayFunc = defaultDelayFunc delayFunc = DefaultRetryDelayFunc
} }
queues := make(map[string]int) queues := make(map[string]int)
for qname, p := range cfg.Queues { for qname, p := range cfg.Queues {
@ -291,7 +297,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
queues = defaultQueueConfig queues = defaultQueueConfig
} }
var qnames []string var qnames []string
for q, _ := range queues { for q := range queues {
qnames = append(qnames, q) qnames = append(qnames, q)
} }
shutdownTimeout := cfg.ShutdownTimeout shutdownTimeout := cfg.ShutdownTimeout