From 40960d6acf6c2b279eddd2cbbd7746ac9afc56f4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 13 Sep 2021 05:55:21 -0700 Subject: [PATCH] Add ResultTTL option --- asynq.go | 14 ++++++++++++++ client.go | 17 +++++++++++++++++ client_test.go | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/asynq.go b/asynq.go index b3231ab..c09542f 100644 --- a/asynq.go +++ b/asynq.go @@ -81,6 +81,13 @@ type TaskInfo struct { // NextProcessAt is the time the task is scheduled to be processed, // zero if not applicable. NextProcessAt time.Time + + // ResulTTL is the retention period after the task is successfully processed. + ResultTTL time.Duration + + // CompletedAt is the time the task is processed successfully. + // Zero value (i.e. time.Time{}) indicates no value. + CompletedAt time.Time } func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo { @@ -93,6 +100,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time Retried: msg.Retried, LastErr: msg.ErrorMsg, Timeout: time.Duration(msg.Timeout) * time.Second, + ResultTTL: time.Duration(msg.ResultTTL) * time.Second, NextProcessAt: nextProcessAt, } if msg.LastFailedAt == 0 { @@ -107,6 +115,12 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time info.Deadline = time.Unix(msg.Deadline, 0) } + if msg.CompletedAt == 0 { + info.CompletedAt = time.Time{} + } else { + info.CompletedAt = time.Unix(msg.CompletedAt, 0) + } + switch state { case base.TaskStateActive: info.State = TaskStateActive diff --git a/client.go b/client.go index 0d61a6a..e40785c 100644 --- a/client.go +++ b/client.go @@ -46,6 +46,7 @@ const ( ProcessAtOpt ProcessInOpt TaskIDOpt + ResultTTLOpt ) // Option specifies the task processing behavior. @@ -70,6 +71,7 @@ type ( uniqueOption time.Duration processAtOption time.Time processInOption time.Duration + resultTTLOption time.Duration ) // MaxRetry returns an option to specify the max number of times @@ -178,6 +180,17 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v) func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Value() interface{} { return time.Duration(d) } +// ResultTTL returns an option to specify the retention period for the task. +// If this option is provided, the task will be stored as a completed task after successful processing. +// A completed task will be deleted after the TTL expires. +func ResultTTL(ttl time.Duration) Option { + return resultTTLOption(ttl) +} + +func (ttl resultTTLOption) String() string { return fmt.Sprintf("ResultTTL(%v)", time.Duration(ttl)) } +func (ttl resultTTLOption) Type() OptionType { return ResultTTLOpt } +func (ttl resultTTLOption) Value() interface{} { return time.Duration(ttl) } + // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. @@ -196,6 +209,7 @@ type option struct { deadline time.Time uniqueTTL time.Duration processAt time.Time + resultTTL time.Duration } // composeOptions merges user provided options into the default options @@ -237,6 +251,8 @@ func composeOptions(opts ...Option) (option, error) { res.processAt = time.Time(opt) case processInOption: res.processAt = time.Now().Add(time.Duration(opt)) + case resultTTLOption: + res.resultTTL = time.Duration(opt) default: // ignore unexpected option } @@ -316,6 +332,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { Deadline: deadline.Unix(), Timeout: int64(timeout.Seconds()), UniqueKey: uniqueKey, + ResultTTL: int64(opt.resultTTL.Seconds()), } now := time.Now() var state base.TaskState diff --git a/client_test.go b/client_test.go index a4e9751..4ad9fe7 100644 --- a/client_test.go +++ b/client_test.go @@ -416,6 +416,40 @@ func TestClientEnqueue(t *testing.T) { }, }, }, + { + desc: "With ResultTTL option", + task: task, + opts: []Option{ + ResultTTL(24 * time.Hour), + }, + wantInfo: &TaskInfo{ + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, + ResultTTL: 24 * time.Hour, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": { + { + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + ResultTTL: int64((24 * time.Hour).Seconds()), + }, + }, + }, + }, } for _, tc := range tests {