diff --git a/CHANGELOG.md b/CHANGELOG.md index 6066b2d..861e4a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `Client` API has changed. Use `Enqueue`, `EnqueueAt` and `EnqueueIn` to enqueue and schedule tasks. + ### Added - `asynqmon workers` was added to list all running workers information diff --git a/README.md b/README.md index a13be52..fb010f7 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ To create and schedule tasks, use `Client` and provide a task and when to proces ```go func main() { r := &asynq.RedisClientOpt{ - Addr: "localhost:6379", + Addr: "127.0.0.1:6379", } client := asynq.NewClient(r) @@ -46,19 +46,21 @@ func main() { t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42}) // Process immediately - err := client.Schedule(t1, time.Now()) + err := client.Enqueue(t1) // Process 24 hrs later - err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) + err = client.EnqueueIn(24*time.Hour, t2) - // If processing fails, retry up to 10 times (Default is 25) - err = client.Schedule(t1, time.Now(), asynq.Retry(10)) + // Process at specified time. + t := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC) + err = client.EnqueueAt(t, t2) - // Use custom queue called "critical" - err = client.Schedule(t1, time.Now(), asynq.Queue("critical")) - - // Use timeout to specify how long a task may run (Default is no limit) - err = client.Schedule(t1, time.Now(), asynq.Timeout(30 * time.Second)) + // Pass options to specify processing behavior for a given task. + // + // MaxRetry specifies the maximum number of times this task will be retried (Default is 25). + // Queue specifies which queue to enqueue this task to (Default is "default"). + // Timeout specifies the the timeout for the task's context (Default is no timeout). + err = client.Enqueue(t1, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute)) } ``` @@ -67,13 +69,13 @@ To start the background workers, use `Background` and provide your `Handler` to ```go func main() { r := &asynq.RedisClientOpt{ - Addr: "localhost:6379", + Addr: "127.0.0.1:6379", } bg := asynq.NewBackground(r, &asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, - // You can optionally create multiple queues with different priority. + // Optionally specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, "default": 3, diff --git a/background_test.go b/background_test.go index 32aaeb2..dc2ec03 100644 --- a/background_test.go +++ b/background_test.go @@ -34,9 +34,15 @@ func TestBackground(t *testing.T) { bg.start(HandlerFunc(h)) - client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 123}), time.Now()) + err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + if err != nil { + t.Errorf("could not enqueue a task: %v", err) + } - client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), time.Now().Add(time.Hour)) + err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) + if err != nil { + t.Errorf("could not enqueue a task: %v", err) + } bg.stop() } diff --git a/benchmark_test.go b/benchmark_test.go index 5988adb..469c048 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -33,7 +33,9 @@ func BenchmarkEndToEndSimple(b *testing.B) { // Create a bunch of tasks for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now()) + if err := client.Enqueue(t); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } var wg sync.WaitGroup @@ -74,11 +76,15 @@ func BenchmarkEndToEnd(b *testing.B) { // Create a bunch of tasks for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now()) + if err := client.Enqueue(t); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now().Add(time.Second)) + if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } var wg sync.WaitGroup @@ -129,15 +135,21 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { // Create a bunch of tasks for i := 0; i < highCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now(), Queue("high")) + if err := client.Enqueue(t, Queue("high")); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } for i := 0; i < defaultCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now()) + if err := client.Enqueue(t); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } for i := 0; i < lowCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - client.Schedule(t, time.Now(), Queue("low")) + if err := client.Enqueue(t, Queue("low")); err != nil { + b.Fatalf("could not enqueue a task: %v", err) + } } var wg sync.WaitGroup diff --git a/client.go b/client.go index b7116e4..2bd88ee 100644 --- a/client.go +++ b/client.go @@ -96,14 +96,13 @@ const ( defaultMaxRetry = 25 ) -// Schedule registers a task to be processed at the specified time. +// EnqueueAt schedules task to be enqueued at the specified time. // -// Schedule returns nil if the task is registered successfully, -// otherwise returns a non-nil error. +// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error. // -// opts specifies the behavior of task processing. If there are conflicting -// Option values the last one overrides others. -func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error { +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { opt := composeOptions(opts...) msg := &base.TaskMessage{ ID: xid.New(), @@ -113,12 +112,32 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error Retry: opt.retry, Timeout: opt.timeout.String(), } - return c.enqueue(msg, processAt) + return c.enqueue(msg, t) } -func (c *Client) enqueue(msg *base.TaskMessage, processAt time.Time) error { - if time.Now().After(processAt) { +// Enqueue enqueues task to be processed immediately. +// +// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. +// +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +func (c *Client) Enqueue(task *Task, opts ...Option) error { + return c.EnqueueAt(time.Now(), task, opts...) +} + +// EnqueueIn schedules task to be enqueued after the specified delay. +// +// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error. +// +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error { + return c.EnqueueAt(time.Now().Add(d), task, opts...) +} + +func (c *Client) enqueue(msg *base.TaskMessage, t time.Time) error { + if time.Now().After(t) { return c.rdb.Enqueue(msg) } - return c.rdb.Schedule(msg, processAt) + return c.rdb.Schedule(msg, t) } diff --git a/client_test.go b/client_test.go index f0b8710..35f4869 100644 --- a/client_test.go +++ b/client_test.go @@ -13,15 +13,20 @@ import ( "github.com/hibiken/asynq/internal/base" ) -func TestClient(t *testing.T) { +func TestClientEnqueueAt(t *testing.T) { r := setup(t) client := NewClient(RedisClientOpt{ - Addr: "localhost:6379", - DB: 14, + Addr: redisAddr, + DB: redisDB, }) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + var ( + now = time.Now() + oneHourLater = now.Add(time.Hour) + ) + tests := []struct { desc string task *Task @@ -33,7 +38,7 @@ func TestClient(t *testing.T) { { desc: "Process task immediately", task: task, - processAt: time.Now(), + processAt: now, opts: []Option{}, wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ @@ -51,7 +56,7 @@ func TestClient(t *testing.T) { { desc: "Schedule task to be processed in the future", task: task, - processAt: time.Now().Add(2 * time.Hour), + processAt: oneHourLater, opts: []Option{}, wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantScheduled: []h.ZSetEntry{ @@ -63,137 +68,239 @@ func TestClient(t *testing.T) { Queue: "default", Timeout: time.Duration(0).String(), }, - Score: float64(time.Now().Add(2 * time.Hour).Unix()), + Score: float64(oneHourLater.Unix()), }, }, }, - { - desc: "Process task immediately with a custom retry count", - task: task, - processAt: time.Now(), - opts: []Option{ - MaxRetry(3), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 3, - Queue: "default", - Timeout: time.Duration(0).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, - { - desc: "Negative retry count", - task: task, - processAt: time.Now(), - opts: []Option{ - MaxRetry(-2), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 0, // Retry count should be set to zero - Queue: "default", - Timeout: time.Duration(0).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, - { - desc: "Conflicting options", - task: task, - processAt: time.Now(), - opts: []Option{ - MaxRetry(2), - MaxRetry(10), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 10, // Last option takes precedence - Queue: "default", - Timeout: time.Duration(0).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, - { - desc: "With queue option", - task: task, - processAt: time.Now(), - opts: []Option{ - Queue("custom"), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "custom": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "custom", - Timeout: time.Duration(0).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, - { - desc: "Queue option should be case-insensitive", - task: task, - processAt: time.Now(), - opts: []Option{ - Queue("HIGH"), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "high": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "high", - Timeout: time.Duration(0).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, - { - desc: "Timeout option sets the timeout duration", - task: task, - processAt: time.Now(), - opts: []Option{ - Timeout(20 * time.Second), - }, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: (20 * time.Second).String(), - }, - }, - }, - wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil - }, } for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - err := client.Schedule(tc.task, tc.processAt, tc.opts...) + err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...) + if err != nil { + t.Error(err) + continue + } + + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r, qname) + if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + } + } + + gotScheduled := h.GetScheduledEntries(t, r) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff) + } + } +} + +func TestClientEnqueue(t *testing.T) { + r := setup(t) + client := NewClient(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + + tests := []struct { + desc string + task *Task + opts []Option + wantEnqueued map[string][]*base.TaskMessage + }{ + { + desc: "Process task immediately with a custom retry count", + task: task, + opts: []Option{ + MaxRetry(3), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 3, + Queue: "default", + Timeout: time.Duration(0).String(), + }, + }, + }, + }, + { + desc: "Negative retry count", + task: task, + opts: []Option{ + MaxRetry(-2), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 0, // Retry count should be set to zero + Queue: "default", + Timeout: time.Duration(0).String(), + }, + }, + }, + }, + { + desc: "Conflicting options", + task: task, + opts: []Option{ + MaxRetry(2), + MaxRetry(10), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 10, // Last option takes precedence + Queue: "default", + Timeout: time.Duration(0).String(), + }, + }, + }, + }, + { + desc: "With queue option", + task: task, + opts: []Option{ + Queue("custom"), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "custom": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "custom", + Timeout: time.Duration(0).String(), + }, + }, + }, + }, + { + desc: "Queue option should be case-insensitive", + task: task, + opts: []Option{ + Queue("HIGH"), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "high": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "high", + Timeout: time.Duration(0).String(), + }, + }, + }, + }, + { + desc: "Timeout option sets the timeout duration", + task: task, + opts: []Option{ + Timeout(20 * time.Second), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: (20 * time.Second).String(), + }, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + err := client.Enqueue(tc.task, tc.opts...) + if err != nil { + t.Error(err) + continue + } + + for qname, want := range tc.wantEnqueued { + got := h.GetEnqueuedMessages(t, r, qname) + if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + } + } + } +} + +func TestClientEnqueueIn(t *testing.T) { + r := setup(t) + client := NewClient(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + + tests := []struct { + desc string + task *Task + delay time.Duration + opts []Option + wantEnqueued map[string][]*base.TaskMessage + wantScheduled []h.ZSetEntry + }{ + { + desc: "schedule a task to be enqueued in one hour", + task: task, + delay: time.Hour, + opts: []Option{}, + wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil + wantScheduled: []h.ZSetEntry{ + { + Msg: &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: time.Duration(0).String(), + }, + Score: float64(time.Now().Add(time.Hour).Unix()), + }, + }, + }, + { + desc: "Zero delay", + task: task, + delay: 0, + opts: []Option{}, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: time.Duration(0).String(), + }, + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + err := client.EnqueueIn(tc.delay, tc.task, tc.opts...) if err != nil { t.Error(err) continue diff --git a/doc.go b/doc.go index a369fc1..dfced06 100644 --- a/doc.go +++ b/doc.go @@ -9,8 +9,8 @@ Asynq uses Redis as a message broker. To connect to redis server, specify the options using one of RedisConnOpt types. redis = &asynq.RedisClientOpt{ - Addr: "localhost:6379", - Password: "secretpassword", + Addr: "127.0.0.1:6379", + Password: "xxxxx", DB: 3, } @@ -24,8 +24,11 @@ Task is created with two parameters: its type and payload. "send_email", map[string]interface{}{"user_id": 42}) - // Schedule the task t to be processed a minute from now. - err := client.Schedule(t, time.Now().Add(time.Minute)) + // Enqueue the task to be processed immediately. + err := client.Enqueue(t) + + // Schedule the task to be processed in one minute. + err = client.EnqueueIn(time.Minute, t) The Background is used to run the background task processing with a given handler.