2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

(asynq): Update Inspector

This commit is contained in:
Ken Hibino 2021-09-24 14:50:24 -07:00
parent 88457c7a35
commit 43c909624e
2 changed files with 52 additions and 30 deletions

View File

@ -259,17 +259,21 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListPending(qname, pgn) infos, err := i.rdb.ListPending(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
now := time.Now()
var tasks []*TaskInfo var tasks []*TaskInfo
for _, m := range msgs { for _, i := range infos {
tasks = append(tasks, newTaskInfo(m, base.TaskStatePending, now)) tasks = append(tasks, newTaskInfo(
i.Message,
i.State,
i.NextProcessAt,
i.Result,
))
} }
return tasks, err return tasks, err
} }
@ -283,7 +287,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListActive(qname, pgn) infos, err := i.rdb.ListActive(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -291,8 +295,13 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, m := range msgs { for _, i := range infos {
tasks = append(tasks, newTaskInfo(m, base.TaskStateActive, time.Time{})) tasks = append(tasks, newTaskInfo(
i.Message,
i.State,
i.NextProcessAt,
i.Result,
))
} }
return tasks, err return tasks, err
} }
@ -307,7 +316,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(qname, pgn) infos, err := i.rdb.ListScheduled(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -315,11 +324,12 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
tasks = append(tasks, newTaskInfo( tasks = append(tasks, newTaskInfo(
z.Message, i.Message,
base.TaskStateScheduled, i.State,
time.Unix(z.Score, 0), i.NextProcessAt,
i.Result,
)) ))
} }
return tasks, nil return tasks, nil
@ -335,7 +345,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(qname, pgn) infos, err := i.rdb.ListRetry(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -343,11 +353,12 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
tasks = append(tasks, newTaskInfo( tasks = append(tasks, newTaskInfo(
z.Message, i.Message,
base.TaskStateRetry, i.State,
time.Unix(z.Score, 0), i.NextProcessAt,
i.Result,
)) ))
} }
return tasks, nil return tasks, nil
@ -363,7 +374,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListArchived(qname, pgn) infos, err := i.rdb.ListArchived(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -371,11 +382,12 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
tasks = append(tasks, newTaskInfo( tasks = append(tasks, newTaskInfo(
z.Message, i.Message,
base.TaskStateArchived, i.State,
time.Time{}, i.NextProcessAt,
i.Result,
)) ))
} }
return tasks, nil return tasks, nil
@ -391,7 +403,7 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListCompleted(qname, pgn) infos, err := i.rdb.ListCompleted(qname, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -399,11 +411,12 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
var tasks []*TaskInfo var tasks []*TaskInfo
for _, z := range zs { for _, i := range infos {
tasks = append(tasks, newTaskInfo( tasks = append(tasks, newTaskInfo(
z.Message, i.Message,
base.TaskStateCompleted, i.State,
time.Time{}, i.NextProcessAt,
i.Result,
)) ))
} }
return tasks, nil return tasks, nil

View File

@ -424,7 +424,7 @@ func TestInspectorHistory(t *testing.T) {
} }
func createPendingTask(msg *base.TaskMessage) *TaskInfo { func createPendingTask(msg *base.TaskMessage) *TaskInfo {
return newTaskInfo(msg, base.TaskStatePending, time.Now()) return newTaskInfo(msg, base.TaskStatePending, time.Now(), nil)
} }
func TestInspectorGetTaskInfo(t *testing.T) { func TestInspectorGetTaskInfo(t *testing.T) {
@ -489,6 +489,7 @@ func TestInspectorGetTaskInfo(t *testing.T) {
m1, m1,
base.TaskStateActive, base.TaskStateActive,
time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
nil,
), ),
}, },
{ {
@ -498,6 +499,7 @@ func TestInspectorGetTaskInfo(t *testing.T) {
m2, m2,
base.TaskStateScheduled, base.TaskStateScheduled,
fiveMinsFromNow, fiveMinsFromNow,
nil,
), ),
}, },
{ {
@ -507,6 +509,7 @@ func TestInspectorGetTaskInfo(t *testing.T) {
m3, m3,
base.TaskStateRetry, base.TaskStateRetry,
oneHourFromNow, oneHourFromNow,
nil,
), ),
}, },
{ {
@ -516,6 +519,7 @@ func TestInspectorGetTaskInfo(t *testing.T) {
m4, m4,
base.TaskStateArchived, base.TaskStateArchived,
time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
nil,
), ),
}, },
{ {
@ -525,6 +529,7 @@ func TestInspectorGetTaskInfo(t *testing.T) {
m5, m5,
base.TaskStatePending, base.TaskStatePending,
now, now,
nil,
), ),
}, },
} }
@ -722,8 +727,8 @@ func TestInspectorListActiveTasks(t *testing.T) {
}, },
qname: "default", qname: "default",
want: []*TaskInfo{ want: []*TaskInfo{
newTaskInfo(m1, base.TaskStateActive, time.Time{}), newTaskInfo(m1, base.TaskStateActive, time.Time{}, nil),
newTaskInfo(m2, base.TaskStateActive, time.Time{}), newTaskInfo(m2, base.TaskStateActive, time.Time{}, nil),
}, },
}, },
} }
@ -749,6 +754,7 @@ func createScheduledTask(z base.Z) *TaskInfo {
z.Message, z.Message,
base.TaskStateScheduled, base.TaskStateScheduled,
time.Unix(z.Score, 0), time.Unix(z.Score, 0),
nil,
) )
} }
@ -818,6 +824,7 @@ func createRetryTask(z base.Z) *TaskInfo {
z.Message, z.Message,
base.TaskStateRetry, base.TaskStateRetry,
time.Unix(z.Score, 0), time.Unix(z.Score, 0),
nil,
) )
} }
@ -888,6 +895,7 @@ func createArchivedTask(z base.Z) *TaskInfo {
z.Message, z.Message,
base.TaskStateArchived, base.TaskStateArchived,
time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
nil,
) )
} }
@ -964,6 +972,7 @@ func createCompletedTask(z base.Z) *TaskInfo {
z.Message, z.Message,
base.TaskStateCompleted, base.TaskStateCompleted,
time.Time{}, // zero value for n/a time.Time{}, // zero value for n/a
nil, // TODO: Test with result data
) )
} }