From 4229073a249f5b54a6acd9045bc371293c44a3ef Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 21 Dec 2019 07:42:32 -0800 Subject: [PATCH] Allow user to define a max retry count for a task --- asynq.go | 4 -- asynq_test.go | 17 ++++++++ client.go | 48 ++++++++++++++++++++-- client_test.go | 109 +++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 153 insertions(+), 25 deletions(-) diff --git a/asynq.go b/asynq.go index 647d8d4..a7821cf 100644 --- a/asynq.go +++ b/asynq.go @@ -12,12 +12,8 @@ TODOs: - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment - [P0] Redis Sentinel support - [P1] Add Support for multiple queues and priority -- [P1] User defined max-retry count */ -// Max retry count by default -const defaultMaxRetry = 25 - // Task represents a task to be performed. type Task struct { // Type indicates the kind of the task to be performed. diff --git a/asynq_test.go b/asynq_test.go index 0547eef..b28f9eb 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -9,6 +9,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hibiken/asynq/internal/rdb" "github.com/rs/xid" ) @@ -30,6 +31,12 @@ const ( inProgressQ = "asynq:in_progress" // LIST ) +// scheduledEntry represents an item in redis sorted set (aka ZSET). +type sortedSetEntry struct { + msg *rdb.TaskMessage + score int64 +} + func setup(t *testing.T) *redis.Client { t.Helper() r := redis.NewClient(&redis.Options{ @@ -59,6 +66,16 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.T return out }) +var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { + out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].msg.ID.String() < out[j].msg.ID.String() + }) + return out +}) + +var ignoreIDOpt = cmpopts.IgnoreFields(rdb.TaskMessage{}, "ID") + func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { return &rdb.TaskMessage{ ID: xid.New(), diff --git a/client.go b/client.go index b90c33f..96ef557 100644 --- a/client.go +++ b/client.go @@ -3,8 +3,8 @@ package asynq import ( "time" - "github.com/rs/xid" "github.com/hibiken/asynq/internal/rdb" + "github.com/rs/xid" ) // A Client is responsible for scheduling tasks. @@ -23,17 +23,59 @@ func NewClient(cfg *RedisConfig) *Client { return &Client{r} } +// Option configures the behavior of task processing. +type Option interface{} + +// max number of times a task will be retried. +type retryOption int + +// MaxRetry returns an option to specify the max number of times +// a task will be retried. +// +// Negative retry count is treated as zero retry. +func MaxRetry(n int) Option { + if n < 0 { + n = 0 + } + return retryOption(n) +} + +type option struct { + retry int +} + +func composeOptions(opts ...Option) option { + res := option{ + retry: defaultMaxRetry, + } + for _, opt := range opts { + switch opt := opt.(type) { + case retryOption: + res.retry = int(opt) + default: + // ignore unexpected option + } + } + return res +} + +const ( + // Max retry count by default + defaultMaxRetry = 25 +) + // Process registers a task to be processed at the specified time. // // Process returns nil if the task is registered successfully, // otherwise returns non-nil error. -func (c *Client) Process(task *Task, processAt time.Time) error { +func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error { + opt := composeOptions(opts...) msg := &rdb.TaskMessage{ ID: xid.New(), Type: task.Type, Payload: task.Payload, Queue: "default", - Retry: defaultMaxRetry, + Retry: opt.retry, } return c.enqueue(msg, processAt) } diff --git a/client_test.go b/client_test.go index f31e10d..0d838f7 100644 --- a/client_test.go +++ b/client_test.go @@ -1,32 +1,93 @@ package asynq import ( - "github.com/hibiken/asynq/internal/rdb" "testing" "time" + + "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/rdb" ) func TestClient(t *testing.T) { r := setup(t) client := &Client{rdb.NewRDB(r)} + task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}} + tests := []struct { - task *Task - processAt time.Time - wantQueueSize int64 - wantScheduledSize int64 + desc string + task *Task + processAt time.Time + opts []Option + wantEnqueued []*rdb.TaskMessage + wantScheduled []sortedSetEntry }{ { - task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, - processAt: time.Now(), - wantQueueSize: 1, - wantScheduledSize: 0, + desc: "Process task immediately", + task: task, + processAt: time.Now(), + opts: []Option{}, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil }, { - task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, - processAt: time.Now().Add(2 * time.Hour), - wantQueueSize: 0, - wantScheduledSize: 1, + desc: "Schedule task to be processed in the future", + task: task, + processAt: time.Now().Add(2 * time.Hour), + opts: []Option{}, + wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil + wantScheduled: []sortedSetEntry{ + { + msg: &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Queue: "default", + }, + score: time.Now().Add(2 * time.Hour).Unix(), + }, + }, + }, + { + desc: "Process task immediately with a custom retry count", + task: task, + processAt: time.Now(), + opts: []Option{ + MaxRetry(3), + }, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: 3, + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "Negative retry count", + task: task, + processAt: time.Now(), + opts: []Option{ + MaxRetry(-2), + }, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: 0, // Retry count should be set to zero + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil }, } @@ -36,18 +97,30 @@ func TestClient(t *testing.T) { t.Fatal(err) } - err := client.Process(tc.task, tc.processAt) + err := client.Process(tc.task, tc.processAt, tc.opts...) if err != nil { t.Error(err) continue } - if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize { - t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize) + gotEnqueuedRaw := r.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, defaultQ, diff) } - if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize { - t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize) + gotScheduledRaw := r.ZRangeWithScores(scheduledQ, 0, -1).Val() + var gotScheduled []sortedSetEntry + for _, z := range gotScheduledRaw { + gotScheduled = append(gotScheduled, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, scheduledQ, diff) } } }