2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Add ResultTTL option

This commit is contained in:
Ken Hibino 2021-09-13 05:55:21 -07:00
parent 61833b045f
commit 40960d6acf
3 changed files with 65 additions and 0 deletions

View File

@ -81,6 +81,13 @@ type TaskInfo struct {
// NextProcessAt is the time the task is scheduled to be processed,
// zero if not applicable.
NextProcessAt time.Time
// ResulTTL is the retention period after the task is successfully processed.
ResultTTL time.Duration
// CompletedAt is the time the task is processed successfully.
// Zero value (i.e. time.Time{}) indicates no value.
CompletedAt time.Time
}
func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo {
@ -93,6 +100,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
Timeout: time.Duration(msg.Timeout) * time.Second,
ResultTTL: time.Duration(msg.ResultTTL) * time.Second,
NextProcessAt: nextProcessAt,
}
if msg.LastFailedAt == 0 {
@ -107,6 +115,12 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
info.Deadline = time.Unix(msg.Deadline, 0)
}
if msg.CompletedAt == 0 {
info.CompletedAt = time.Time{}
} else {
info.CompletedAt = time.Unix(msg.CompletedAt, 0)
}
switch state {
case base.TaskStateActive:
info.State = TaskStateActive

View File

@ -46,6 +46,7 @@ const (
ProcessAtOpt
ProcessInOpt
TaskIDOpt
ResultTTLOpt
)
// Option specifies the task processing behavior.
@ -70,6 +71,7 @@ type (
uniqueOption time.Duration
processAtOption time.Time
processInOption time.Duration
resultTTLOption time.Duration
)
// MaxRetry returns an option to specify the max number of times
@ -178,6 +180,17 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)
func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return time.Duration(d) }
// ResultTTL returns an option to specify the retention period for the task.
// If this option is provided, the task will be stored as a completed task after successful processing.
// A completed task will be deleted after the TTL expires.
func ResultTTL(ttl time.Duration) Option {
return resultTTLOption(ttl)
}
func (ttl resultTTLOption) String() string { return fmt.Sprintf("ResultTTL(%v)", time.Duration(ttl)) }
func (ttl resultTTLOption) Type() OptionType { return ResultTTLOpt }
func (ttl resultTTLOption) Value() interface{} { return time.Duration(ttl) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@ -196,6 +209,7 @@ type option struct {
deadline time.Time
uniqueTTL time.Duration
processAt time.Time
resultTTL time.Duration
}
// composeOptions merges user provided options into the default options
@ -237,6 +251,8 @@ func composeOptions(opts ...Option) (option, error) {
res.processAt = time.Time(opt)
case processInOption:
res.processAt = time.Now().Add(time.Duration(opt))
case resultTTLOption:
res.resultTTL = time.Duration(opt)
default:
// ignore unexpected option
}
@ -316,6 +332,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
UniqueKey: uniqueKey,
ResultTTL: int64(opt.resultTTL.Seconds()),
}
now := time.Now()
var state base.TaskState

View File

@ -416,6 +416,40 @@ func TestClientEnqueue(t *testing.T) {
},
},
},
{
desc: "With ResultTTL option",
task: task,
opts: []Option{
ResultTTL(24 * time.Hour),
},
wantInfo: &TaskInfo{
Queue: "default",
Type: task.Type(),
Payload: task.Payload(),
State: TaskStatePending,
MaxRetry: defaultMaxRetry,
Retried: 0,
LastErr: "",
LastFailedAt: time.Time{},
Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
ResultTTL: 24 * time.Hour,
},
wantPending: map[string][]*base.TaskMessage{
"default": {
{
Type: task.Type(),
Payload: task.Payload(),
Retry: defaultMaxRetry,
Queue: "default",
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
ResultTTL: int64((24 * time.Hour).Seconds()),
},
},
},
},
}
for _, tc := range tests {