From 0527b934328b6185d2893610a2b34930d6c814da Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 16 Jun 2020 21:11:54 -0700 Subject: [PATCH] Change TaskMessage Timeout and Deadline to int * This change breaks existing tasks in Redis --- internal/asynqtest/asynqtest.go | 12 +++++++----- internal/base/base.go | 20 +++++++++++--------- internal/base/base_test.go | 30 ++++++++++++++++-------------- 3 files changed, 34 insertions(+), 28 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index a713deb..c59ff65 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -75,11 +75,13 @@ var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { return &base.TaskMessage{ - ID: xid.New(), - Type: taskType, - Queue: base.DefaultQueueName, - Retry: 25, - Payload: payload, + ID: xid.New(), + Type: taskType, + Queue: base.DefaultQueueName, + Retry: 25, + Payload: payload, + Timeout: 1800, // default timeout of 30 mins + Deadline: 0, // no deadline } } diff --git a/internal/base/base.go b/internal/base/base.go index e67b21e..185a839 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -89,18 +89,20 @@ type TaskMessage struct { // ErrorMsg holds the error message from the last failure. ErrorMsg string - // Timeout specifies how long a task may run. - // The string value should be compatible with time.Duration.ParseDuration. + // Timeout specifies timeout in seconds. + // If task processing doesn't complete within the timeout, the task will be retried + // if retry count is remaining. Otherwise it will be moved to the dead queue. // - // Zero means no limit. - Timeout string + // Use zero to indicate no timeout. + Timeout int - // Deadline specifies the deadline for the task. - // Task won't be processed if it exceeded its deadline. - // The string shoulbe be in RFC3339 format. + // Deadline specifies the deadline for the task in Unix time, + // the number of seconds elapsed since January 1, 1970 UTC. + // If task processing doesn't complete before the deadline, the task will be retried + // if retry count is remaining. Otherwise it will be moved to the dead queue. // - // time.Time's zero value means no deadline. - Deadline string + // Use zero to indicate no deadline. + Deadline int // UniqueKey holds the redis key used for uniqueness lock for this task. // diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 700ae53..5c7d2da 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -115,22 +115,24 @@ func TestMessageEncoding(t *testing.T) { }{ { in: &TaskMessage{ - Type: "task1", - Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true}, - ID: id, - Queue: "default", - Retry: 10, - Retried: 0, - Timeout: "0", + Type: "task1", + Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true}, + ID: id, + Queue: "default", + Retry: 10, + Retried: 0, + Timeout: 1800, + Deadline: 1692311100, }, out: &TaskMessage{ - Type: "task1", - Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}, - ID: id, - Queue: "default", - Retry: 10, - Retried: 0, - Timeout: "0", + Type: "task1", + Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}, + ID: id, + Queue: "default", + Retry: 10, + Retried: 0, + Timeout: 1800, + Deadline: 1692311100, }, }, }