diff --git a/CHANGELOG.md b/CHANGELOG.md index ce1371c..5c6973f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `Client` can optionally schedule task with `asynq.Deadline(time)` to specify deadline for task's context. Default is no deadline. + ## [0.6.0] - 2020-03-01 ### Added diff --git a/client.go b/client.go index 2bd88ee..b315d0e 100644 --- a/client.go +++ b/client.go @@ -34,9 +34,10 @@ type Option interface{} // Internal option representations. type ( - retryOption int - queueOption string - timeoutOption time.Duration + retryOption int + queueOption string + timeoutOption time.Duration + deadlineOption time.Time ) // MaxRetry returns an option to specify the max number of times @@ -64,17 +65,24 @@ func Timeout(d time.Duration) Option { return timeoutOption(d) } +// Deadline returns an option to specify the deadline for the given task. +func Deadline(t time.Time) Option { + return deadlineOption(t) +} + type option struct { - retry int - queue string - timeout time.Duration + retry int + queue string + timeout time.Duration + deadline time.Time } func composeOptions(opts ...Option) option { res := option{ - retry: defaultMaxRetry, - queue: base.DefaultQueueName, - timeout: 0, + retry: defaultMaxRetry, + queue: base.DefaultQueueName, + timeout: 0, + deadline: time.Time{}, } for _, opt := range opts { switch opt := opt.(type) { @@ -84,6 +92,8 @@ func composeOptions(opts ...Option) option { res.queue = string(opt) case timeoutOption: res.timeout = time.Duration(opt) + case deadlineOption: + res.deadline = time.Time(opt) default: // ignore unexpected option } @@ -105,12 +115,13 @@ const ( func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { opt := composeOptions(opts...) msg := &base.TaskMessage{ - ID: xid.New(), - Type: task.Type, - Payload: task.Payload.data, - Queue: opt.queue, - Retry: opt.retry, - Timeout: opt.timeout.String(), + ID: xid.New(), + Type: task.Type, + Payload: task.Payload.data, + Queue: opt.queue, + Retry: opt.retry, + Timeout: opt.timeout.String(), + Deadline: opt.deadline.Format(time.RFC3339), } return c.enqueue(msg, t) } diff --git a/client_test.go b/client_test.go index 35f4869..630ae62 100644 --- a/client_test.go +++ b/client_test.go @@ -25,6 +25,9 @@ func TestClientEnqueueAt(t *testing.T) { var ( now = time.Now() oneHourLater = now.Add(time.Hour) + + noTimeout = time.Duration(0).String() + noDeadline = time.Time{}.Format(time.RFC3339) ) tests := []struct { @@ -43,11 +46,12 @@ func TestClientEnqueueAt(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, @@ -62,11 +66,12 @@ func TestClientEnqueueAt(t *testing.T) { wantScheduled: []h.ZSetEntry{ { Msg: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, Score: float64(oneHourLater.Unix()), }, @@ -106,6 +111,11 @@ func TestClientEnqueue(t *testing.T) { task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + var ( + noTimeout = time.Duration(0).String() + noDeadline = time.Time{}.Format(time.RFC3339) + ) + tests := []struct { desc string task *Task @@ -121,11 +131,12 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 3, - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: 3, + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, @@ -139,11 +150,12 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 0, // Retry count should be set to zero - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: 0, // Retry count should be set to zero + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, @@ -158,11 +170,12 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 10, // Last option takes precedence - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: 10, // Last option takes precedence + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, @@ -176,11 +189,12 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "custom": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "custom", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "custom", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, @@ -194,17 +208,18 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "high": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "high", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "high", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, }, { - desc: "Timeout option sets the timeout duration", + desc: "With timeout option", task: task, opts: []Option{ Timeout(20 * time.Second), @@ -212,11 +227,31 @@ func TestClientEnqueue(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: (20 * time.Second).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: (20 * time.Second).String(), + Deadline: noDeadline, + }, + }, + }, + }, + { + desc: "With deadline option", + task: task, + opts: []Option{ + Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: noTimeout, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Format(time.RFC3339), }, }, }, @@ -250,6 +285,11 @@ func TestClientEnqueueIn(t *testing.T) { task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + var ( + noTimeout = time.Duration(0).String() + noDeadline = time.Time{}.Format(time.RFC3339) + ) + tests := []struct { desc string task *Task @@ -267,11 +307,12 @@ func TestClientEnqueueIn(t *testing.T) { wantScheduled: []h.ZSetEntry{ { Msg: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, Score: float64(time.Now().Add(time.Hour).Unix()), }, @@ -285,11 +326,12 @@ func TestClientEnqueueIn(t *testing.T) { wantEnqueued: map[string][]*base.TaskMessage{ "default": []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", - Timeout: time.Duration(0).String(), + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: noTimeout, + Deadline: noDeadline, }, }, }, diff --git a/internal/base/base.go b/internal/base/base.go index 2f9ab34..d523bc6 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -90,6 +90,13 @@ type TaskMessage struct { // // Zero means no limit. Timeout string + + // Deadline specifies the deadline for the task. + // Task won't be processed if it exceeded its deadline. + // The string shoulbe be in RFC3339 format. + // + // time.Time's zero value means no deadline. + Deadline string } // ProcessState holds process level information. diff --git a/processor.go b/processor.go index fa39424..d2c460b 100644 --- a/processor.go +++ b/processor.go @@ -188,7 +188,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. - logger.warn("Quitting worker to process task id=%s", msg.ID) + logger.warn("Quitting worker. task id=%s", msg.ID) return case resErr := <-resCh: // Note: One of three things should happen. @@ -391,14 +391,18 @@ func gcd(xs ...int) int { } // createContext returns a context and cancel function for a given task message. -func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) { +func createContext(msg *base.TaskMessage) (ctx context.Context, cancel context.CancelFunc) { + ctx = context.Background() timeout, err := time.ParseDuration(msg.Timeout) - if err != nil { - logger.error("cannot parse timeout duration for %+v", msg) - return context.WithCancel(context.Background()) + if err == nil && timeout != 0 { + ctx, cancel = context.WithTimeout(ctx, timeout) } - if timeout == 0 { - return context.WithCancel(context.Background()) + deadline, err := time.Parse(time.RFC3339, msg.Deadline) + if err == nil && !deadline.IsZero() { + ctx, cancel = context.WithDeadline(ctx, deadline) } - return context.WithTimeout(context.Background(), timeout) + if cancel == nil { + ctx, cancel = context.WithCancel(ctx) + } + return ctx, cancel } diff --git a/processor_test.go b/processor_test.go index 5e06efe..d439341 100644 --- a/processor_test.go +++ b/processor_test.go @@ -17,6 +17,7 @@ import ( h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" + "github.com/rs/xid" ) func TestProcessorSuccess(t *testing.T) { @@ -363,3 +364,85 @@ func TestPerform(t *testing.T) { } } } + +func TestCreateContextWithTimeRestrictions(t *testing.T) { + var ( + noTimeout = time.Duration(0) + noDeadline = time.Time{} + ) + + tests := []struct { + desc string + timeout time.Duration + deadline time.Time + wantDeadline time.Time + }{ + {"only with timeout", 10 * time.Second, noDeadline, time.Now().Add(10 * time.Second)}, + {"only with deadline", noTimeout, time.Now().Add(time.Hour), time.Now().Add(time.Hour)}, + {"with timeout and deadline (timeout < deadline)", 10 * time.Second, time.Now().Add(time.Hour), time.Now().Add(10 * time.Second)}, + {"with timeout and deadline (timeout > deadline)", 10 * time.Minute, time.Now().Add(30 * time.Second), time.Now().Add(30 * time.Second)}, + } + + for _, tc := range tests { + msg := &base.TaskMessage{ + Type: "something", + ID: xid.New(), + Timeout: tc.timeout.String(), + Deadline: tc.deadline.Format(time.RFC3339), + } + + ctx, cancel := createContext(msg) + + select { + case x := <-ctx.Done(): + t.Errorf("%s: <-ctx.Done() == %v, want nothing (it should block)", tc.desc, x) + default: + } + + got, ok := ctx.Deadline() + if !ok { + t.Errorf("%s: ctx.Deadline() returned false, want deadline to be set", tc.desc) + } + if !cmp.Equal(tc.wantDeadline, got, cmpopts.EquateApproxTime(time.Second)) { + t.Errorf("%s: ctx.Deadline() returned %v, want %v", tc.desc, got, tc.wantDeadline) + } + + cancel() + + select { + case <-ctx.Done(): + default: + t.Errorf("ctx.Done() blocked, want it to be non-blocking") + } + } +} + +func TestCreateContextWithoutTimeRestrictions(t *testing.T) { + msg := &base.TaskMessage{ + Type: "something", + ID: xid.New(), + Timeout: time.Duration(0).String(), // zero value to indicate no timeout + Deadline: time.Time{}.Format(time.RFC3339), // zero value to indicate no deadline + } + + ctx, cancel := createContext(msg) + + select { + case x := <-ctx.Done(): + t.Errorf("<-ctx.Done() == %v, want nothing (it should block)", x) + default: + } + + _, ok := ctx.Deadline() + if ok { + t.Error("ctx.Deadline() returned true, want deadline to not be set") + } + + cancel() + + select { + case <-ctx.Done(): + default: + t.Error("ctx.Done() blocked, want it to be non-blocking") + } +}