mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-15 20:08:46 +08:00
Return Result struct to caller of Enqueue
This commit is contained in:
parent
e27ae0d33a
commit
6705f7c27a
@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
- Tasks that exceed its deadline are automatically retried.
|
- Tasks that exceed its deadline are automatically retried.
|
||||||
- Encoding schema for task message has changed. Please install the lastest CLI and run `migrate` command if
|
- Encoding schema for task message has changed. Please install the lastest CLI and run `migrate` command if
|
||||||
you have tasks enqueued by the previous version of asynq.
|
you have tasks enqueued by the previous version of asynq.
|
||||||
|
- API of `(*Client).Enqueue`, `(*Client).EnqueueIn`, and `(*Client).EnqueueAt` has changed to return a `*Result`.
|
||||||
|
|
||||||
## [0.9.4] - 2020-06-13
|
## [0.9.4] - 2020-06-13
|
||||||
|
|
||||||
|
12
README.md
12
README.md
@ -157,10 +157,11 @@ func main() {
|
|||||||
// ------------------------------------------------------
|
// ------------------------------------------------------
|
||||||
|
|
||||||
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
|
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
|
||||||
err := c.Enqueue(t)
|
res, err := c.Enqueue(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("could not enqueue task: %v", err)
|
log.Fatal("could not enqueue task: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("Enqueued Result: %+v\n", res)
|
||||||
|
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
@ -169,10 +170,11 @@ func main() {
|
|||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
|
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
|
||||||
err = c.EnqueueIn(24*time.Hour, t)
|
res, err = c.EnqueueIn(24*time.Hour, t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("could not schedule task: %v", err)
|
log.Fatal("could not schedule task: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("Enqueued Result: %+v\n", res)
|
||||||
|
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
@ -183,10 +185,11 @@ func main() {
|
|||||||
c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute))
|
c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute))
|
||||||
|
|
||||||
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
||||||
err = c.Enqueue(t)
|
res, err = c.Enqueue(t)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("could not enqueue task: %v", err)
|
log.Fatal("could not enqueue task: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("Enqueued Result: %+v\n", res)
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Example 4: Pass options to tune task processing behavior at enqueue time.
|
// Example 4: Pass options to tune task processing behavior at enqueue time.
|
||||||
@ -194,10 +197,11 @@ func main() {
|
|||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
||||||
err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
|
res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("could not enqueue task: %v", err)
|
log.Fatal("could not enqueue task: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("Enqueued Result: %+v\n", res)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -33,7 +33,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
|
|||||||
// Create a bunch of tasks
|
// Create a bunch of tasks
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t); err != nil {
|
if _, err := client.Enqueue(t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -76,13 +76,13 @@ func BenchmarkEndToEnd(b *testing.B) {
|
|||||||
// Create a bunch of tasks
|
// Create a bunch of tasks
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t); err != nil {
|
if _, err := client.Enqueue(t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil {
|
if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,19 +144,19 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
|
|||||||
// Create a bunch of tasks
|
// Create a bunch of tasks
|
||||||
for i := 0; i < highCount; i++ {
|
for i := 0; i < highCount; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t, Queue("high")); err != nil {
|
if _, err := client.Enqueue(t, Queue("high")); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < defaultCount; i++ {
|
for i := 0; i < defaultCount; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t); err != nil {
|
if _, err := client.Enqueue(t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for i := 0; i < lowCount; i++ {
|
for i := 0; i < lowCount; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t, Queue("low")); err != nil {
|
if _, err := client.Enqueue(t, Queue("low")); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -200,14 +200,14 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
|
|||||||
// Enqueue 10,000 tasks.
|
// Enqueue 10,000 tasks.
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.Enqueue(t); err != nil {
|
if _, err := client.Enqueue(t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Schedule 10,000 tasks.
|
// Schedule 10,000 tasks.
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
|
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i})
|
||||||
if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil {
|
if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil {
|
||||||
b.Fatalf("could not enqueue a task: %v", err)
|
b.Fatalf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,7 +223,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
|
|||||||
enqueued := 0
|
enqueued := 0
|
||||||
for enqueued < 100000 {
|
for enqueued < 100000 {
|
||||||
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued})
|
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued})
|
||||||
if err := client.Enqueue(t); err != nil {
|
if _, err := client.Enqueue(t); err != nil {
|
||||||
b.Logf("could not enqueue task %d: %v", enqueued, err)
|
b.Logf("could not enqueue task %d: %v", enqueued, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
52
client.go
52
client.go
@ -200,6 +200,35 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
|
|||||||
c.opts[taskType] = opts
|
c.opts[taskType] = opts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A Result holds enqueued task's metadata.
|
||||||
|
type Result struct {
|
||||||
|
// ID is a unique identifier for the task.
|
||||||
|
ID string
|
||||||
|
|
||||||
|
// Retry is the maximum number of retry for the task.
|
||||||
|
Retry int
|
||||||
|
|
||||||
|
// Queue is a name of the queue the task is enqueued to.
|
||||||
|
Queue string
|
||||||
|
|
||||||
|
// Timeout is the timeout value for the task.
|
||||||
|
// Counting for timeout starts when a worker starts processing the task.
|
||||||
|
// If task processing doesn't complete within the timeout, the task will be retried.
|
||||||
|
// The value zero means no timeout.
|
||||||
|
//
|
||||||
|
// If deadline is set, min(now+timeout, deadline) is used, where the now is the time when
|
||||||
|
// a worker starts processing the task.
|
||||||
|
Timeout time.Duration
|
||||||
|
|
||||||
|
// Deadline is the deadline value for the task.
|
||||||
|
// If task processing doesn't complete before the deadline, the task will be retried.
|
||||||
|
// The value time.Unix(0, 0) means no deadline.
|
||||||
|
//
|
||||||
|
// If timeout is set, min(now+timeout, deadline) is used, where the now is the time when
|
||||||
|
// a worker starts processing the task.
|
||||||
|
Deadline time.Time
|
||||||
|
}
|
||||||
|
|
||||||
// EnqueueAt schedules task to be enqueued at the specified time.
|
// EnqueueAt schedules task to be enqueued at the specified time.
|
||||||
//
|
//
|
||||||
// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
|
// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
|
||||||
@ -207,7 +236,7 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
|
|||||||
// The argument opts specifies the behavior of task processing.
|
// The argument opts specifies the behavior of task processing.
|
||||||
// If there are conflicting Option values the last one overrides others.
|
// If there are conflicting Option values the last one overrides others.
|
||||||
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
||||||
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
|
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) {
|
||||||
return c.enqueueAt(t, task, opts...)
|
return c.enqueueAt(t, task, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +247,7 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
|
|||||||
// The argument opts specifies the behavior of task processing.
|
// The argument opts specifies the behavior of task processing.
|
||||||
// If there are conflicting Option values the last one overrides others.
|
// If there are conflicting Option values the last one overrides others.
|
||||||
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
||||||
func (c *Client) Enqueue(task *Task, opts ...Option) error {
|
func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
|
||||||
return c.enqueueAt(time.Now(), task, opts...)
|
return c.enqueueAt(time.Now(), task, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,7 +258,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) error {
|
|||||||
// The argument opts specifies the behavior of task processing.
|
// The argument opts specifies the behavior of task processing.
|
||||||
// If there are conflicting Option values the last one overrides others.
|
// If there are conflicting Option values the last one overrides others.
|
||||||
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
||||||
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
|
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) (*Result, error) {
|
||||||
return c.enqueueAt(time.Now().Add(d), task, opts...)
|
return c.enqueueAt(time.Now().Add(d), task, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -238,7 +267,7 @@ func (c *Client) Close() error {
|
|||||||
return c.rdb.Close()
|
return c.rdb.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
|
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
if defaults, ok := c.opts[task.Type]; ok {
|
if defaults, ok := c.opts[task.Type]; ok {
|
||||||
@ -274,10 +303,19 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
|
|||||||
} else {
|
} else {
|
||||||
err = c.schedule(msg, t, opt.uniqueTTL)
|
err = c.schedule(msg, t, opt.uniqueTTL)
|
||||||
}
|
}
|
||||||
if err == rdb.ErrDuplicateTask {
|
switch {
|
||||||
return fmt.Errorf("%w", ErrDuplicateTask)
|
case err == rdb.ErrDuplicateTask:
|
||||||
|
return nil, fmt.Errorf("%w", ErrDuplicateTask)
|
||||||
|
case err != nil:
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return err
|
return &Result{
|
||||||
|
ID: msg.ID.String(),
|
||||||
|
Queue: msg.Queue,
|
||||||
|
Retry: msg.Retry,
|
||||||
|
Timeout: timeout,
|
||||||
|
Deadline: deadline,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
||||||
|
152
client_test.go
152
client_test.go
@ -34,6 +34,7 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
task *Task
|
task *Task
|
||||||
processAt time.Time
|
processAt time.Time
|
||||||
opts []Option
|
opts []Option
|
||||||
|
wantRes *Result
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantScheduled []h.ZSetEntry
|
wantScheduled []h.ZSetEntry
|
||||||
}{
|
}{
|
||||||
@ -42,6 +43,12 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
task: task,
|
task: task,
|
||||||
processAt: now,
|
processAt: now,
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -57,10 +64,16 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
|
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "Schedule task to be processed in the future",
|
desc: "Schedule task to be processed in the future",
|
||||||
task: task,
|
task: task,
|
||||||
processAt: oneHourLater,
|
processAt: oneHourLater,
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
||||||
wantScheduled: []h.ZSetEntry{
|
wantScheduled: []h.ZSetEntry{
|
||||||
{
|
{
|
||||||
@ -81,11 +94,15 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...)
|
gotRes, err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" {
|
||||||
|
t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s",
|
||||||
|
tc.desc, gotRes, tc.wantRes, diff)
|
||||||
|
}
|
||||||
|
|
||||||
for qname, want := range tc.wantEnqueued {
|
for qname, want := range tc.wantEnqueued {
|
||||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||||
@ -114,6 +131,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
desc string
|
desc string
|
||||||
task *Task
|
task *Task
|
||||||
opts []Option
|
opts []Option
|
||||||
|
wantRes *Result
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@ -122,6 +140,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
MaxRetry(3),
|
MaxRetry(3),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: 3,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -141,6 +165,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
MaxRetry(-2),
|
MaxRetry(-2),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: 0,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -161,6 +191,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
MaxRetry(2),
|
MaxRetry(2),
|
||||||
MaxRetry(10),
|
MaxRetry(10),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: 10,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -180,6 +216,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
Queue("custom"),
|
Queue("custom"),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "custom",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"custom": {
|
"custom": {
|
||||||
{
|
{
|
||||||
@ -199,6 +241,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
Queue("HIGH"),
|
Queue("HIGH"),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "high",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"high": {
|
"high": {
|
||||||
{
|
{
|
||||||
@ -218,6 +266,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
Timeout(20 * time.Second),
|
Timeout(20 * time.Second),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: 20 * time.Second,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -237,6 +291,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
opts: []Option{
|
opts: []Option{
|
||||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: noTimeout,
|
||||||
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -257,6 +317,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Timeout(20 * time.Second),
|
Timeout(20 * time.Second),
|
||||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
||||||
},
|
},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: 20 * time.Second,
|
||||||
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -275,11 +341,15 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
err := client.Enqueue(tc.task, tc.opts...)
|
gotRes, err := client.Enqueue(tc.task, tc.opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" {
|
||||||
|
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
|
||||||
|
tc.desc, gotRes, tc.wantRes, diff)
|
||||||
|
}
|
||||||
|
|
||||||
for qname, want := range tc.wantEnqueued {
|
for qname, want := range tc.wantEnqueued {
|
||||||
got := h.GetEnqueuedMessages(t, r, qname)
|
got := h.GetEnqueuedMessages(t, r, qname)
|
||||||
@ -304,14 +374,21 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
task *Task
|
task *Task
|
||||||
delay time.Duration
|
delay time.Duration
|
||||||
opts []Option
|
opts []Option
|
||||||
|
wantRes *Result
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantScheduled []h.ZSetEntry
|
wantScheduled []h.ZSetEntry
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "schedule a task to be enqueued in one hour",
|
desc: "schedule a task to be enqueued in one hour",
|
||||||
task: task,
|
task: task,
|
||||||
delay: time.Hour,
|
delay: time.Hour,
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
||||||
wantScheduled: []h.ZSetEntry{
|
wantScheduled: []h.ZSetEntry{
|
||||||
{
|
{
|
||||||
@ -332,6 +409,12 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
task: task,
|
task: task,
|
||||||
delay: 0,
|
delay: 0,
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
|
wantRes: &Result{
|
||||||
|
Queue: "default",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {
|
"default": {
|
||||||
{
|
{
|
||||||
@ -351,11 +434,15 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
err := client.EnqueueIn(tc.delay, tc.task, tc.opts...)
|
gotRes, err := client.EnqueueIn(tc.delay, tc.task, tc.opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" {
|
||||||
|
t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s",
|
||||||
|
tc.desc, gotRes, tc.wantRes, diff)
|
||||||
|
}
|
||||||
|
|
||||||
for qname, want := range tc.wantEnqueued {
|
for qname, want := range tc.wantEnqueued {
|
||||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||||
@ -379,6 +466,7 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
defaultOpts []Option // options set at the client level.
|
defaultOpts []Option // options set at the client level.
|
||||||
opts []Option // options used at enqueue time.
|
opts []Option // options used at enqueue time.
|
||||||
task *Task
|
task *Task
|
||||||
|
wantRes *Result
|
||||||
queue string // queue that the message should go into.
|
queue string // queue that the message should go into.
|
||||||
want *base.TaskMessage
|
want *base.TaskMessage
|
||||||
}{
|
}{
|
||||||
@ -387,7 +475,13 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
defaultOpts: []Option{Queue("feed")},
|
defaultOpts: []Option{Queue("feed")},
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
task: NewTask("feed:import", nil),
|
task: NewTask("feed:import", nil),
|
||||||
queue: "feed",
|
wantRes: &Result{
|
||||||
|
Queue: "feed",
|
||||||
|
Retry: defaultMaxRetry,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
|
queue: "feed",
|
||||||
want: &base.TaskMessage{
|
want: &base.TaskMessage{
|
||||||
Type: "feed:import",
|
Type: "feed:import",
|
||||||
Payload: nil,
|
Payload: nil,
|
||||||
@ -402,7 +496,13 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
||||||
opts: []Option{},
|
opts: []Option{},
|
||||||
task: NewTask("feed:import", nil),
|
task: NewTask("feed:import", nil),
|
||||||
queue: "feed",
|
wantRes: &Result{
|
||||||
|
Queue: "feed",
|
||||||
|
Retry: 5,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
|
queue: "feed",
|
||||||
want: &base.TaskMessage{
|
want: &base.TaskMessage{
|
||||||
Type: "feed:import",
|
Type: "feed:import",
|
||||||
Payload: nil,
|
Payload: nil,
|
||||||
@ -417,7 +517,13 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
||||||
opts: []Option{Queue("critical")},
|
opts: []Option{Queue("critical")},
|
||||||
task: NewTask("feed:import", nil),
|
task: NewTask("feed:import", nil),
|
||||||
queue: "critical",
|
wantRes: &Result{
|
||||||
|
Queue: "critical",
|
||||||
|
Retry: 5,
|
||||||
|
Timeout: defaultTimeout,
|
||||||
|
Deadline: noDeadline,
|
||||||
|
},
|
||||||
|
queue: "critical",
|
||||||
want: &base.TaskMessage{
|
want: &base.TaskMessage{
|
||||||
Type: "feed:import",
|
Type: "feed:import",
|
||||||
Payload: nil,
|
Payload: nil,
|
||||||
@ -433,10 +539,14 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
|
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
|
||||||
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
|
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
|
||||||
err := c.Enqueue(tc.task, tc.opts...)
|
gotRes, err := c.Enqueue(tc.task, tc.opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" {
|
||||||
|
t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s",
|
||||||
|
tc.desc, gotRes, tc.wantRes, diff)
|
||||||
|
}
|
||||||
enqueued := h.GetEnqueuedMessages(t, r, tc.queue)
|
enqueued := h.GetEnqueuedMessages(t, r, tc.queue)
|
||||||
if len(enqueued) != 1 {
|
if len(enqueued) != 1 {
|
||||||
t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.",
|
t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.",
|
||||||
@ -538,7 +648,7 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
// Enqueue the task first. It should succeed.
|
// Enqueue the task first. It should succeed.
|
||||||
err := c.Enqueue(tc.task, Unique(tc.ttl))
|
_, err := c.Enqueue(tc.task, Unique(tc.ttl))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -550,7 +660,7 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue the task again. It should fail.
|
// Enqueue the task again. It should fail.
|
||||||
err = c.Enqueue(tc.task, Unique(tc.ttl))
|
_, err = c.Enqueue(tc.task, Unique(tc.ttl))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
||||||
continue
|
continue
|
||||||
@ -585,7 +695,7 @@ func TestEnqueueInUnique(t *testing.T) {
|
|||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
// Enqueue the task first. It should succeed.
|
// Enqueue the task first. It should succeed.
|
||||||
err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
|
_, err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -598,7 +708,7 @@ func TestEnqueueInUnique(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue the task again. It should fail.
|
// Enqueue the task again. It should fail.
|
||||||
err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
|
_, err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
||||||
continue
|
continue
|
||||||
@ -633,7 +743,7 @@ func TestEnqueueAtUnique(t *testing.T) {
|
|||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
// Enqueue the task first. It should succeed.
|
// Enqueue the task first. It should succeed.
|
||||||
err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
|
_, err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -646,7 +756,7 @@ func TestEnqueueAtUnique(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue the task again. It should fail.
|
// Enqueue the task again. It should fail.
|
||||||
err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
|
_, err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
||||||
continue
|
continue
|
||||||
|
4
doc.go
4
doc.go
@ -25,10 +25,10 @@ Task is created with two parameters: its type and payload.
|
|||||||
map[string]interface{}{"user_id": 42})
|
map[string]interface{}{"user_id": 42})
|
||||||
|
|
||||||
// Enqueue the task to be processed immediately.
|
// Enqueue the task to be processed immediately.
|
||||||
err := client.Enqueue(t)
|
res, err := client.Enqueue(t)
|
||||||
|
|
||||||
// Schedule the task to be processed after one minute.
|
// Schedule the task to be processed after one minute.
|
||||||
err = client.EnqueueIn(time.Minute, t)
|
res, err = client.EnqueueIn(time.Minute, t)
|
||||||
|
|
||||||
The Server is used to run the background task processing with a given
|
The Server is used to run the background task processing with a given
|
||||||
handler.
|
handler.
|
||||||
|
@ -41,12 +41,12 @@ func TestServer(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
|
_, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("could not enqueue a task: %v", err)
|
t.Errorf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
|
_, err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("could not enqueue a task: %v", err)
|
t.Errorf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
|
_, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
err = c.Enqueue(NewTask("bad_task", nil))
|
_, err = c.Enqueue(NewTask("bad_task", nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil))
|
_, err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user