From 6705f7c27a319f25dee45541a47d372089f5f62c Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 3 Jul 2020 05:49:52 -0700 Subject: [PATCH] Return Result struct to caller of Enqueue --- CHANGELOG.md | 1 + README.md | 12 ++-- benchmark_test.go | 18 +++--- client.go | 52 +++++++++++++--- client_test.go | 152 +++++++++++++++++++++++++++++++++++++++------- doc.go | 4 +- server_test.go | 10 +-- 7 files changed, 201 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3942fb..622d36e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Tasks that exceed its deadline are automatically retried. - Encoding schema for task message has changed. Please install the lastest CLI and run `migrate` command if you have tasks enqueued by the previous version of asynq. +- API of `(*Client).Enqueue`, `(*Client).EnqueueIn`, and `(*Client).EnqueueAt` has changed to return a `*Result`. ## [0.9.4] - 2020-06-13 diff --git a/README.md b/README.md index 44899ee..5ad6620 100644 --- a/README.md +++ b/README.md @@ -157,10 +157,11 @@ func main() { // ------------------------------------------------------ t := tasks.NewEmailDeliveryTask(42, "some:template:id") - err := c.Enqueue(t) + res, err := c.Enqueue(t) if err != nil { log.Fatal("could not enqueue task: %v", err) } + fmt.Printf("Enqueued Result: %+v\n", res) // ------------------------------------------------------------ @@ -169,10 +170,11 @@ func main() { // ------------------------------------------------------------ t = tasks.NewEmailDeliveryTask(42, "other:template:id") - err = c.EnqueueIn(24*time.Hour, t) + res, err = c.EnqueueIn(24*time.Hour, t) if err != nil { log.Fatal("could not schedule task: %v", err) } + fmt.Printf("Enqueued Result: %+v\n", res) // ---------------------------------------------------------------------------- @@ -183,10 +185,11 @@ func main() { c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute)) t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") - err = c.Enqueue(t) + res, err = c.Enqueue(t) if err != nil { log.Fatal("could not enqueue task: %v", err) } + fmt.Printf("Enqueued Result: %+v\n", res) // --------------------------------------------------------------------------- // Example 4: Pass options to tune task processing behavior at enqueue time. @@ -194,10 +197,11 @@ func main() { // --------------------------------------------------------------------------- t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") - err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) + res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) } + fmt.Printf("Enqueued Result: %+v\n", res) } ``` diff --git a/benchmark_test.go b/benchmark_test.go index 59b9e7e..df2d190 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -33,7 +33,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { // Create a bunch of tasks for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -76,13 +76,13 @@ func BenchmarkEndToEnd(b *testing.B) { // Create a bunch of tasks for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { + if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -144,19 +144,19 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { // Create a bunch of tasks for i := 0; i < highCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t, Queue("high")); err != nil { + if _, err := client.Enqueue(t, Queue("high")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < defaultCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < lowCount; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t, Queue("low")); err != nil { + if _, err := client.Enqueue(t, Queue("low")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -200,14 +200,14 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { // Enqueue 10,000 tasks. for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } // Schedule 10,000 tasks. for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { + if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -223,7 +223,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { enqueued := 0 for enqueued < 100000 { t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued}) - if err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue } diff --git a/client.go b/client.go index bfb79af..6ac7a72 100644 --- a/client.go +++ b/client.go @@ -200,6 +200,35 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { c.opts[taskType] = opts } +// A Result holds enqueued task's metadata. +type Result struct { + // ID is a unique identifier for the task. + ID string + + // Retry is the maximum number of retry for the task. + Retry int + + // Queue is a name of the queue the task is enqueued to. + Queue string + + // Timeout is the timeout value for the task. + // Counting for timeout starts when a worker starts processing the task. + // If task processing doesn't complete within the timeout, the task will be retried. + // The value zero means no timeout. + // + // If deadline is set, min(now+timeout, deadline) is used, where the now is the time when + // a worker starts processing the task. + Timeout time.Duration + + // Deadline is the deadline value for the task. + // If task processing doesn't complete before the deadline, the task will be retried. + // The value time.Unix(0, 0) means no deadline. + // + // If timeout is set, min(now+timeout, deadline) is used, where the now is the time when + // a worker starts processing the task. + Deadline time.Time +} + // EnqueueAt schedules task to be enqueued at the specified time. // // EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error. @@ -207,7 +236,7 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. -func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { +func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) { return c.enqueueAt(t, task, opts...) } @@ -218,7 +247,7 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. -func (c *Client) Enqueue(task *Task, opts ...Option) error { +func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { return c.enqueueAt(time.Now(), task, opts...) } @@ -229,7 +258,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) error { // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. -func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error { +func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) (*Result, error) { return c.enqueueAt(time.Now().Add(d), task, opts...) } @@ -238,7 +267,7 @@ func (c *Client) Close() error { return c.rdb.Close() } -func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error { +func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) { c.mu.Lock() defer c.mu.Unlock() if defaults, ok := c.opts[task.Type]; ok { @@ -274,10 +303,19 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error { } else { err = c.schedule(msg, t, opt.uniqueTTL) } - if err == rdb.ErrDuplicateTask { - return fmt.Errorf("%w", ErrDuplicateTask) + switch { + case err == rdb.ErrDuplicateTask: + return nil, fmt.Errorf("%w", ErrDuplicateTask) + case err != nil: + return nil, err } - return err + return &Result{ + ID: msg.ID.String(), + Queue: msg.Queue, + Retry: msg.Retry, + Timeout: timeout, + Deadline: deadline, + }, nil } func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { diff --git a/client_test.go b/client_test.go index 4fd3d65..c461dff 100644 --- a/client_test.go +++ b/client_test.go @@ -34,6 +34,7 @@ func TestClientEnqueueAt(t *testing.T) { task *Task processAt time.Time opts []Option + wantRes *Result wantEnqueued map[string][]*base.TaskMessage wantScheduled []h.ZSetEntry }{ @@ -42,6 +43,12 @@ func TestClientEnqueueAt(t *testing.T) { task: task, processAt: now, opts: []Option{}, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -57,10 +64,16 @@ func TestClientEnqueueAt(t *testing.T) { wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil }, { - desc: "Schedule task to be processed in the future", - task: task, - processAt: oneHourLater, - opts: []Option{}, + desc: "Schedule task to be processed in the future", + task: task, + processAt: oneHourLater, + opts: []Option{}, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantScheduled: []h.ZSetEntry{ { @@ -81,11 +94,15 @@ func TestClientEnqueueAt(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...) + gotRes, err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...) if err != nil { t.Error(err) continue } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotRes, tc.wantRes, diff) + } for qname, want := range tc.wantEnqueued { gotEnqueued := h.GetEnqueuedMessages(t, r, qname) @@ -114,6 +131,7 @@ func TestClientEnqueue(t *testing.T) { desc string task *Task opts []Option + wantRes *Result wantEnqueued map[string][]*base.TaskMessage }{ { @@ -122,6 +140,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(3), }, + wantRes: &Result{ + Queue: "default", + Retry: 3, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -141,6 +165,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(-2), }, + wantRes: &Result{ + Queue: "default", + Retry: 0, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -161,6 +191,12 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(2), MaxRetry(10), }, + wantRes: &Result{ + Queue: "default", + Retry: 10, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -180,6 +216,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("custom"), }, + wantRes: &Result{ + Queue: "custom", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "custom": { { @@ -199,6 +241,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("HIGH"), }, + wantRes: &Result{ + Queue: "high", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "high": { { @@ -218,6 +266,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Timeout(20 * time.Second), }, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: 20 * time.Second, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -237,6 +291,12 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: noTimeout, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -257,6 +317,12 @@ func TestClientEnqueue(t *testing.T) { Timeout(20 * time.Second), Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: 20 * time.Second, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -275,11 +341,15 @@ func TestClientEnqueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - err := client.Enqueue(tc.task, tc.opts...) + gotRes, err := client.Enqueue(tc.task, tc.opts...) if err != nil { t.Error(err) continue } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotRes, tc.wantRes, diff) + } for qname, want := range tc.wantEnqueued { got := h.GetEnqueuedMessages(t, r, qname) @@ -304,14 +374,21 @@ func TestClientEnqueueIn(t *testing.T) { task *Task delay time.Duration opts []Option + wantRes *Result wantEnqueued map[string][]*base.TaskMessage wantScheduled []h.ZSetEntry }{ { - desc: "schedule a task to be enqueued in one hour", - task: task, - delay: time.Hour, - opts: []Option{}, + desc: "schedule a task to be enqueued in one hour", + task: task, + delay: time.Hour, + opts: []Option{}, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantScheduled: []h.ZSetEntry{ { @@ -332,6 +409,12 @@ func TestClientEnqueueIn(t *testing.T) { task: task, delay: 0, opts: []Option{}, + wantRes: &Result{ + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { { @@ -351,11 +434,15 @@ func TestClientEnqueueIn(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - err := client.EnqueueIn(tc.delay, tc.task, tc.opts...) + gotRes, err := client.EnqueueIn(tc.delay, tc.task, tc.opts...) if err != nil { t.Error(err) continue } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotRes, tc.wantRes, diff) + } for qname, want := range tc.wantEnqueued { gotEnqueued := h.GetEnqueuedMessages(t, r, qname) @@ -379,6 +466,7 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts []Option // options set at the client level. opts []Option // options used at enqueue time. task *Task + wantRes *Result queue string // queue that the message should go into. want *base.TaskMessage }{ @@ -387,7 +475,13 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed")}, opts: []Option{}, task: NewTask("feed:import", nil), - queue: "feed", + wantRes: &Result{ + Queue: "feed", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, + queue: "feed", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -402,7 +496,13 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{}, task: NewTask("feed:import", nil), - queue: "feed", + wantRes: &Result{ + Queue: "feed", + Retry: 5, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, + queue: "feed", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -417,7 +517,13 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{Queue("critical")}, task: NewTask("feed:import", nil), - queue: "critical", + wantRes: &Result{ + Queue: "critical", + Retry: 5, + Timeout: defaultTimeout, + Deadline: noDeadline, + }, + queue: "critical", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -433,10 +539,14 @@ func TestClientDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) - err := c.Enqueue(tc.task, tc.opts...) + gotRes, err := c.Enqueue(tc.task, tc.opts...) if err != nil { t.Fatal(err) } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s", + tc.desc, gotRes, tc.wantRes, diff) + } enqueued := h.GetEnqueuedMessages(t, r, tc.queue) if len(enqueued) != 1 { t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.", @@ -538,7 +648,7 @@ func TestEnqueueUnique(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. // Enqueue the task first. It should succeed. - err := c.Enqueue(tc.task, Unique(tc.ttl)) + _, err := c.Enqueue(tc.task, Unique(tc.ttl)) if err != nil { t.Fatal(err) } @@ -550,7 +660,7 @@ func TestEnqueueUnique(t *testing.T) { } // Enqueue the task again. It should fail. - err = c.Enqueue(tc.task, Unique(tc.ttl)) + _, err = c.Enqueue(tc.task, Unique(tc.ttl)) if err == nil { t.Errorf("Enqueueing %+v did not return an error", tc.task) continue @@ -585,7 +695,7 @@ func TestEnqueueInUnique(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. // Enqueue the task first. It should succeed. - err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + _, err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) if err != nil { t.Fatal(err) } @@ -598,7 +708,7 @@ func TestEnqueueInUnique(t *testing.T) { } // Enqueue the task again. It should fail. - err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + _, err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) if err == nil { t.Errorf("Enqueueing %+v did not return an error", tc.task) continue @@ -633,7 +743,7 @@ func TestEnqueueAtUnique(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. // Enqueue the task first. It should succeed. - err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + _, err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) if err != nil { t.Fatal(err) } @@ -646,7 +756,7 @@ func TestEnqueueAtUnique(t *testing.T) { } // Enqueue the task again. It should fail. - err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + _, err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) if err == nil { t.Errorf("Enqueueing %+v did not return an error", tc.task) continue diff --git a/doc.go b/doc.go index aaaaa0c..8f8c87d 100644 --- a/doc.go +++ b/doc.go @@ -25,10 +25,10 @@ Task is created with two parameters: its type and payload. map[string]interface{}{"user_id": 42}) // Enqueue the task to be processed immediately. - err := client.Enqueue(t) + res, err := client.Enqueue(t) // Schedule the task to be processed after one minute. - err = client.EnqueueIn(time.Minute, t) + res, err = client.EnqueueIn(time.Minute, t) The Server is used to run the background task processing with a given handler. diff --git a/server_test.go b/server_test.go index 82d1c72..4d08f45 100644 --- a/server_test.go +++ b/server_test.go @@ -41,12 +41,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) + _, err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) { } for i := 0; i < 10; i++ { - err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) + _, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) if err != nil { t.Fatal(err) } - err = c.Enqueue(NewTask("bad_task", nil)) + _, err = c.Enqueue(NewTask("bad_task", nil)) if err != nil { t.Fatal(err) } - err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil)) + _, err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil)) if err != nil { t.Fatal(err) }