diff --git a/benchmark_test.go b/benchmark_test.go index 0342b80..6a68496 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -76,7 +76,7 @@ func BenchmarkEndToEnd(b *testing.B) { } for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { + if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -195,7 +195,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { // Schedule 10,000 tasks. for i := 0; i < count; i++ { t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.EnqueueAt(time.Now().Add(time.Second), t); err != nil { + if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } diff --git a/client.go b/client.go index 1c1d63b..5699746 100644 --- a/client.go +++ b/client.go @@ -42,11 +42,13 @@ type Option interface{} // Internal option representations. type ( - retryOption int - queueOption string - timeoutOption time.Duration - deadlineOption time.Time - uniqueOption time.Duration + retryOption int + queueOption string + timeoutOption time.Duration + deadlineOption time.Time + uniqueOption time.Duration + processAtOption time.Time + processInOption time.Duration ) // MaxRetry returns an option to specify the max number of times @@ -102,6 +104,20 @@ func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } +// ProcessAt returns an option to specify when to process the given task. +// +// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. +func ProcessAt(t time.Time) Option { + return processAtOption(t) +} + +// ProcessIn returns an option to specify when to process the given task relative to the current time. +// +// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others. +func ProcessIn(d time.Duration) Option { + return processInOption(d) +} + // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. @@ -113,6 +129,7 @@ type option struct { timeout time.Duration deadline time.Time uniqueTTL time.Duration + processAt time.Time } // composeOptions merges user provided options into the default options @@ -121,10 +138,11 @@ type option struct { // the user provided options fail the validations. func composeOptions(opts ...Option) (option, error) { res := option{ - retry: defaultMaxRetry, - queue: base.DefaultQueueName, - timeout: 0, // do not set to deafultTimeout here - deadline: time.Time{}, + retry: defaultMaxRetry, + queue: base.DefaultQueueName, + timeout: 0, // do not set to deafultTimeout here + deadline: time.Time{}, + processAt: time.Now(), } for _, opt := range opts { switch opt := opt.(type) { @@ -142,6 +160,10 @@ func composeOptions(opts ...Option) (option, error) { res.deadline = time.Time(opt) case uniqueOption: res.uniqueTTL = time.Duration(opt) + case processAtOption: + res.processAt = time.Time(opt) + case processInOption: + res.processAt = time.Now().Add(time.Duration(opt)) default: // ignore unexpected option } @@ -186,6 +208,9 @@ type Result struct { // ID is a unique identifier for the task. ID string + // ProcessAt indicates when the task should be processed. + ProcessAt time.Time + // Retry is the maximum number of retry for the task. Retry int @@ -210,50 +235,25 @@ type Result struct { Deadline time.Time } -// EnqueueAt schedules task to be enqueued at the specified time. -// -// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error. -// -// The argument opts specifies the behavior of task processing. -// If there are conflicting Option values the last one overrides others. -// By deafult, max retry is set to 25 and timeout is set to 30 minutes. -func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) { - return c.enqueueAt(t, task, opts...) +// Close closes the connection with redis server. +func (c *Client) Close() error { + return c.rdb.Close() } -// Enqueue enqueues task to be processed immediately. +// Enqueue enqueues the given task to be processed asynchronously. // // Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. // // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. +// If no ProcessAt or ProcessIn options are passed, the task will be processed immediately. func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { - return c.enqueueAt(time.Now(), task, opts...) -} - -// EnqueueIn schedules task to be enqueued after the specified delay. -// -// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error. -// -// The argument opts specifies the behavior of task processing. -// If there are conflicting Option values the last one overrides others. -// By deafult, max retry is set to 25 and timeout is set to 30 minutes. -func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) (*Result, error) { - return c.enqueueAt(time.Now().Add(d), task, opts...) -} - -// Close closes the connection with redis server. -func (c *Client) Close() error { - return c.rdb.Close() -} - -func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) { c.mu.Lock() - defer c.mu.Unlock() if defaults, ok := c.opts[task.Type]; ok { opts = append(defaults, opts...) } + c.mu.Unlock() opt, err := composeOptions(opts...) if err != nil { return nil, err @@ -285,10 +285,11 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er UniqueKey: uniqueKey, } now := time.Now() - if t.Before(now) || t.Equal(now) { + if opt.processAt.Before(now) || opt.processAt.Equal(now) { + opt.processAt = now err = c.enqueue(msg, opt.uniqueTTL) } else { - err = c.schedule(msg, t, opt.uniqueTTL) + err = c.schedule(msg, opt.processAt, opt.uniqueTTL) } switch { case err == rdb.ErrDuplicateTask: @@ -297,11 +298,12 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er return nil, err } return &Result{ - ID: msg.ID.String(), - Queue: msg.Queue, - Retry: msg.Retry, - Timeout: timeout, - Deadline: deadline, + ID: msg.ID.String(), + ProcessAt: opt.processAt, + Queue: msg.Queue, + Retry: msg.Retry, + Timeout: timeout, + Deadline: deadline, }, nil } diff --git a/client_test.go b/client_test.go index c4ec428..57556e5 100644 --- a/client_test.go +++ b/client_test.go @@ -15,7 +15,7 @@ import ( "github.com/hibiken/asynq/internal/base" ) -func TestClientEnqueueAt(t *testing.T) { +func TestClientEnqueueWithProcessAtOption(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) @@ -29,8 +29,8 @@ func TestClientEnqueueAt(t *testing.T) { tests := []struct { desc string task *Task - processAt time.Time - opts []Option + processAt time.Time // value for ProcessAt option + opts []Option // other options wantRes *Result wantEnqueued map[string][]*base.TaskMessage wantScheduled map[string][]base.Z @@ -41,10 +41,11 @@ func TestClientEnqueueAt(t *testing.T) { processAt: now, opts: []Option{}, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -68,10 +69,11 @@ func TestClientEnqueueAt(t *testing.T) { processAt: oneHourLater, opts: []Option{}, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: oneHourLater, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -97,12 +99,17 @@ func TestClientEnqueueAt(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - gotRes, err := client.EnqueueAt(tc.processAt, tc.task, tc.opts...) + opts := append(tc.opts, ProcessAt(tc.processAt)) + gotRes, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s", tc.desc, gotRes, tc.wantRes, diff) } @@ -127,6 +134,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + now := time.Now() tests := []struct { desc string @@ -142,10 +150,11 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(3), }, wantRes: &Result{ - Queue: "default", - Retry: 3, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: 3, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -167,10 +176,11 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(-2), }, wantRes: &Result{ - Queue: "default", - Retry: 0, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: 0, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -193,10 +203,11 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(10), }, wantRes: &Result{ - Queue: "default", - Retry: 10, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: 10, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -218,10 +229,11 @@ func TestClientEnqueue(t *testing.T) { Queue("custom"), }, wantRes: &Result{ - Queue: "custom", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "custom", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "custom": { @@ -243,10 +255,11 @@ func TestClientEnqueue(t *testing.T) { Queue("HIGH"), }, wantRes: &Result{ - Queue: "high", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "high", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "high": { @@ -268,10 +281,11 @@ func TestClientEnqueue(t *testing.T) { Timeout(20 * time.Second), }, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: 20 * time.Second, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -293,10 +307,11 @@ func TestClientEnqueue(t *testing.T) { Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: noTimeout, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: noTimeout, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -319,10 +334,11 @@ func TestClientEnqueue(t *testing.T) { Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: 20 * time.Second, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -347,7 +363,11 @@ func TestClientEnqueue(t *testing.T) { t.Error(err) continue } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", tc.desc, gotRes, tc.wantRes, diff) } @@ -361,17 +381,18 @@ func TestClientEnqueue(t *testing.T) { } } -func TestClientEnqueueIn(t *testing.T) { +func TestClientEnqueueWithProcessInOption(t *testing.T) { r := setup(t) client := NewClient(getRedisConnOpt(t)) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + now := time.Now() tests := []struct { desc string task *Task - delay time.Duration - opts []Option + delay time.Duration // value for ProcessIn option + opts []Option // other options wantRes *Result wantEnqueued map[string][]*base.TaskMessage wantScheduled map[string][]base.Z @@ -379,13 +400,14 @@ func TestClientEnqueueIn(t *testing.T) { { desc: "schedule a task to be enqueued in one hour", task: task, - delay: time.Hour, + delay: 1 * time.Hour, opts: []Option{}, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now.Add(1 * time.Hour), + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -412,10 +434,11 @@ func TestClientEnqueueIn(t *testing.T) { delay: 0, opts: []Option{}, wantRes: &Result{ - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantEnqueued: map[string][]*base.TaskMessage{ "default": { @@ -438,12 +461,17 @@ func TestClientEnqueueIn(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - gotRes, err := client.EnqueueIn(tc.delay, tc.task, tc.opts...) + opts := append(tc.opts, ProcessIn(tc.delay)) + gotRes, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s", tc.desc, gotRes, tc.wantRes, diff) } @@ -496,6 +524,8 @@ func TestClientEnqueueError(t *testing.T) { func TestClientDefaultOptions(t *testing.T) { r := setup(t) + now := time.Now() + tests := []struct { desc string defaultOpts []Option // options set at the client level. @@ -511,10 +541,11 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{}, task: NewTask("feed:import", nil), wantRes: &Result{ - Queue: "feed", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "feed", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, queue: "feed", want: &base.TaskMessage{ @@ -532,10 +563,11 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{}, task: NewTask("feed:import", nil), wantRes: &Result{ - Queue: "feed", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "feed", + Retry: 5, + Timeout: defaultTimeout, + Deadline: noDeadline, }, queue: "feed", want: &base.TaskMessage{ @@ -553,10 +585,11 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{Queue("critical")}, task: NewTask("feed:import", nil), wantRes: &Result{ - Queue: "critical", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, + ProcessAt: now, + Queue: "critical", + Retry: 5, + Timeout: defaultTimeout, + Deadline: noDeadline, }, queue: "critical", want: &base.TaskMessage{ @@ -578,7 +611,11 @@ func TestClientDefaultOptions(t *testing.T) { if err != nil { t.Fatal(err) } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpopts.IgnoreFields(Result{}, "ID")); diff != "" { + cmpOptions := []cmp.Option{ + cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.EquateApproxTime(500 * time.Millisecond), + } + if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s", tc.desc, gotRes, tc.wantRes, diff) } @@ -638,7 +675,7 @@ func TestClientEnqueueUnique(t *testing.T) { } } -func TestClientEnqueueInUnique(t *testing.T) { +func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { r := setup(t) c := NewClient(getRedisConnOpt(t)) @@ -658,7 +695,7 @@ func TestClientEnqueueInUnique(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. // Enqueue the task first. It should succeed. - _, err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + _, err := c.Enqueue(tc.task, ProcessIn(tc.d), Unique(tc.ttl)) if err != nil { t.Fatal(err) } @@ -671,7 +708,7 @@ func TestClientEnqueueInUnique(t *testing.T) { } // Enqueue the task again. It should fail. - _, err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + _, err = c.Enqueue(tc.task, ProcessIn(tc.d), Unique(tc.ttl)) if err == nil { t.Errorf("Enqueueing %+v did not return an error", tc.task) continue @@ -683,7 +720,7 @@ func TestClientEnqueueInUnique(t *testing.T) { } } -func TestClientEnqueueAtUnique(t *testing.T) { +func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { r := setup(t) c := NewClient(getRedisConnOpt(t)) @@ -703,7 +740,7 @@ func TestClientEnqueueAtUnique(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. // Enqueue the task first. It should succeed. - _, err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + _, err := c.Enqueue(tc.task, ProcessAt(tc.at), Unique(tc.ttl)) if err != nil { t.Fatal(err) } @@ -716,7 +753,7 @@ func TestClientEnqueueAtUnique(t *testing.T) { } // Enqueue the task again. It should fail. - _, err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + _, err = c.Enqueue(tc.task, ProcessAt(tc.at), Unique(tc.ttl)) if err == nil { t.Errorf("Enqueueing %+v did not return an error", tc.task) continue diff --git a/server_test.go b/server_test.go index 11dda1d..4242c27 100644 --- a/server_test.go +++ b/server_test.go @@ -43,7 +43,7 @@ func TestServer(t *testing.T) { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) + _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -189,7 +189,7 @@ func TestServerWithFlakyBroker(t *testing.T) { if err != nil { t.Fatal(err) } - _, err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil)) + _, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second)) if err != nil { t.Fatal(err) }