2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Change TaskInfo to use public fields instead of methods

This commit is contained in:
Ken Hibino 2021-06-14 07:31:39 -07:00
parent e01c6379c8
commit a9feec5967
6 changed files with 322 additions and 303 deletions

122
asynq.go
View File

@ -38,71 +38,87 @@ func NewTask(typename string, payload []byte) *Task {
// A TaskInfo describes a task and its metadata. // A TaskInfo describes a task and its metadata.
type TaskInfo struct { type TaskInfo struct {
msg *base.TaskMessage // ID is the identifier of the task.
state base.TaskState ID string
nextProcessAt time.Time
// Queue is the name of the queue in which the task belongs.
Queue string
// Type is the type name of the task.
Type string
// Payload is the payload data of the task.
Payload []byte
// State indicates the task state.
State TaskState
// MaxRetry is the maximum number of times the task can be retried.
MaxRetry int
// Retried is the number of times the task has retried so far.
Retried int
// LastErr is the error message from the last failure.
LastErr string
// LastFailedAt is the time time of the last failure if any.
// If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}).
LastFailedAt time.Time
// Timeout is the duration the task can be processed by Handler before being retried,
// zero if not specified
Timeout time.Duration
// Deadline is the deadline for the task, zero value if not specified.
Deadline time.Time
// NextProcessAt is the time the task is scheduled to be processed,
// zero if not applicable.
NextProcessAt time.Time
} }
// ID returns the id of the task. func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo {
func (info *TaskInfo) ID() string { return info.msg.ID.String() } info := TaskInfo{
ID: msg.ID.String(),
Queue: msg.Queue,
Type: msg.Type,
Payload: msg.Payload, // Do we need to make a copy?
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastErr: msg.ErrorMsg,
Timeout: time.Duration(msg.Timeout) * time.Second,
NextProcessAt: nextProcessAt,
}
if msg.LastFailedAt == 0 {
info.LastFailedAt = time.Time{}
} else {
info.LastFailedAt = time.Unix(msg.LastFailedAt, 0)
}
// Queue returns the name of the queue in which the task belongs. if msg.Deadline == 0 {
func (info *TaskInfo) Queue() string { return info.msg.Queue } info.Deadline = time.Time{}
} else {
info.Deadline = time.Unix(msg.Deadline, 0)
}
// Type returns the type name of the task. switch state {
func (info *TaskInfo) Type() string { return info.msg.Type }
// Payload returns the payload data of the task.
func (info *TaskInfo) Payload() []byte { return info.msg.Payload }
func (info *TaskInfo) State() TaskState {
switch info.state {
case base.TaskStateActive: case base.TaskStateActive:
return TaskStateActive info.State = TaskStateActive
case base.TaskStatePending: case base.TaskStatePending:
return TaskStatePending info.State = TaskStatePending
case base.TaskStateScheduled: case base.TaskStateScheduled:
return TaskStateScheduled info.State = TaskStateScheduled
case base.TaskStateRetry: case base.TaskStateRetry:
return TaskStateRetry info.State = TaskStateRetry
case base.TaskStateArchived: case base.TaskStateArchived:
return TaskStateArchived info.State = TaskStateArchived
default:
panic(fmt.Sprintf("internal error: unknown state: %d", state))
} }
panic("internal error: unknown state in TaskInfo") return &info
} }
// MaxRetry returns the maximum number of times the task can be retried.
func (info *TaskInfo) MaxRetry() int { return info.msg.Retry }
// Retried returns the number of times the task has retried so far.
func (info *TaskInfo) Retried() int { return info.msg.Retried }
// LastErr returns the error message from the last failure.
// If the task has no failures, returns an empty string.
func (info *TaskInfo) LastErr() string { return info.msg.ErrorMsg }
// LastFailedAt returns the time of the last failure if any.
// If the task has no failures, returns zero time.
func (info *TaskInfo) LastFailedAt() time.Time { return time.Unix(info.msg.LastFailedAt, 0) }
// Timeout returns the duration the task can be processed by Handler before being retried,
// zero if not specified
func (info *TaskInfo) Timeout() time.Duration {
return time.Duration(info.msg.Timeout) * time.Second
}
// Deadline returns the deadline for the task, zero value if not specified.
func (info *TaskInfo) Deadline() time.Time {
if info.msg.Deadline == 0 {
return time.Time{}
}
return time.Unix(info.msg.Deadline, 0)
}
// NextProcessAt returns the time the task is scheduled to be processed,
// zero if not applicable.
func (info *TaskInfo) NextProcessAt() time.Time { return info.nextProcessAt }
// TaskState denotes the state of a task. // TaskState denotes the state of a task.
type TaskState int type TaskState int

View File

@ -320,7 +320,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
case err != nil: case err != nil:
return nil, err return nil, err
} }
return &TaskInfo{msg, state, opt.processAt}, nil return newTaskInfo(msg, state, opt.processAt), nil
} }
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {

View File

@ -42,15 +42,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: now, processAt: now,
opts: []Option{}, opts: []Option{},
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Queue: "default", State: TaskStatePending,
Retry: defaultMaxRetry, MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
}, LastErr: "",
state: base.TaskStatePending, LastFailedAt: time.Time{},
nextProcessAt: now, Timeout: defaultTimeout,
Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -74,16 +76,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: oneHourLater, processAt: oneHourLater,
opts: []Option{}, opts: []Option{},
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStateScheduled,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStateScheduled, Timeout: defaultTimeout,
nextProcessAt: oneHourLater, Deadline: time.Time{},
NextProcessAt: oneHourLater,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -116,8 +119,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
continue continue
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmp.AllowUnexported(TaskInfo{}), cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
@ -162,16 +164,17 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(3), MaxRetry(3),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: 3, State: TaskStatePending,
Queue: "default", MaxRetry: 3,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -193,16 +196,17 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(-2), MaxRetry(-2),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: 0, // Retry count should be set to zero State: TaskStatePending,
Queue: "default", MaxRetry: 0, // Retry count should be set to zero
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -225,16 +229,17 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(10), MaxRetry(10),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: 10, // Last option takes precedence State: TaskStatePending,
Queue: "default", MaxRetry: 10, // Last option takes precedence
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -256,16 +261,17 @@ func TestClientEnqueue(t *testing.T) {
Queue("custom"), Queue("custom"),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "custom",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "custom", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"custom": { "custom": {
@ -287,16 +293,17 @@ func TestClientEnqueue(t *testing.T) {
Queue("HIGH"), Queue("HIGH"),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "high",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "high", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"high": { "high": {
@ -318,16 +325,17 @@ func TestClientEnqueue(t *testing.T) {
Timeout(20 * time.Second), Timeout(20 * time.Second),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: 20, Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: 20 * time.Second,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -349,16 +357,17 @@ func TestClientEnqueue(t *testing.T) {
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: int64(noTimeout.Seconds()), Retried: 0,
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: noTimeout,
nextProcessAt: now, Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -381,16 +390,17 @@ func TestClientEnqueue(t *testing.T) {
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
}, },
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: 20, Retried: 0,
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: 20 * time.Second,
nextProcessAt: now, Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -416,8 +426,7 @@ func TestClientEnqueue(t *testing.T) {
continue continue
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmp.AllowUnexported(TaskInfo{}), cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
@ -457,16 +466,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
delay: 1 * time.Hour, delay: 1 * time.Hour,
opts: []Option{}, opts: []Option{},
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStateScheduled,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStateScheduled, Timeout: defaultTimeout,
nextProcessAt: time.Now().Add(1 * time.Hour), Deadline: time.Time{},
NextProcessAt: time.Now().Add(1 * time.Hour),
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -493,16 +503,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
delay: 0, delay: 0,
opts: []Option{}, opts: []Option{},
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "default",
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "default", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -532,8 +543,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
continue continue
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmp.AllowUnexported(TaskInfo{}), cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
@ -607,16 +617,17 @@ func TestClientDefaultOptions(t *testing.T) {
opts: []Option{}, opts: []Option{},
task: NewTask("feed:import", nil), task: NewTask("feed:import", nil),
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "feed",
Type: "feed:import", Type: "feed:import",
Payload: nil, Payload: nil,
Retry: defaultMaxRetry, State: TaskStatePending,
Queue: "feed", MaxRetry: defaultMaxRetry,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
queue: "feed", queue: "feed",
want: &base.TaskMessage{ want: &base.TaskMessage{
@ -634,16 +645,17 @@ func TestClientDefaultOptions(t *testing.T) {
opts: []Option{}, opts: []Option{},
task: NewTask("feed:import", nil), task: NewTask("feed:import", nil),
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "feed",
Type: "feed:import", Type: "feed:import",
Payload: nil, Payload: nil,
Retry: 5, State: TaskStatePending,
Queue: "feed", MaxRetry: 5,
Timeout: int64(defaultTimeout.Seconds()), Retried: 0,
Deadline: noDeadline.Unix(), LastErr: "",
}, LastFailedAt: time.Time{},
state: base.TaskStatePending, Timeout: defaultTimeout,
nextProcessAt: now, Deadline: time.Time{},
NextProcessAt: now,
}, },
queue: "feed", queue: "feed",
want: &base.TaskMessage{ want: &base.TaskMessage{
@ -661,16 +673,16 @@ func TestClientDefaultOptions(t *testing.T) {
opts: []Option{Queue("critical")}, opts: []Option{Queue("critical")},
task: NewTask("feed:import", nil), task: NewTask("feed:import", nil),
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
msg: &base.TaskMessage{ Queue: "critical",
Type: "feed:import", Type: "feed:import",
Payload: nil, Payload: nil,
Retry: 5, State: TaskStatePending,
Queue: "critical", MaxRetry: 5,
Timeout: int64(defaultTimeout.Seconds()), LastErr: "",
Deadline: noDeadline.Unix(), LastFailedAt: time.Time{},
}, Timeout: defaultTimeout,
state: base.TaskStatePending, Deadline: time.Time{},
nextProcessAt: now, NextProcessAt: now,
}, },
queue: "critical", queue: "critical",
want: &base.TaskMessage{ want: &base.TaskMessage{
@ -694,8 +706,7 @@ func TestClientDefaultOptions(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmp.AllowUnexported(TaskInfo{}), cmpopts.IgnoreFields(TaskInfo{}, "ID"),
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {

View File

@ -48,11 +48,14 @@ func (i *Inspector) Queues() ([]string, error) {
type QueueInfo struct { type QueueInfo struct {
// Name of the queue. // Name of the queue.
Queue string Queue string
// Total number of bytes that the queue and its tasks require to be stored in redis. // Total number of bytes that the queue and its tasks require to be stored in redis.
MemoryUsage int64 MemoryUsage int64
// Size is the total number of tasks in the queue. // Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived. // The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
Size int Size int
// Number of pending tasks. // Number of pending tasks.
Pending int Pending int
// Number of active tasks. // Number of active tasks.
@ -63,15 +66,18 @@ type QueueInfo struct {
Retry int Retry int
// Number of archived tasks. // Number of archived tasks.
Archived int Archived int
// Total number of tasks being processed during the given date. // Total number of tasks being processed during the given date.
// The number includes both succeeded and failed tasks. // The number includes both succeeded and failed tasks.
Processed int Processed int
// Total number of tasks failed to be processed during the given date. // Total number of tasks failed to be processed during the given date.
Failed int Failed int
// Paused indicates whether the queue is paused. // Paused indicates whether the queue is paused.
// If true, tasks in the queue will not be processed. // If true, tasks in the queue will not be processed.
Paused bool Paused bool
// Time when this stats was taken.
// Time when this queue info snapshot was taken.
Timestamp time.Time Timestamp time.Time
} }
@ -184,11 +190,7 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) {
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
return &TaskInfo{ return newTaskInfo(info.Message, info.State, info.NextProcessAt), nil
msg: info.Message,
state: info.State,
nextProcessAt: info.NextProcessAt,
}, nil
} }
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
@ -271,11 +273,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI
now := time.Now() now := time.Now()
var tasks []*TaskInfo var tasks []*TaskInfo
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &TaskInfo{ tasks = append(tasks, newTaskInfo(m, base.TaskStatePending, now))
msg: m,
state: base.TaskStatePending,
nextProcessAt: now,
})
} }
return tasks, err return tasks, err
} }
@ -298,10 +296,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &TaskInfo{ tasks = append(tasks, newTaskInfo(m, base.TaskStateActive, time.Time{}))
msg: m,
state: base.TaskStateActive,
})
} }
return tasks, err return tasks, err
} }
@ -325,11 +320,11 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, z := range zs {
tasks = append(tasks, &TaskInfo{ tasks = append(tasks, newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateScheduled, base.TaskStateScheduled,
nextProcessAt: time.Unix(z.Score, 0), time.Unix(z.Score, 0),
}) ))
} }
return tasks, nil return tasks, nil
} }
@ -353,11 +348,11 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, z := range zs {
tasks = append(tasks, &TaskInfo{ tasks = append(tasks, newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateRetry, base.TaskStateRetry,
nextProcessAt: time.Unix(z.Score, 0), time.Unix(z.Score, 0),
}) ))
} }
return tasks, nil return tasks, nil
} }
@ -381,10 +376,11 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, z := range zs {
tasks = append(tasks, &TaskInfo{ tasks = append(tasks, newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateArchived, base.TaskStateArchived,
}) time.Time{},
))
} }
return tasks, nil return tasks, nil
} }

View File

@ -423,11 +423,7 @@ func TestInspectorHistory(t *testing.T) {
} }
func createPendingTask(msg *base.TaskMessage) *TaskInfo { func createPendingTask(msg *base.TaskMessage) *TaskInfo {
return &TaskInfo{ return newTaskInfo(msg, base.TaskStatePending, time.Now())
msg: msg,
state: base.TaskStatePending,
nextProcessAt: time.Now(),
}
} }
func TestInspectorGetTaskInfo(t *testing.T) { func TestInspectorGetTaskInfo(t *testing.T) {
@ -488,47 +484,47 @@ func TestInspectorGetTaskInfo(t *testing.T) {
{ {
qname: "default", qname: "default",
id: m1.ID.String(), id: m1.ID.String(),
want: &TaskInfo{ want: newTaskInfo(
msg: m1, m1,
state: base.TaskStateActive, base.TaskStateActive,
nextProcessAt: time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
}, ),
}, },
{ {
qname: "default", qname: "default",
id: m2.ID.String(), id: m2.ID.String(),
want: &TaskInfo{ want: newTaskInfo(
msg: m2, m2,
state: base.TaskStateScheduled, base.TaskStateScheduled,
nextProcessAt: fiveMinsFromNow, fiveMinsFromNow,
}, ),
}, },
{ {
qname: "custom", qname: "custom",
id: m3.ID.String(), id: m3.ID.String(),
want: &TaskInfo{ want: newTaskInfo(
msg: m3, m3,
state: base.TaskStateRetry, base.TaskStateRetry,
nextProcessAt: oneHourFromNow, oneHourFromNow,
}, ),
}, },
{ {
qname: "custom", qname: "custom",
id: m4.ID.String(), id: m4.ID.String(),
want: &TaskInfo{ want: newTaskInfo(
msg: m4, m4,
state: base.TaskStateArchived, base.TaskStateArchived,
nextProcessAt: time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
}, ),
}, },
{ {
qname: "custom", qname: "custom",
id: m5.ID.String(), id: m5.ID.String(),
want: &TaskInfo{ want: newTaskInfo(
msg: m5, m5,
state: base.TaskStatePending, base.TaskStatePending,
nextProcessAt: now, now,
}, ),
}, },
} }
@ -725,8 +721,8 @@ func TestInspectorListActiveTasks(t *testing.T) {
}, },
qname: "default", qname: "default",
want: []*TaskInfo{ want: []*TaskInfo{
{msg: m1, state: base.TaskStateActive, nextProcessAt: time.Time{}}, newTaskInfo(m1, base.TaskStateActive, time.Time{}),
{msg: m2, state: base.TaskStateActive, nextProcessAt: time.Time{}}, newTaskInfo(m2, base.TaskStateActive, time.Time{}),
}, },
}, },
} }
@ -748,11 +744,11 @@ func TestInspectorListActiveTasks(t *testing.T) {
} }
func createScheduledTask(z base.Z) *TaskInfo { func createScheduledTask(z base.Z) *TaskInfo {
return &TaskInfo{ return newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateScheduled, base.TaskStateScheduled,
nextProcessAt: time.Unix(z.Score, 0), time.Unix(z.Score, 0),
} )
} }
func TestInspectorListScheduledTasks(t *testing.T) { func TestInspectorListScheduledTasks(t *testing.T) {
@ -817,11 +813,11 @@ func TestInspectorListScheduledTasks(t *testing.T) {
} }
func createRetryTask(z base.Z) *TaskInfo { func createRetryTask(z base.Z) *TaskInfo {
return &TaskInfo{ return newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateRetry, base.TaskStateRetry,
nextProcessAt: time.Unix(z.Score, 0), time.Unix(z.Score, 0),
} )
} }
func TestInspectorListRetryTasks(t *testing.T) { func TestInspectorListRetryTasks(t *testing.T) {
@ -887,11 +883,11 @@ func TestInspectorListRetryTasks(t *testing.T) {
} }
func createArchivedTask(z base.Z) *TaskInfo { func createArchivedTask(z base.Z) *TaskInfo {
return &TaskInfo{ return newTaskInfo(
msg: z.Message, z.Message,
state: base.TaskStateArchived, base.TaskStateArchived,
nextProcessAt: time.Time{}, time.Time{}, // zero value for n/a
} )
} }
func TestInspectorListArchivedTasks(t *testing.T) { func TestInspectorListArchivedTasks(t *testing.T) {
@ -2047,7 +2043,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
"custom": {m3}, "custom": {m3},
}, },
qname: "default", qname: "default",
id: createPendingTask(m2).ID(), id: createPendingTask(m2).ID,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1}, "default": {m1},
"custom": {m3}, "custom": {m3},
@ -2059,7 +2055,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
"custom": {m3}, "custom": {m3},
}, },
qname: "custom", qname: "custom",
id: createPendingTask(m3).ID(), id: createPendingTask(m3).ID,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2}, "default": {m1, m2},
"custom": {}, "custom": {},
@ -2112,7 +2108,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createScheduledTask(z2).ID(), id: createScheduledTask(z2).ID,
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2162,7 +2158,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createRetryTask(z2).ID(), id: createRetryTask(z2).ID,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2212,7 +2208,7 @@ func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createArchivedTask(z2).ID(), id: createArchivedTask(z2).ID,
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2263,7 +2259,7 @@ func TestInspectorDeleteTaskError(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "nonexistent", qname: "nonexistent",
id: createArchivedTask(z2).ID(), id: createArchivedTask(z2).ID,
wantErr: ErrQueueNotFound, wantErr: ErrQueueNotFound,
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1, z2}, "default": {z1, z2},
@ -2333,7 +2329,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "default", qname: "default",
id: createScheduledTask(z2).ID(), id: createScheduledTask(z2).ID,
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2403,7 +2399,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createRetryTask(z2).ID(), id: createRetryTask(z2).ID,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2474,7 +2470,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) {
"low": {}, "low": {},
}, },
qname: "critical", qname: "critical",
id: createArchivedTask(z2).ID(), id: createArchivedTask(z2).ID,
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1}, "default": {z1},
"critical": {}, "critical": {},
@ -2548,7 +2544,7 @@ func TestInspectorRunTaskError(t *testing.T) {
"low": {}, "low": {},
}, },
qname: "nonexistent", qname: "nonexistent",
id: createArchivedTask(z2).ID(), id: createArchivedTask(z2).ID,
wantErr: ErrQueueNotFound, wantErr: ErrQueueNotFound,
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1}, "default": {z1},
@ -2641,7 +2637,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "default", qname: "default",
id: createPendingTask(m1).ID(), id: createPendingTask(m1).ID,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
"custom": {m2, m3}, "custom": {m2, m3},
@ -2663,7 +2659,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createPendingTask(m2).ID(), id: createPendingTask(m2).ID,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1}, "default": {m1},
"custom": {m3}, "custom": {m3},
@ -2736,7 +2732,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createScheduledTask(z2).ID(), id: createScheduledTask(z2).ID,
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2811,7 +2807,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createRetryTask(z2).ID(), id: createRetryTask(z2).ID,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2886,7 +2882,7 @@ func TestInspectorArchiveTaskError(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "nonexistent", qname: "nonexistent",
id: createRetryTask(z2).ID(), id: createRetryTask(z2).ID,
wantErr: ErrQueueNotFound, wantErr: ErrQueueNotFound,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},

View File

@ -127,7 +127,7 @@ func (j *enqueueJob) Run() {
} }
j.logger.Debugf("scheduler enqueued a task: %+v", info) j.logger.Debugf("scheduler enqueued a task: %+v", info)
event := &base.SchedulerEnqueueEvent{ event := &base.SchedulerEnqueueEvent{
TaskID: info.ID(), TaskID: info.ID,
EnqueuedAt: time.Now().In(j.location), EnqueuedAt: time.Now().In(j.location),
} }
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)