2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update Inspector to return TaskInfo from list methods

This commit is contained in:
Ken Hibino 2021-04-02 07:01:06 -07:00
parent 5c6068c78b
commit c02409c974
2 changed files with 142 additions and 263 deletions

View File

@ -255,9 +255,9 @@ func (t *TaskInfo) LastFailedAt() time.Time {
return time.Unix(t.info.LastFailedAt, 0) return time.Unix(t.info.LastFailedAt, 0)
} }
// LastErr returns the error message from the last failure. // LastError returns the error message from the last failure.
// Empty string is returned if the task has not failed. // Empty string is returned if the task has not failed.
func (t *TaskInfo) LastErr() string { func (t *TaskInfo) LastError() string {
return t.info.ErrorMsg return t.info.ErrorMsg
} }
@ -280,17 +280,8 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) {
return &TaskInfo{info}, nil return &TaskInfo{info}, nil
} }
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
}
// ActiveTask is a task that's currently being processed. // ActiveTask is a task that's currently being processed.
// TODO: remove this type
type ActiveTask struct { type ActiveTask struct {
*asynq.Task *asynq.Task
ID string ID string
@ -300,49 +291,6 @@ type ActiveTask struct {
LastError string LastError string
} }
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
NextProcessAt time.Time
score int64
}
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*asynq.Task
ID string
Queue string
NextProcessAt time.Time
MaxRetry int
Retried int
LastError string
// TODO: LastFailedAt time.Time
score int64
}
// ArchivedTask is a task archived for debugging and inspection purposes, and
// it won't be retried automatically.
// A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector.
type ArchivedTask struct {
*asynq.Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
LastError string
score int64
}
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
type ListOption interface{} type ListOption interface{}
@ -407,26 +355,19 @@ func Page(n int) ListOption {
// ListPendingTasks retrieves pending tasks from the specified queue. // ListPendingTasks retrieves pending tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListPending(qname, pgn) infos, err := i.rdb.ListPending(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*PendingTask var tasks []*TaskInfo
for _, m := range msgs { for _, i := range infos {
tasks = append(tasks, &PendingTask{ tasks = append(tasks, &TaskInfo{info: i})
Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
} }
return tasks, err return tasks, err
} }
@ -434,124 +375,82 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
// ListActiveTasks retrieves active tasks from the specified queue. // ListActiveTasks retrieves active tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListActive(qname, pgn) infos, err := i.rdb.ListActive(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*ActiveTask var tasks []*TaskInfo
for _, m := range msgs { for _, i := range infos {
tasks = append(tasks, &TaskInfo{info: i})
tasks = append(tasks, &ActiveTask{
Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
} }
return tasks, err return tasks, err
} }
// ListScheduledTasks retrieves scheduled tasks from the specified queue. // ListScheduledTasks retrieves scheduled tasks from the specified queue.
// Tasks are sorted by NextProcessAt field in ascending order. // Tasks are sorted by NextProcessAt in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(qname, pgn) infos, err := i.rdb.ListScheduled(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*ScheduledTask var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
processAt := time.Unix(z.Score, 0) tasks = append(tasks, &TaskInfo{info: i})
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastError: z.Message.ErrorMsg,
NextProcessAt: processAt,
score: z.Score,
})
} }
return tasks, nil return tasks, nil
} }
// ListRetryTasks retrieves retry tasks from the specified queue. // ListRetryTasks retrieves retry tasks from the specified queue.
// Tasks are sorted by NextProcessAt field in ascending order. // Tasks are sorted by NextProcessAt in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(qname, pgn) infos, err := i.rdb.ListRetry(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*RetryTask var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
processAt := time.Unix(z.Score, 0) tasks = append(tasks, &TaskInfo{info: i})
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextProcessAt: processAt,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
LastError: z.Message.ErrorMsg,
score: z.Score,
})
} }
return tasks, nil return tasks, nil
} }
// ListArchivedTasks retrieves archived tasks from the specified queue. // ListArchivedTasks retrieves archived tasks from the specified queue.
// Tasks are sorted by LastFailedAt field in descending order. // Tasks are sorted by LastFailedAt in descending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListArchived(qname, pgn) infos, err := i.rdb.ListArchived(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*ArchivedTask var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
failedAt := time.Unix(z.Score, 0) tasks = append(tasks, &TaskInfo{info: i})
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
LastError: z.Message.ErrorMsg,
score: z.Score,
})
} }
return tasks, nil return tasks, nil
} }

View File

@ -229,8 +229,8 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) {
defer inspector.Close() defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m4 := h.NewTaskMessage("task4", nil)
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -285,8 +285,8 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) {
defer inspector.Close() defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m4 := h.NewTaskMessage("task4", nil)
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -502,15 +502,14 @@ func TestInspectorHistory(t *testing.T) {
} }
} }
func createPendingTask(msg *base.TaskMessage) *PendingTask { func createPendingTaskInfo(msg *base.TaskMessage) *TaskInfo {
return &PendingTask{ info := &base.TaskInfo{
Task: asynq.NewTask(msg.Type, msg.Payload), TaskMessage: msg,
ID: msg.ID.String(), State: "pending",
Queue: msg.Queue, NextProcessAt: time.Now().Unix(),
MaxRetry: msg.Retry, LastFailedAt: 0,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
} }
return &TaskInfo{info}
} }
func TestInspectorListPendingTasks(t *testing.T) { func TestInspectorListPendingTasks(t *testing.T) {
@ -527,7 +526,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
desc string desc string
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
qname string qname string
want []*PendingTask want []*TaskInfo
}{ }{
{ {
desc: "with default queue", desc: "with default queue",
@ -535,9 +534,9 @@ func TestInspectorListPendingTasks(t *testing.T) {
"default": {m1, m2}, "default": {m1, m2},
}, },
qname: "default", qname: "default",
want: []*PendingTask{ want: []*TaskInfo{
createPendingTask(m1), createPendingTaskInfo(m1),
createPendingTask(m2), createPendingTaskInfo(m2),
}, },
}, },
{ {
@ -548,8 +547,8 @@ func TestInspectorListPendingTasks(t *testing.T) {
"low": {m4}, "low": {m4},
}, },
qname: "critical", qname: "critical",
want: []*PendingTask{ want: []*TaskInfo{
createPendingTask(m3), createPendingTaskInfo(m3),
}, },
}, },
{ {
@ -558,7 +557,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
"default": {}, "default": {},
}, },
qname: "default", qname: "default",
want: []*PendingTask(nil), want: []*TaskInfo(nil),
}, },
} }
@ -574,8 +573,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
tc.desc, tc.qname, err) tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
} }
@ -592,22 +590,21 @@ func TestInspectorListActiveTasks(t *testing.T) {
inspector := New(getRedisConnOpt(t)) inspector := New(getRedisConnOpt(t))
createActiveTask := func(msg *base.TaskMessage) *ActiveTask { createActiveTaskInfo := func(msg *base.TaskMessage) *TaskInfo {
return &ActiveTask{ info := &base.TaskInfo{
Task: asynq.NewTask(msg.Type, msg.Payload), TaskMessage: msg,
ID: msg.ID.String(), State: "active",
Queue: msg.Queue, NextProcessAt: 0,
MaxRetry: msg.Retry, LastFailedAt: 0,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
} }
return &TaskInfo{info}
} }
tests := []struct { tests := []struct {
desc string desc string
active map[string][]*base.TaskMessage active map[string][]*base.TaskMessage
qname string qname string
want []*ActiveTask want []*TaskInfo
}{ }{
{ {
desc: "with a few active tasks", desc: "with a few active tasks",
@ -616,9 +613,9 @@ func TestInspectorListActiveTasks(t *testing.T) {
"custom": {m3, m4}, "custom": {m3, m4},
}, },
qname: "default", qname: "default",
want: []*ActiveTask{ want: []*TaskInfo{
createActiveTask(m1), createActiveTaskInfo(m1),
createActiveTask(m2), createActiveTaskInfo(m2),
}, },
}, },
} }
@ -632,26 +629,21 @@ func TestInspectorListActiveTasks(t *testing.T) {
t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
} }
} }
} }
func createScheduledTask(z base.Z) *ScheduledTask { func createScheduledTaskInfo(z base.Z) *TaskInfo {
msg := z.Message info := &base.TaskInfo{
return &ScheduledTask{ TaskMessage: z.Message,
Task: asynq.NewTask(msg.Type, msg.Payload), State: "scheduled",
ID: msg.ID.String(), NextProcessAt: z.Score,
Queue: msg.Queue, LastFailedAt: 0,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
NextProcessAt: time.Unix(z.Score, 0),
score: z.Score,
} }
return &TaskInfo{info}
} }
func TestInspectorListScheduledTasks(t *testing.T) { func TestInspectorListScheduledTasks(t *testing.T) {
@ -673,7 +665,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
desc string desc string
scheduled map[string][]base.Z scheduled map[string][]base.Z
qname string qname string
want []*ScheduledTask want []*TaskInfo
}{ }{
{ {
desc: "with a few scheduled tasks", desc: "with a few scheduled tasks",
@ -683,10 +675,10 @@ func TestInspectorListScheduledTasks(t *testing.T) {
}, },
qname: "default", qname: "default",
// Should be sorted by NextProcessAt. // Should be sorted by NextProcessAt.
want: []*ScheduledTask{ want: []*TaskInfo{
createScheduledTask(z3), createScheduledTaskInfo(z3),
createScheduledTask(z1), createScheduledTaskInfo(z1),
createScheduledTask(z2), createScheduledTaskInfo(z2),
}, },
}, },
{ {
@ -695,7 +687,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
"default": {}, "default": {},
}, },
qname: "default", qname: "default",
want: []*ScheduledTask(nil), want: []*TaskInfo(nil),
}, },
} }
@ -708,26 +700,21 @@ func TestInspectorListScheduledTasks(t *testing.T) {
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
} }
} }
} }
func createRetryTask(z base.Z) *RetryTask { func createRetryTaskInfo(z base.Z) *TaskInfo {
msg := z.Message info := &base.TaskInfo{
return &RetryTask{ TaskMessage: z.Message,
Task: asynq.NewTask(msg.Type, msg.Payload), State: "retry",
ID: msg.ID.String(), NextProcessAt: z.Score,
Queue: msg.Queue, LastFailedAt: time.Now().Unix(),
NextProcessAt: time.Unix(z.Score, 0),
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
score: z.Score,
} }
return &TaskInfo{info}
} }
func TestInspectorListRetryTasks(t *testing.T) { func TestInspectorListRetryTasks(t *testing.T) {
@ -749,7 +736,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
desc string desc string
retry map[string][]base.Z retry map[string][]base.Z
qname string qname string
want []*RetryTask want []*TaskInfo
}{ }{
{ {
desc: "with a few retry tasks", desc: "with a few retry tasks",
@ -759,10 +746,10 @@ func TestInspectorListRetryTasks(t *testing.T) {
}, },
qname: "default", qname: "default",
// Should be sorted by NextProcessAt. // Should be sorted by NextProcessAt.
want: []*RetryTask{ want: []*TaskInfo{
createRetryTask(z3), createRetryTaskInfo(z3),
createRetryTask(z1), createRetryTaskInfo(z1),
createRetryTask(z2), createRetryTaskInfo(z2),
}, },
}, },
{ {
@ -771,7 +758,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
"default": {}, "default": {},
}, },
qname: "default", qname: "default",
want: []*RetryTask(nil), want: []*TaskInfo(nil),
}, },
// TODO(hibiken): ErrQueueNotFound when queue doesn't exist // TODO(hibiken): ErrQueueNotFound when queue doesn't exist
} }
@ -785,26 +772,21 @@ func TestInspectorListRetryTasks(t *testing.T) {
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListRetryTasks(%q) = %v, want %v; (-want,+got)\n%s",
t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
} }
} }
} }
func createArchivedTask(z base.Z) *ArchivedTask { func createArchivedTaskInfo(z base.Z) *TaskInfo {
msg := z.Message info := &base.TaskInfo{
return &ArchivedTask{ TaskMessage: z.Message,
Task: asynq.NewTask(msg.Type, msg.Payload), State: "archived",
ID: msg.ID.String(), NextProcessAt: 0,
Queue: msg.Queue, LastFailedAt: z.Score,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastFailedAt: time.Unix(z.Score, 0),
LastError: msg.ErrorMsg,
score: z.Score,
} }
return &TaskInfo{info}
} }
func TestInspectorListArchivedTasks(t *testing.T) { func TestInspectorListArchivedTasks(t *testing.T) {
@ -826,7 +808,7 @@ func TestInspectorListArchivedTasks(t *testing.T) {
desc string desc string
archived map[string][]base.Z archived map[string][]base.Z
qname string qname string
want []*ArchivedTask want []*TaskInfo
}{ }{
{ {
desc: "with a few archived tasks", desc: "with a few archived tasks",
@ -836,10 +818,10 @@ func TestInspectorListArchivedTasks(t *testing.T) {
}, },
qname: "default", qname: "default",
// Should be sorted by LastFailedAt. // Should be sorted by LastFailedAt.
want: []*ArchivedTask{ want: []*TaskInfo{
createArchivedTask(z2), createArchivedTaskInfo(z2),
createArchivedTask(z1), createArchivedTaskInfo(z1),
createArchivedTask(z3), createArchivedTaskInfo(z3),
}, },
}, },
{ {
@ -848,7 +830,7 @@ func TestInspectorListArchivedTasks(t *testing.T) {
"default": {}, "default": {},
}, },
qname: "default", qname: "default",
want: []*ArchivedTask(nil), want: []*TaskInfo(nil),
}, },
} }
@ -861,9 +843,8 @@ func TestInspectorListArchivedTasks(t *testing.T) {
t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListArchivedTasks(%q) = %v, want %v; (-want,+got)\n%s",
t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
} }
} }
@ -885,33 +866,33 @@ func TestInspectorListPagination(t *testing.T) {
tests := []struct { tests := []struct {
page int page int
pageSize int pageSize int
want []*PendingTask want []*TaskInfo
}{ }{
{ {
page: 1, page: 1,
pageSize: 5, pageSize: 5,
want: []*PendingTask{ want: []*TaskInfo{
createPendingTask(msgs[0]), createPendingTaskInfo(msgs[0]),
createPendingTask(msgs[1]), createPendingTaskInfo(msgs[1]),
createPendingTask(msgs[2]), createPendingTaskInfo(msgs[2]),
createPendingTask(msgs[3]), createPendingTaskInfo(msgs[3]),
createPendingTask(msgs[4]), createPendingTaskInfo(msgs[4]),
}, },
}, },
{ {
page: 3, page: 3,
pageSize: 10, pageSize: 10,
want: []*PendingTask{ want: []*TaskInfo{
createPendingTask(msgs[20]), createPendingTaskInfo(msgs[20]),
createPendingTask(msgs[21]), createPendingTaskInfo(msgs[21]),
createPendingTask(msgs[22]), createPendingTaskInfo(msgs[22]),
createPendingTask(msgs[23]), createPendingTaskInfo(msgs[23]),
createPendingTask(msgs[24]), createPendingTaskInfo(msgs[24]),
createPendingTask(msgs[25]), createPendingTaskInfo(msgs[25]),
createPendingTask(msgs[26]), createPendingTaskInfo(msgs[26]),
createPendingTask(msgs[27]), createPendingTaskInfo(msgs[27]),
createPendingTask(msgs[28]), createPendingTaskInfo(msgs[28]),
createPendingTask(msgs[29]), createPendingTaskInfo(msgs[29]),
}, },
}, },
} }
@ -922,9 +903,8 @@ func TestInspectorListPagination(t *testing.T) {
t.Errorf("ListPendingTask('default') returned error: %v", err) t.Errorf("ListPendingTask('default') returned error: %v", err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("ListPendingTasks('default') = %v, want %v; (-want,+got)\n%s",
t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff) got, tc.want, diff)
} }
} }
@ -1901,7 +1881,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
} }
} }
func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -1921,7 +1901,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
"custom": {m3}, "custom": {m3},
}, },
qname: "default", qname: "default",
id: createPendingTask(m2).ID, id: createPendingTaskInfo(m2).ID(),
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1}, "default": {m1},
"custom": {m3}, "custom": {m3},
@ -1933,7 +1913,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
"custom": {m3}, "custom": {m3},
}, },
qname: "custom", qname: "custom",
id: createPendingTask(m3).ID, id: createPendingTaskInfo(m3).ID(),
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2}, "default": {m1, m2},
"custom": {}, "custom": {},
@ -1961,7 +1941,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
} }
} }
func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -1986,7 +1966,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createScheduledTask(z2).ID, id: createScheduledTaskInfo(z2).ID(),
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2011,7 +1991,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
} }
} }
func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -2036,7 +2016,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createRetryTask(z2).ID, id: createRetryTaskInfo(z2).ID(),
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2061,7 +2041,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
} }
} }
func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { func TestInspectorDeleteTaskyDeletesArchivedTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -2086,7 +2066,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
"custom": {z3}, "custom": {z3},
}, },
qname: "default", qname: "default",
id: createArchivedTask(z2).ID, id: createArchivedTaskInfo(z2).ID(),
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2142,7 +2122,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "default", qname: "default",
id: createScheduledTask(z2).ID, id: createScheduledTaskInfo(z2).ID(),
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2212,7 +2192,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createRetryTask(z2).ID, id: createRetryTaskInfo(z2).ID(),
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2283,7 +2263,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) {
"low": {}, "low": {},
}, },
qname: "critical", qname: "critical",
id: createArchivedTask(z2).ID, id: createArchivedTaskInfo(z2).ID(),
wantArchived: map[string][]base.Z{ wantArchived: map[string][]base.Z{
"default": {z1}, "default": {z1},
"critical": {}, "critical": {},
@ -2350,7 +2330,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "default", qname: "default",
id: createPendingTask(m1).ID, id: createPendingTaskInfo(m1).ID(),
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
"custom": {m2, m3}, "custom": {m2, m3},
@ -2372,7 +2352,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createPendingTask(m2).ID, id: createPendingTaskInfo(m2).ID(),
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {m1}, "default": {m1},
"custom": {m3}, "custom": {m3},
@ -2446,7 +2426,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createScheduledTask(z2).ID, id: createScheduledTaskInfo(z2).ID(),
wantScheduled: map[string][]base.Z{ wantScheduled: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},
@ -2521,7 +2501,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
"custom": {}, "custom": {},
}, },
qname: "custom", qname: "custom",
id: createRetryTask(z2).ID, id: createRetryTaskInfo(z2).ID(),
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {z1}, "default": {z1},
"custom": {z3}, "custom": {z3},