2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add Deadline option when enqueuing tasks

Deadline option sets the deadline for the given task's context deadline.
This commit is contained in:
Ken Hibino 2020-03-07 20:24:03 -08:00
parent 25992c2781
commit 3c722386b0
6 changed files with 225 additions and 74 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- `Client` can optionally schedule task with `asynq.Deadline(time)` to specify deadline for task's context. Default is no deadline.
## [0.6.0] - 2020-03-01 ## [0.6.0] - 2020-03-01
### Added ### Added

View File

@ -37,6 +37,7 @@ type (
retryOption int retryOption int
queueOption string queueOption string
timeoutOption time.Duration timeoutOption time.Duration
deadlineOption time.Time
) )
// MaxRetry returns an option to specify the max number of times // MaxRetry returns an option to specify the max number of times
@ -64,10 +65,16 @@ func Timeout(d time.Duration) Option {
return timeoutOption(d) return timeoutOption(d)
} }
// Deadline returns an option to specify the deadline for the given task.
func Deadline(t time.Time) Option {
return deadlineOption(t)
}
type option struct { type option struct {
retry int retry int
queue string queue string
timeout time.Duration timeout time.Duration
deadline time.Time
} }
func composeOptions(opts ...Option) option { func composeOptions(opts ...Option) option {
@ -75,6 +82,7 @@ func composeOptions(opts ...Option) option {
retry: defaultMaxRetry, retry: defaultMaxRetry,
queue: base.DefaultQueueName, queue: base.DefaultQueueName,
timeout: 0, timeout: 0,
deadline: time.Time{},
} }
for _, opt := range opts { for _, opt := range opts {
switch opt := opt.(type) { switch opt := opt.(type) {
@ -84,6 +92,8 @@ func composeOptions(opts ...Option) option {
res.queue = string(opt) res.queue = string(opt)
case timeoutOption: case timeoutOption:
res.timeout = time.Duration(opt) res.timeout = time.Duration(opt)
case deadlineOption:
res.deadline = time.Time(opt)
default: default:
// ignore unexpected option // ignore unexpected option
} }
@ -111,6 +121,7 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Timeout: opt.timeout.String(), Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339),
} }
return c.enqueue(msg, t) return c.enqueue(msg, t)
} }

View File

@ -25,6 +25,9 @@ func TestClientEnqueueAt(t *testing.T) {
var ( var (
now = time.Now() now = time.Now()
oneHourLater = now.Add(time.Hour) oneHourLater = now.Add(time.Hour)
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
) )
tests := []struct { tests := []struct {
@ -47,7 +50,8 @@ func TestClientEnqueueAt(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
@ -66,7 +70,8 @@ func TestClientEnqueueAt(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
Score: float64(oneHourLater.Unix()), Score: float64(oneHourLater.Unix()),
}, },
@ -106,6 +111,11 @@ func TestClientEnqueue(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct { tests := []struct {
desc string desc string
task *Task task *Task
@ -125,7 +135,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 3, Retry: 3,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
@ -143,7 +154,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero Retry: 0, // Retry count should be set to zero
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
@ -162,7 +174,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 10, // Last option takes precedence Retry: 10, // Last option takes precedence
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
@ -180,7 +193,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "custom", Queue: "custom",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
@ -198,13 +212,14 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "high", Queue: "high",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },
}, },
{ {
desc: "Timeout option sets the timeout duration", desc: "With timeout option",
task: task, task: task,
opts: []Option{ opts: []Option{
Timeout(20 * time.Second), Timeout(20 * time.Second),
@ -217,6 +232,26 @@ func TestClientEnqueue(t *testing.T) {
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: (20 * time.Second).String(), Timeout: (20 * time.Second).String(),
Deadline: noDeadline,
},
},
},
},
{
desc: "With deadline option",
task: task,
opts: []Option{
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Format(time.RFC3339),
}, },
}, },
}, },
@ -250,6 +285,11 @@ func TestClientEnqueueIn(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct { tests := []struct {
desc string desc string
task *Task task *Task
@ -271,7 +311,8 @@ func TestClientEnqueueIn(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
Score: float64(time.Now().Add(time.Hour).Unix()), Score: float64(time.Now().Add(time.Hour).Unix()),
}, },
@ -289,7 +330,8 @@ func TestClientEnqueueIn(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(), Timeout: noTimeout,
Deadline: noDeadline,
}, },
}, },
}, },

View File

@ -90,6 +90,13 @@ type TaskMessage struct {
// //
// Zero means no limit. // Zero means no limit.
Timeout string Timeout string
// Deadline specifies the deadline for the task.
// Task won't be processed if it exceeded its deadline.
// The string shoulbe be in RFC3339 format.
//
// time.Time's zero value means no deadline.
Deadline string
} }
// ProcessState holds process level information. // ProcessState holds process level information.

View File

@ -188,7 +188,7 @@ func (p *processor) exec() {
select { select {
case <-p.quit: case <-p.quit:
// time is up, quit this worker goroutine. // time is up, quit this worker goroutine.
logger.warn("Quitting worker to process task id=%s", msg.ID) logger.warn("Quitting worker. task id=%s", msg.ID)
return return
case resErr := <-resCh: case resErr := <-resCh:
// Note: One of three things should happen. // Note: One of three things should happen.
@ -391,14 +391,18 @@ func gcd(xs ...int) int {
} }
// createContext returns a context and cancel function for a given task message. // createContext returns a context and cancel function for a given task message.
func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) { func createContext(msg *base.TaskMessage) (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
timeout, err := time.ParseDuration(msg.Timeout) timeout, err := time.ParseDuration(msg.Timeout)
if err != nil { if err == nil && timeout != 0 {
logger.error("cannot parse timeout duration for %+v", msg) ctx, cancel = context.WithTimeout(ctx, timeout)
return context.WithCancel(context.Background())
} }
if timeout == 0 { deadline, err := time.Parse(time.RFC3339, msg.Deadline)
return context.WithCancel(context.Background()) if err == nil && !deadline.IsZero() {
ctx, cancel = context.WithDeadline(ctx, deadline)
} }
return context.WithTimeout(context.Background(), timeout) if cancel == nil {
ctx, cancel = context.WithCancel(ctx)
}
return ctx, cancel
} }

View File

@ -17,6 +17,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
) )
func TestProcessorSuccess(t *testing.T) { func TestProcessorSuccess(t *testing.T) {
@ -363,3 +364,85 @@ func TestPerform(t *testing.T) {
} }
} }
} }
func TestCreateContextWithTimeRestrictions(t *testing.T) {
var (
noTimeout = time.Duration(0)
noDeadline = time.Time{}
)
tests := []struct {
desc string
timeout time.Duration
deadline time.Time
wantDeadline time.Time
}{
{"only with timeout", 10 * time.Second, noDeadline, time.Now().Add(10 * time.Second)},
{"only with deadline", noTimeout, time.Now().Add(time.Hour), time.Now().Add(time.Hour)},
{"with timeout and deadline (timeout < deadline)", 10 * time.Second, time.Now().Add(time.Hour), time.Now().Add(10 * time.Second)},
{"with timeout and deadline (timeout > deadline)", 10 * time.Minute, time.Now().Add(30 * time.Second), time.Now().Add(30 * time.Second)},
}
for _, tc := range tests {
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: tc.timeout.String(),
Deadline: tc.deadline.Format(time.RFC3339),
}
ctx, cancel := createContext(msg)
select {
case x := <-ctx.Done():
t.Errorf("%s: <-ctx.Done() == %v, want nothing (it should block)", tc.desc, x)
default:
}
got, ok := ctx.Deadline()
if !ok {
t.Errorf("%s: ctx.Deadline() returned false, want deadline to be set", tc.desc)
}
if !cmp.Equal(tc.wantDeadline, got, cmpopts.EquateApproxTime(time.Second)) {
t.Errorf("%s: ctx.Deadline() returned %v, want %v", tc.desc, got, tc.wantDeadline)
}
cancel()
select {
case <-ctx.Done():
default:
t.Errorf("ctx.Done() blocked, want it to be non-blocking")
}
}
}
func TestCreateContextWithoutTimeRestrictions(t *testing.T) {
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: time.Duration(0).String(), // zero value to indicate no timeout
Deadline: time.Time{}.Format(time.RFC3339), // zero value to indicate no deadline
}
ctx, cancel := createContext(msg)
select {
case x := <-ctx.Done():
t.Errorf("<-ctx.Done() == %v, want nothing (it should block)", x)
default:
}
_, ok := ctx.Deadline()
if ok {
t.Error("ctx.Deadline() returned true, want deadline to not be set")
}
cancel()
select {
case <-ctx.Done():
default:
t.Error("ctx.Done() blocked, want it to be non-blocking")
}
}