mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Use int64 type for Timeout and Deadline in TaskMessage
This commit is contained in:
parent
379da8f7a2
commit
7e942ec241
@ -250,8 +250,8 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Queue: opt.queue,
|
Queue: opt.queue,
|
||||||
Retry: opt.retry,
|
Retry: opt.retry,
|
||||||
Deadline: int(deadline.Unix()),
|
Deadline: deadline.Unix(),
|
||||||
Timeout: int(timeout.Seconds()),
|
Timeout: int64(timeout.Seconds()),
|
||||||
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
|
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
|
@ -49,8 +49,8 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -69,8 +69,8 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
Score: float64(oneHourLater.Unix()),
|
Score: float64(oneHourLater.Unix()),
|
||||||
},
|
},
|
||||||
@ -129,8 +129,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: 3,
|
Retry: 3,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -148,8 +148,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: 0, // Retry count should be set to zero
|
Retry: 0, // Retry count should be set to zero
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -168,8 +168,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: 10, // Last option takes precedence
|
Retry: 10, // Last option takes precedence
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -187,8 +187,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "custom",
|
Queue: "custom",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -206,8 +206,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "high",
|
Queue: "high",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -226,7 +226,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: 20,
|
Timeout: 20,
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -244,8 +244,8 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(noTimeout.Seconds()),
|
Timeout: int64(noTimeout.Seconds()),
|
||||||
Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -265,7 +265,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: 20,
|
Timeout: 20,
|
||||||
Deadline: int(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix()),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -320,8 +320,8 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
Score: float64(time.Now().Add(time.Hour).Unix()),
|
Score: float64(time.Now().Add(time.Hour).Unix()),
|
||||||
},
|
},
|
||||||
@ -339,8 +339,8 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
Payload: task.Payload.data,
|
Payload: task.Payload.data,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -393,8 +393,8 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
Payload: nil,
|
Payload: nil,
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "feed",
|
Queue: "feed",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -408,8 +408,8 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
Payload: nil,
|
Payload: nil,
|
||||||
Retry: 5,
|
Retry: 5,
|
||||||
Queue: "feed",
|
Queue: "feed",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -423,8 +423,8 @@ func TestClientDefaultOptions(t *testing.T) {
|
|||||||
Payload: nil,
|
Payload: nil,
|
||||||
Retry: 5,
|
Retry: 5,
|
||||||
Queue: "critical",
|
Queue: "critical",
|
||||||
Timeout: int(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: int(noDeadline.Unix()),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ type TaskMessage struct {
|
|||||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||||
//
|
//
|
||||||
// Use zero to indicate no timeout.
|
// Use zero to indicate no timeout.
|
||||||
Timeout int
|
Timeout int64
|
||||||
|
|
||||||
// Deadline specifies the deadline for the task in Unix time,
|
// Deadline specifies the deadline for the task in Unix time,
|
||||||
// the number of seconds elapsed since January 1, 1970 UTC.
|
// the number of seconds elapsed since January 1, 1970 UTC.
|
||||||
@ -102,7 +102,7 @@ type TaskMessage struct {
|
|||||||
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
// if retry count is remaining. Otherwise it will be moved to the dead queue.
|
||||||
//
|
//
|
||||||
// Use zero to indicate no deadline.
|
// Use zero to indicate no deadline.
|
||||||
Deadline int
|
Deadline int64
|
||||||
|
|
||||||
// UniqueKey holds the redis key used for uniqueness lock for this task.
|
// UniqueKey holds the redis key used for uniqueness lock for this task.
|
||||||
//
|
//
|
||||||
|
@ -122,7 +122,7 @@ func TestDequeue(t *testing.T) {
|
|||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
Deadline: 0,
|
Deadline: 0,
|
||||||
}
|
}
|
||||||
t1Deadline := int(now.Unix()) + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2 := &base.TaskMessage{
|
t2 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "export_csv",
|
Type: "export_csv",
|
||||||
@ -135,10 +135,10 @@ func TestDequeue(t *testing.T) {
|
|||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "reindex",
|
Type: "reindex",
|
||||||
Payload: nil,
|
Payload: nil,
|
||||||
Timeout: int((5 * time.Minute).Seconds()),
|
Timeout: int64((5 * time.Minute).Seconds()),
|
||||||
Deadline: int(time.Now().Add(10 * time.Minute).Unix()),
|
Deadline: time.Now().Add(10 * time.Minute).Unix(),
|
||||||
}
|
}
|
||||||
t3Deadline := int(now.Unix()) + t3.Timeout // use whichever is earliest
|
t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued map[string][]*base.TaskMessage
|
enqueued map[string][]*base.TaskMessage
|
||||||
@ -392,7 +392,7 @@ func TestDone(t *testing.T) {
|
|||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
Deadline: 0,
|
Deadline: 0,
|
||||||
}
|
}
|
||||||
t1Deadline := int(now.Unix()) + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2 := &base.TaskMessage{
|
t2 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "export_csv",
|
Type: "export_csv",
|
||||||
@ -410,7 +410,7 @@ func TestDone(t *testing.T) {
|
|||||||
UniqueKey: "reindex:nil:default",
|
UniqueKey: "reindex:nil:default",
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
}
|
}
|
||||||
t3Deadline := int(now.Unix()) + t3.Deadline
|
t3Deadline := now.Unix() + t3.Deadline
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
inProgress []*base.TaskMessage // initial state of the in-progress list
|
inProgress []*base.TaskMessage // initial state of the in-progress list
|
||||||
@ -555,9 +555,9 @@ func TestRequeue(t *testing.T) {
|
|||||||
Queue: "critical",
|
Queue: "critical",
|
||||||
Timeout: 80,
|
Timeout: 80,
|
||||||
}
|
}
|
||||||
t1Deadline := int(now.Unix()) + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2Deadline := int(now.Unix()) + t2.Timeout
|
t2Deadline := now.Unix() + t2.Timeout
|
||||||
t3Deadline := int(now.Unix()) + t3.Timeout
|
t3Deadline := now.Unix() + t3.Timeout
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued map[string][]*base.TaskMessage // initial state of queues
|
enqueued map[string][]*base.TaskMessage // initial state of queues
|
||||||
@ -748,14 +748,14 @@ func TestRetry(t *testing.T) {
|
|||||||
Payload: map[string]interface{}{"subject": "Hola!"},
|
Payload: map[string]interface{}{"subject": "Hola!"},
|
||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
}
|
}
|
||||||
t1Deadline := int(now.Unix()) + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2 := &base.TaskMessage{
|
t2 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "gen_thumbnail",
|
Type: "gen_thumbnail",
|
||||||
Payload: map[string]interface{}{"path": "some/path/to/image.jpg"},
|
Payload: map[string]interface{}{"path": "some/path/to/image.jpg"},
|
||||||
Timeout: 3000,
|
Timeout: 3000,
|
||||||
}
|
}
|
||||||
t2Deadline := int(now.Unix()) + t2.Timeout
|
t2Deadline := now.Unix() + t2.Timeout
|
||||||
t3 := &base.TaskMessage{
|
t3 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "reindex",
|
Type: "reindex",
|
||||||
@ -877,7 +877,7 @@ func TestKill(t *testing.T) {
|
|||||||
Retried: 0,
|
Retried: 0,
|
||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
}
|
}
|
||||||
t1Deadline := int(now.Unix()) + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2 := &base.TaskMessage{
|
t2 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "reindex",
|
Type: "reindex",
|
||||||
@ -887,7 +887,7 @@ func TestKill(t *testing.T) {
|
|||||||
Retried: 0,
|
Retried: 0,
|
||||||
Timeout: 3000,
|
Timeout: 3000,
|
||||||
}
|
}
|
||||||
t2Deadline := int(now.Unix()) + t2.Timeout
|
t2Deadline := now.Unix() + t2.Timeout
|
||||||
t3 := &base.TaskMessage{
|
t3 := &base.TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
Type: "generate_csv",
|
Type: "generate_csv",
|
||||||
@ -897,7 +897,7 @@ func TestKill(t *testing.T) {
|
|||||||
Retried: 0,
|
Retried: 0,
|
||||||
Timeout: 60,
|
Timeout: 60,
|
||||||
}
|
}
|
||||||
t3Deadline := int(now.Unix()) + t3.Timeout
|
t3Deadline := now.Unix() + t3.Timeout
|
||||||
errMsg := "SMTP server not responding"
|
errMsg := "SMTP server not responding"
|
||||||
t1AfterKill := &base.TaskMessage{
|
t1AfterKill := &base.TaskMessage{
|
||||||
ID: t1.ID,
|
ID: t1.ID,
|
||||||
|
Loading…
Reference in New Issue
Block a user