2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00
asynq/client_test.go
Ken Hibino 49c117f4d1 Change Client APIs
Use `Enqueue`, `EnqueueAt`, and `EnqueueIn` to enqueue and schedule
tasks.
`Schedule` method was removed.
2020-02-23 20:40:40 -08:00

322 lines
7.8 KiB
Go

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
func TestClientEnqueueAt(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
now = time.Now()
oneHourLater = now.Add(time.Hour)
)
tests := []struct {
desc string
task *Task
processAt time.Time
opts []Option
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry
}{
{
desc: "Process task immediately",
task: task,
processAt: now,
opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Schedule task to be processed in the future",
task: task,
processAt: oneHourLater,
opts: []Option{},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
Score: float64(oneHourLater.Unix()),
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...)
if err != nil {
t.Error(err)
continue
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledEntries(t, r)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff)
}
}
}
func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
tests := []struct {
desc string
task *Task
opts []Option
wantEnqueued map[string][]*base.TaskMessage
}{
{
desc: "Process task immediately with a custom retry count",
task: task,
opts: []Option{
MaxRetry(3),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Negative retry count",
task: task,
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Conflicting options",
task: task,
opts: []Option{
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "With queue option",
task: task,
opts: []Option{
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Queue option should be case-insensitive",
task: task,
opts: []Option{
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: time.Duration(0).String(),
},
},
},
},
{
desc: "Timeout option sets the timeout duration",
task: task,
opts: []Option{
Timeout(20 * time.Second),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
err := client.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Error(err)
continue
}
for qname, want := range tc.wantEnqueued {
got := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
}
}
func TestClientEnqueueIn(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
tests := []struct {
desc string
task *Task
delay time.Duration
opts []Option
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry
}{
{
desc: "schedule a task to be enqueued in one hour",
task: task,
delay: time.Hour,
opts: []Option{},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
Score: float64(time.Now().Add(time.Hour).Unix()),
},
},
},
{
desc: "Zero delay",
task: task,
delay: 0,
opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
err := client.EnqueueIn(tc.delay, tc.task, tc.opts...)
if err != nil {
t.Error(err)
continue
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledEntries(t, r)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff)
}
}
}