2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Change Client APIs

Use `Enqueue`, `EnqueueAt`, and `EnqueueIn` to enqueue and schedule
tasks.
`Schedule` method was removed.
This commit is contained in:
Ken Hibino 2020-02-23 15:40:04 -08:00
parent 5b53a2aee9
commit 49c117f4d1
7 changed files with 315 additions and 162 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- `Client` API has changed. Use `Enqueue`, `EnqueueAt` and `EnqueueIn` to enqueue and schedule tasks.
### Added ### Added
- `asynqmon workers` was added to list all running workers information - `asynqmon workers` was added to list all running workers information

View File

@ -35,7 +35,7 @@ To create and schedule tasks, use `Client` and provide a task and when to proces
```go ```go
func main() { func main() {
r := &asynq.RedisClientOpt{ r := &asynq.RedisClientOpt{
Addr: "localhost:6379", Addr: "127.0.0.1:6379",
} }
client := asynq.NewClient(r) client := asynq.NewClient(r)
@ -46,19 +46,21 @@ func main() {
t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42}) t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42})
// Process immediately // Process immediately
err := client.Schedule(t1, time.Now()) err := client.Enqueue(t1)
// Process 24 hrs later // Process 24 hrs later
err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) err = client.EnqueueIn(24*time.Hour, t2)
// If processing fails, retry up to 10 times (Default is 25) // Process at specified time.
err = client.Schedule(t1, time.Now(), asynq.Retry(10)) t := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
err = client.EnqueueAt(t, t2)
// Use custom queue called "critical" // Pass options to specify processing behavior for a given task.
err = client.Schedule(t1, time.Now(), asynq.Queue("critical")) //
// MaxRetry specifies the maximum number of times this task will be retried (Default is 25).
// Use timeout to specify how long a task may run (Default is no limit) // Queue specifies which queue to enqueue this task to (Default is "default").
err = client.Schedule(t1, time.Now(), asynq.Timeout(30 * time.Second)) // Timeout specifies the the timeout for the task's context (Default is no timeout).
err = client.Enqueue(t1, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
} }
``` ```
@ -67,13 +69,13 @@ To start the background workers, use `Background` and provide your `Handler` to
```go ```go
func main() { func main() {
r := &asynq.RedisClientOpt{ r := &asynq.RedisClientOpt{
Addr: "localhost:6379", Addr: "127.0.0.1:6379",
} }
bg := asynq.NewBackground(r, &asynq.Config{ bg := asynq.NewBackground(r, &asynq.Config{
// Specify how many concurrent workers to use // Specify how many concurrent workers to use
Concurrency: 10, Concurrency: 10,
// You can optionally create multiple queues with different priority. // Optionally specify multiple queues with different priority.
Queues: map[string]int{ Queues: map[string]int{
"critical": 6, "critical": 6,
"default": 3, "default": 3,

View File

@ -34,9 +34,15 @@ func TestBackground(t *testing.T) {
bg.start(HandlerFunc(h)) bg.start(HandlerFunc(h))
client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 123}), time.Now()) err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}
client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), time.Now().Add(time.Hour)) err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}
bg.stop() bg.stop()
} }

View File

@ -33,7 +33,9 @@ func BenchmarkEndToEndSimple(b *testing.B) {
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now()) if err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -74,11 +76,15 @@ func BenchmarkEndToEnd(b *testing.B) {
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now()) if err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now().Add(time.Second)) if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -129,15 +135,21 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < highCount; i++ { for i := 0; i < highCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now(), Queue("high")) if err := client.Enqueue(t, Queue("high")); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
for i := 0; i < defaultCount; i++ { for i := 0; i < defaultCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now()) if err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
for i := 0; i < lowCount; i++ { for i := 0; i < lowCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
client.Schedule(t, time.Now(), Queue("low")) if err := client.Enqueue(t, Queue("low")); err != nil {
b.Fatalf("could not enqueue a task: %v", err)
}
} }
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -96,14 +96,13 @@ const (
defaultMaxRetry = 25 defaultMaxRetry = 25
) )
// Schedule registers a task to be processed at the specified time. // EnqueueAt schedules task to be enqueued at the specified time.
// //
// Schedule returns nil if the task is registered successfully, // EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
// otherwise returns a non-nil error.
// //
// opts specifies the behavior of task processing. If there are conflicting // The argument opts specifies the behavior of task processing.
// Option values the last one overrides others. // If there are conflicting Option values the last one overrides others.
func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error { func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
opt := composeOptions(opts...) opt := composeOptions(opts...)
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
@ -113,12 +112,32 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error
Retry: opt.retry, Retry: opt.retry,
Timeout: opt.timeout.String(), Timeout: opt.timeout.String(),
} }
return c.enqueue(msg, processAt) return c.enqueue(msg, t)
} }
func (c *Client) enqueue(msg *base.TaskMessage, processAt time.Time) error { // Enqueue enqueues task to be processed immediately.
if time.Now().After(processAt) { //
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now(), task, opts...)
}
// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now().Add(d), task, opts...)
}
func (c *Client) enqueue(msg *base.TaskMessage, t time.Time) error {
if time.Now().After(t) {
return c.rdb.Enqueue(msg) return c.rdb.Enqueue(msg)
} }
return c.rdb.Schedule(msg, processAt) return c.rdb.Schedule(msg, t)
} }

View File

@ -13,15 +13,20 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
func TestClient(t *testing.T) { func TestClientEnqueueAt(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(RedisClientOpt{ client := NewClient(RedisClientOpt{
Addr: "localhost:6379", Addr: redisAddr,
DB: 14, DB: redisDB,
}) })
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
now = time.Now()
oneHourLater = now.Add(time.Hour)
)
tests := []struct { tests := []struct {
desc string desc string
task *Task task *Task
@ -33,7 +38,7 @@ func TestClient(t *testing.T) {
{ {
desc: "Process task immediately", desc: "Process task immediately",
task: task, task: task,
processAt: time.Now(), processAt: now,
opts: []Option{}, opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{ "default": []*base.TaskMessage{
@ -51,7 +56,7 @@ func TestClient(t *testing.T) {
{ {
desc: "Schedule task to be processed in the future", desc: "Schedule task to be processed in the future",
task: task, task: task,
processAt: time.Now().Add(2 * time.Hour), processAt: oneHourLater,
opts: []Option{}, opts: []Option{},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{ wantScheduled: []h.ZSetEntry{
@ -63,137 +68,239 @@ func TestClient(t *testing.T) {
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: time.Duration(0).String(),
}, },
Score: float64(time.Now().Add(2 * time.Hour).Unix()), Score: float64(oneHourLater.Unix()),
}, },
}, },
}, },
{
desc: "Process task immediately with a custom retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(3),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Negative retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Conflicting options",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "With queue option",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Queue option should be case-insensitive",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Timeout option sets the timeout duration",
task: task,
processAt: time.Now(),
opts: []Option{
Timeout(20 * time.Second),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case. h.FlushDB(t, r) // clean up db before each test case.
err := client.Schedule(tc.task, tc.processAt, tc.opts...) err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...)
if err != nil {
t.Error(err)
continue
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledEntries(t, r)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff)
}
}
}
func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
tests := []struct {
desc string
task *Task
opts []Option
wantEnqueued map[string][]*base.TaskMessage
}{
{
desc: "Process task immediately with a custom retry count",
task: task,
opts: []Option{
MaxRetry(3),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Negative retry count",
task: task,
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Conflicting options",
task: task,
opts: []Option{
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "With queue option",
task: task,
opts: []Option{
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Queue option should be case-insensitive",
task: task,
opts: []Option{
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Timeout option sets the timeout duration",
task: task,
opts: []Option{
Timeout(20 * time.Second),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Error(err)
continue
}
for qname, want := range tc.wantEnqueued {
got := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
}
}
func TestClientEnqueueIn(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
tests := []struct {
desc string
task *Task
delay time.Duration
opts []Option
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry
}{
{
desc: "schedule a task to be enqueued in one hour",
task: task,
delay: time.Hour,
opts: []Option{},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
Score: float64(time.Now().Add(time.Hour).Unix()),
},
},
},
{
desc: "Zero delay",
task: task,
delay: 0,
opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
err := client.EnqueueIn(tc.delay, tc.task, tc.opts...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue

11
doc.go
View File

@ -9,8 +9,8 @@ Asynq uses Redis as a message broker. To connect to redis server,
specify the options using one of RedisConnOpt types. specify the options using one of RedisConnOpt types.
redis = &asynq.RedisClientOpt{ redis = &asynq.RedisClientOpt{
Addr: "localhost:6379", Addr: "127.0.0.1:6379",
Password: "secretpassword", Password: "xxxxx",
DB: 3, DB: 3,
} }
@ -24,8 +24,11 @@ Task is created with two parameters: its type and payload.
"send_email", "send_email",
map[string]interface{}{"user_id": 42}) map[string]interface{}{"user_id": 42})
// Schedule the task t to be processed a minute from now. // Enqueue the task to be processed immediately.
err := client.Schedule(t, time.Now().Add(time.Minute)) err := client.Enqueue(t)
// Schedule the task to be processed in one minute.
err = client.EnqueueIn(time.Minute, t)
The Background is used to run the background task processing with a given The Background is used to run the background task processing with a given
handler. handler.