From 09cbea66f6d0f6f449b87c36984b240b5956a904 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 13 May 2021 19:41:17 -0700 Subject: [PATCH] Define TaskInfo type --- asynq.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/asynq.go b/asynq.go index 6a7a65f..13acc77 100644 --- a/asynq.go +++ b/asynq.go @@ -13,6 +13,7 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/base" ) // Task represents a unit of work to be performed. @@ -35,6 +36,89 @@ func NewTask(typename string, payload []byte) *Task { } } +// A TaskInfo describes a task and its metadata. +type TaskInfo struct { + msg *base.TaskMessage + state base.TaskState + nextProcessAt time.Time +} + +// ID returns the id of the task. +func (info *TaskInfo) ID() string { return info.msg.ID.String() } + +// Queue returns the name of the queue in which the task belongs. +func (info *TaskInfo) Queue() string { return info.msg.Queue } + +// Type returns the type name of the task. +func (info *TaskInfo) Type() string { return info.msg.Type } + +// Payload returns the payload data of the task. +func (info *TaskInfo) Payload() []byte { return info.msg.Payload } + +func (info *TaskInfo) State() TaskState { + switch info.state { + case base.TaskStateActive: + return TaskStateActive + case base.TaskStatePending: + return TaskStatePending + case base.TaskStateScheduled: + return TaskStateScheduled + case base.TaskStateRetry: + return TaskStateRetry + case base.TaskStateArchived: + return TaskStateArchived + } + panic("internal error: unknown state in TaskInfo") +} + +// MaxRetry returns the maximum number of times the task can be retried. +func (info *TaskInfo) MaxRetry() int { return info.msg.Retry } + +// Retried returns the number of times the task has retried so far. +func (info *TaskInfo) Retried() int { return info.msg.Retried } + +// LastErr returns the error message from the last failure. +// If the task has no failures, returns an empty string. +func (info *TaskInfo) LastErr() string { return info.msg.ErrorMsg } + +// Timeout returns the duration the task can be processed by Handler before being retried, +// zero if not specified +func (info *TaskInfo) Timeout() time.Duration { + return time.Duration(info.msg.Timeout) * time.Second +} + +// Deadline returns the deadline for the task, zero value if not specified. +func (info *TaskInfo) Deadline() time.Time { + if info.msg.Deadline == 0 { + return time.Time{} + } + return time.Unix(info.msg.Deadline, 0) +} + +// NextProcessAt returns the time the task is scheduled to be processed, +// zero if not applicable. +func (info *TaskInfo) NextProcessAt() time.Time { return info.nextProcessAt } + +// TaskState denotes the state of a task. +type TaskState int + +const ( + // Indicates that the task is currently being processed by Handler. + TaskStateActive TaskState = iota + 1 + + // Indicates that the task is ready to be processed by Handler. + TaskStatePending + + // Indicates that the task is scheduled to be processed some time in the future. + TaskStateScheduled + + // Indicates that the task has previously failed and scheduled to be processed some time in the future. + TaskStateRetry + + // Indicates that the task is archived and stored for inspection purposes. + TaskStateArchived +) + // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // // RedisConnOpt represents a sum of following types: