diff --git a/client.go b/client.go index af0523a..b80bf66 100644 --- a/client.go +++ b/client.go @@ -110,7 +110,7 @@ func composeOptions(opts ...Option) option { res := option{ retry: defaultMaxRetry, queue: base.DefaultQueueName, - timeout: 0, + timeout: 0, // do not set to deafultTimeout here deadline: time.Time{}, } for _, opt := range opts { @@ -165,8 +165,19 @@ func serializePayload(payload map[string]interface{}) string { return b.String() } -// Default max retry count used if nothing is specified. -const defaultMaxRetry = 25 +const ( + // Default max retry count used if nothing is specified. + defaultMaxRetry = 25 + + // Default timeout used if both timeout and deadline are not specified. + defaultTimeout = 30 * time.Minute +) + +// Value zero indicates no timeout and no deadline. +var ( + noTimeout time.Duration = 0 + noDeadline time.Time = time.Unix(0, 0) +) // SetDefaultOptions sets options to be used for a given task type. // The argument opts specifies the behavior of task processing. @@ -221,14 +232,26 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error { opts = append(defaults, opts...) } opt := composeOptions(opts...) + deadline := noDeadline + if !opt.deadline.IsZero() { + deadline = opt.deadline + } + timeout := noTimeout + if opt.timeout != 0 { + timeout = opt.timeout + } + if deadline.Equal(noDeadline) && timeout == noTimeout { + // If neither deadline nor timeout are set, use default timeout. + timeout = defaultTimeout + } msg := &base.TaskMessage{ ID: xid.New(), Type: task.Type, Payload: task.Payload.data, Queue: opt.queue, Retry: opt.retry, - Timeout: opt.timeout.String(), - Deadline: opt.deadline.Format(time.RFC3339), + Deadline: int(deadline.Unix()), + Timeout: int(timeout.Seconds()), UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue), } var err error diff --git a/client_test.go b/client_test.go index 348d056..d1764fa 100644 --- a/client_test.go +++ b/client_test.go @@ -15,11 +15,6 @@ import ( "github.com/hibiken/asynq/internal/base" ) -var ( - noTimeout = time.Duration(0).String() - noDeadline = time.Time{}.Format(time.RFC3339) -) - func TestClientEnqueueAt(t *testing.T) { r := setup(t) client := NewClient(RedisClientOpt{ @@ -54,8 +49,8 @@ func TestClientEnqueueAt(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -74,8 +69,8 @@ func TestClientEnqueueAt(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, Score: float64(oneHourLater.Unix()), }, @@ -134,8 +129,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: 3, Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -153,8 +148,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: 0, // Retry count should be set to zero Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -173,8 +168,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: 10, // Last option takes precedence Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -192,8 +187,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "custom", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -211,8 +206,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "high", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -230,8 +225,8 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: (20 * time.Second).String(), - Deadline: noDeadline, + Timeout: 20, + Deadline: int(noDeadline.Unix()), }, }, }, @@ -249,8 +244,28 @@ func TestClientEnqueue(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: noTimeout, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Format(time.RFC3339), + Timeout: int(noTimeout.Seconds()), + Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()), + }, + }, + }, + }, + { + desc: "With both deadline and timeout options", + task: task, + opts: []Option{ + Timeout(20 * time.Second), + Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": { + { + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: 20, + Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()), }, }, }, @@ -305,8 +320,8 @@ func TestClientEnqueueIn(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, Score: float64(time.Now().Add(time.Hour).Unix()), }, @@ -324,8 +339,8 @@ func TestClientEnqueueIn(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, }, @@ -378,8 +393,8 @@ func TestClientDefaultOptions(t *testing.T) { Payload: nil, Retry: defaultMaxRetry, Queue: "feed", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, { @@ -393,8 +408,8 @@ func TestClientDefaultOptions(t *testing.T) { Payload: nil, Retry: 5, Queue: "feed", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, { @@ -408,8 +423,8 @@ func TestClientDefaultOptions(t *testing.T) { Payload: nil, Retry: 5, Queue: "critical", - Timeout: noTimeout, - Deadline: noDeadline, + Timeout: int(defaultTimeout.Seconds()), + Deadline: int(noDeadline.Unix()), }, }, } diff --git a/context.go b/context.go index ee69ba1..ba46450 100644 --- a/context.go +++ b/context.go @@ -34,12 +34,15 @@ func createContext(msg *base.TaskMessage) (ctx context.Context, cancel context.C retryCount: msg.Retried, } ctx = context.WithValue(context.Background(), metadataCtxKey, metadata) - timeout, err := time.ParseDuration(msg.Timeout) - if err == nil && timeout != 0 { + if msg.Timeout == 0 && msg.Deadline == 0 { + panic("asynq: internal error: missing both timeout and deadline") + } + if msg.Timeout != 0 { + timeout := time.Duration(msg.Timeout) * time.Second ctx, cancel = context.WithTimeout(ctx, timeout) } - deadline, err := time.Parse(time.RFC3339, msg.Deadline) - if err == nil && !deadline.IsZero() { + if msg.Deadline != 0 { + deadline := time.Unix(int64(msg.Deadline), 0) ctx, cancel = context.WithDeadline(ctx, deadline) } if cancel == nil { diff --git a/context_test.go b/context_test.go index 1a5eff6..4bb1fbb 100644 --- a/context_test.go +++ b/context_test.go @@ -16,11 +16,6 @@ import ( ) func TestCreateContextWithTimeRestrictions(t *testing.T) { - var ( - noTimeout = time.Duration(0) - noDeadline = time.Time{} - ) - tests := []struct { desc string timeout time.Duration @@ -37,8 +32,8 @@ func TestCreateContextWithTimeRestrictions(t *testing.T) { msg := &base.TaskMessage{ Type: "something", ID: xid.New(), - Timeout: tc.timeout.String(), - Deadline: tc.deadline.Format(time.RFC3339), + Timeout: int(tc.timeout.Seconds()), + Deadline: int(tc.deadline.Unix()), } ctx, cancel := createContext(msg) @@ -68,33 +63,18 @@ func TestCreateContextWithTimeRestrictions(t *testing.T) { } func TestCreateContextWithoutTimeRestrictions(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Error("did not panic, want panic when both timeout and deadline are missing") + } + }() msg := &base.TaskMessage{ Type: "something", ID: xid.New(), - Timeout: time.Duration(0).String(), // zero value to indicate no timeout - Deadline: time.Time{}.Format(time.RFC3339), // zero value to indicate no deadline - } - - ctx, cancel := createContext(msg) - - select { - case x := <-ctx.Done(): - t.Errorf("<-ctx.Done() == %v, want nothing (it should block)", x) - default: - } - - _, ok := ctx.Deadline() - if ok { - t.Error("ctx.Deadline() returned true, want deadline to not be set") - } - - cancel() - - select { - case <-ctx.Done(): - default: - t.Error("ctx.Done() blocked, want it to be non-blocking") + Timeout: 0, // zero indicates no timeout + Deadline: 0, // zero indicates no deadline } + createContext(msg) } func TestGetTaskMetadataFromContext(t *testing.T) { @@ -102,8 +82,8 @@ func TestGetTaskMetadataFromContext(t *testing.T) { desc string msg *base.TaskMessage }{ - {"with zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 25, Retried: 0}}, - {"with non-zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 10, Retried: 5}}, + {"with zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 25, Retried: 0, Timeout: 1800}}, + {"with non-zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 10, Retried: 5, Timeout: 1800}}, } for _, tc := range tests {