diff --git a/asynq.go b/asynq.go index 647d8d4..a7821cf 100644 --- a/asynq.go +++ b/asynq.go @@ -12,12 +12,8 @@ TODOs: - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment - [P0] Redis Sentinel support - [P1] Add Support for multiple queues and priority -- [P1] User defined max-retry count */ -// Max retry count by default -const defaultMaxRetry = 25 - // Task represents a task to be performed. type Task struct { // Type indicates the kind of the task to be performed. diff --git a/asynq_test.go b/asynq_test.go index 0547eef..b28f9eb 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -9,6 +9,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hibiken/asynq/internal/rdb" "github.com/rs/xid" ) @@ -30,6 +31,12 @@ const ( inProgressQ = "asynq:in_progress" // LIST ) +// scheduledEntry represents an item in redis sorted set (aka ZSET). +type sortedSetEntry struct { + msg *rdb.TaskMessage + score int64 +} + func setup(t *testing.T) *redis.Client { t.Helper() r := redis.NewClient(&redis.Options{ @@ -59,6 +66,16 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.T return out }) +var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { + out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].msg.ID.String() < out[j].msg.ID.String() + }) + return out +}) + +var ignoreIDOpt = cmpopts.IgnoreFields(rdb.TaskMessage{}, "ID") + func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { return &rdb.TaskMessage{ ID: xid.New(), diff --git a/client.go b/client.go index b90c33f..cb12b1f 100644 --- a/client.go +++ b/client.go @@ -3,8 +3,8 @@ package asynq import ( "time" - "github.com/rs/xid" "github.com/hibiken/asynq/internal/rdb" + "github.com/rs/xid" ) // A Client is responsible for scheduling tasks. @@ -23,17 +23,62 @@ func NewClient(cfg *RedisConfig) *Client { return &Client{r} } +// Option configures the behavior of task processing. +type Option interface{} + +// max number of times a task will be retried. +type retryOption int + +// MaxRetry returns an option to specify the max number of times +// a task will be retried. +// +// Negative retry count is treated as zero retry. +func MaxRetry(n int) Option { + if n < 0 { + n = 0 + } + return retryOption(n) +} + +type option struct { + retry int +} + +func composeOptions(opts ...Option) option { + res := option{ + retry: defaultMaxRetry, + } + for _, opt := range opts { + switch opt := opt.(type) { + case retryOption: + res.retry = int(opt) + default: + // ignore unexpected option + } + } + return res +} + +const ( + // Max retry count by default + defaultMaxRetry = 25 +) + // Process registers a task to be processed at the specified time. // // Process returns nil if the task is registered successfully, // otherwise returns non-nil error. -func (c *Client) Process(task *Task, processAt time.Time) error { +// +// opts specifies the behavior of task processing. If there are conflicting +// Option the last one overrides the ones before. +func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error { + opt := composeOptions(opts...) msg := &rdb.TaskMessage{ ID: xid.New(), Type: task.Type, Payload: task.Payload, Queue: "default", - Retry: defaultMaxRetry, + Retry: opt.retry, } return c.enqueue(msg, processAt) } diff --git a/client_test.go b/client_test.go index f31e10d..b71ece6 100644 --- a/client_test.go +++ b/client_test.go @@ -1,32 +1,111 @@ package asynq import ( - "github.com/hibiken/asynq/internal/rdb" "testing" "time" + + "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/rdb" ) func TestClient(t *testing.T) { r := setup(t) client := &Client{rdb.NewRDB(r)} + task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}} + tests := []struct { - task *Task - processAt time.Time - wantQueueSize int64 - wantScheduledSize int64 + desc string + task *Task + processAt time.Time + opts []Option + wantEnqueued []*rdb.TaskMessage + wantScheduled []sortedSetEntry }{ { - task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, - processAt: time.Now(), - wantQueueSize: 1, - wantScheduledSize: 0, + desc: "Process task immediately", + task: task, + processAt: time.Now(), + opts: []Option{}, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil }, { - task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, - processAt: time.Now().Add(2 * time.Hour), - wantQueueSize: 0, - wantScheduledSize: 1, + desc: "Schedule task to be processed in the future", + task: task, + processAt: time.Now().Add(2 * time.Hour), + opts: []Option{}, + wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil + wantScheduled: []sortedSetEntry{ + { + msg: &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Queue: "default", + }, + score: time.Now().Add(2 * time.Hour).Unix(), + }, + }, + }, + { + desc: "Process task immediately with a custom retry count", + task: task, + processAt: time.Now(), + opts: []Option{ + MaxRetry(3), + }, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: 3, + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "Negative retry count", + task: task, + processAt: time.Now(), + opts: []Option{ + MaxRetry(-2), + }, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: 0, // Retry count should be set to zero + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "Conflicting options", + task: task, + processAt: time.Now(), + opts: []Option{ + MaxRetry(2), + MaxRetry(10), + }, + wantEnqueued: []*rdb.TaskMessage{ + &rdb.TaskMessage{ + Type: task.Type, + Payload: task.Payload, + Retry: 10, // Last option takes precedence + Queue: "default", + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil }, } @@ -36,18 +115,30 @@ func TestClient(t *testing.T) { t.Fatal(err) } - err := client.Process(tc.task, tc.processAt) + err := client.Process(tc.task, tc.processAt, tc.opts...) if err != nil { t.Error(err) continue } - if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize { - t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize) + gotEnqueuedRaw := r.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, defaultQ, diff) } - if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize { - t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize) + gotScheduledRaw := r.ZRangeWithScores(scheduledQ, 0, -1).Val() + var gotScheduled []sortedSetEntry + for _, z := range gotScheduledRaw { + gotScheduled = append(gotScheduled, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, scheduledQ, diff) } } } diff --git a/tools/asynqmon/cmd/root.go b/tools/asynqmon/cmd/root.go index 5bf4df4..c3ce463 100644 --- a/tools/asynqmon/cmd/root.go +++ b/tools/asynqmon/cmd/root.go @@ -21,7 +21,7 @@ var rootCmd = &cobra.Command{ Short: "A monitoring tool for asynq queues", Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package. -Asynqmon has a few subcommands to query and mutate the current state of the queues. +Asynqmon has a few commands to query and mutate the current state of the queues. Monitoring commands such as "stats" and "ls" can be used in conjunction with the "watch" command to continuously run the command at a certain interval.