2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Use default timeout of 30mins if both timeout and deadline are not

provided
This commit is contained in:
Ken Hibino 2020-06-16 21:12:50 -07:00
parent 0527b93432
commit 716d3d987e
4 changed files with 95 additions and 74 deletions

View File

@ -110,7 +110,7 @@ func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
timeout: 0,
timeout: 0, // do not set to deafultTimeout here
deadline: time.Time{},
}
for _, opt := range opts {
@ -165,8 +165,19 @@ func serializePayload(payload map[string]interface{}) string {
return b.String()
}
// Default max retry count used if nothing is specified.
const defaultMaxRetry = 25
const (
// Default max retry count used if nothing is specified.
defaultMaxRetry = 25
// Default timeout used if both timeout and deadline are not specified.
defaultTimeout = 30 * time.Minute
)
// Value zero indicates no timeout and no deadline.
var (
noTimeout time.Duration = 0
noDeadline time.Time = time.Unix(0, 0)
)
// SetDefaultOptions sets options to be used for a given task type.
// The argument opts specifies the behavior of task processing.
@ -221,14 +232,26 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
opts = append(defaults, opts...)
}
opt := composeOptions(opts...)
deadline := noDeadline
if !opt.deadline.IsZero() {
deadline = opt.deadline
}
timeout := noTimeout
if opt.timeout != 0 {
timeout = opt.timeout
}
if deadline.Equal(noDeadline) && timeout == noTimeout {
// If neither deadline nor timeout are set, use default timeout.
timeout = defaultTimeout
}
msg := &base.TaskMessage{
ID: xid.New(),
Type: task.Type,
Payload: task.Payload.data,
Queue: opt.queue,
Retry: opt.retry,
Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339),
Deadline: int(deadline.Unix()),
Timeout: int(timeout.Seconds()),
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
}
var err error

View File

@ -15,11 +15,6 @@ import (
"github.com/hibiken/asynq/internal/base"
)
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
func TestClientEnqueueAt(t *testing.T) {
r := setup(t)
client := NewClient(RedisClientOpt{
@ -54,8 +49,8 @@ func TestClientEnqueueAt(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -74,8 +69,8 @@ func TestClientEnqueueAt(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
Score: float64(oneHourLater.Unix()),
},
@ -134,8 +129,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -153,8 +148,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -173,8 +168,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -192,8 +187,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -211,8 +206,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -230,8 +225,8 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
Deadline: noDeadline,
Timeout: 20,
Deadline: int(noDeadline.Unix()),
},
},
},
@ -249,8 +244,28 @@ func TestClientEnqueue(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Format(time.RFC3339),
Timeout: int(noTimeout.Seconds()),
Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()),
},
},
},
},
{
desc: "With both deadline and timeout options",
task: task,
opts: []Option{
Timeout(20 * time.Second),
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: 20,
Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()),
},
},
},
@ -305,8 +320,8 @@ func TestClientEnqueueIn(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
Score: float64(time.Now().Add(time.Hour).Unix()),
},
@ -324,8 +339,8 @@ func TestClientEnqueueIn(t *testing.T) {
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
},
@ -378,8 +393,8 @@ func TestClientDefaultOptions(t *testing.T) {
Payload: nil,
Retry: defaultMaxRetry,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
{
@ -393,8 +408,8 @@ func TestClientDefaultOptions(t *testing.T) {
Payload: nil,
Retry: 5,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
{
@ -408,8 +423,8 @@ func TestClientDefaultOptions(t *testing.T) {
Payload: nil,
Retry: 5,
Queue: "critical",
Timeout: noTimeout,
Deadline: noDeadline,
Timeout: int(defaultTimeout.Seconds()),
Deadline: int(noDeadline.Unix()),
},
},
}

View File

@ -34,12 +34,15 @@ func createContext(msg *base.TaskMessage) (ctx context.Context, cancel context.C
retryCount: msg.Retried,
}
ctx = context.WithValue(context.Background(), metadataCtxKey, metadata)
timeout, err := time.ParseDuration(msg.Timeout)
if err == nil && timeout != 0 {
if msg.Timeout == 0 && msg.Deadline == 0 {
panic("asynq: internal error: missing both timeout and deadline")
}
if msg.Timeout != 0 {
timeout := time.Duration(msg.Timeout) * time.Second
ctx, cancel = context.WithTimeout(ctx, timeout)
}
deadline, err := time.Parse(time.RFC3339, msg.Deadline)
if err == nil && !deadline.IsZero() {
if msg.Deadline != 0 {
deadline := time.Unix(int64(msg.Deadline), 0)
ctx, cancel = context.WithDeadline(ctx, deadline)
}
if cancel == nil {

View File

@ -16,11 +16,6 @@ import (
)
func TestCreateContextWithTimeRestrictions(t *testing.T) {
var (
noTimeout = time.Duration(0)
noDeadline = time.Time{}
)
tests := []struct {
desc string
timeout time.Duration
@ -37,8 +32,8 @@ func TestCreateContextWithTimeRestrictions(t *testing.T) {
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: tc.timeout.String(),
Deadline: tc.deadline.Format(time.RFC3339),
Timeout: int(tc.timeout.Seconds()),
Deadline: int(tc.deadline.Unix()),
}
ctx, cancel := createContext(msg)
@ -68,33 +63,18 @@ func TestCreateContextWithTimeRestrictions(t *testing.T) {
}
func TestCreateContextWithoutTimeRestrictions(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Error("did not panic, want panic when both timeout and deadline are missing")
}
}()
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: time.Duration(0).String(), // zero value to indicate no timeout
Deadline: time.Time{}.Format(time.RFC3339), // zero value to indicate no deadline
}
ctx, cancel := createContext(msg)
select {
case x := <-ctx.Done():
t.Errorf("<-ctx.Done() == %v, want nothing (it should block)", x)
default:
}
_, ok := ctx.Deadline()
if ok {
t.Error("ctx.Deadline() returned true, want deadline to not be set")
}
cancel()
select {
case <-ctx.Done():
default:
t.Error("ctx.Done() blocked, want it to be non-blocking")
Timeout: 0, // zero indicates no timeout
Deadline: 0, // zero indicates no deadline
}
createContext(msg)
}
func TestGetTaskMetadataFromContext(t *testing.T) {
@ -102,8 +82,8 @@ func TestGetTaskMetadataFromContext(t *testing.T) {
desc string
msg *base.TaskMessage
}{
{"with zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 25, Retried: 0}},
{"with non-zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 10, Retried: 5}},
{"with zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 25, Retried: 0, Timeout: 1800}},
{"with non-zero retried message", &base.TaskMessage{Type: "something", ID: xid.New(), Retry: 10, Retried: 5, Timeout: 1800}},
}
for _, tc := range tests {