From 43c909624e8ebf5fb12ac683d18ef1844ef195c1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 24 Sep 2021 14:50:24 -0700 Subject: [PATCH] (asynq): Update Inspector --- inspector.go | 67 ++++++++++++++++++++++++++++------------------- inspector_test.go | 15 ++++++++--- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/inspector.go b/inspector.go index 9837573..a6d78ff 100644 --- a/inspector.go +++ b/inspector.go @@ -259,17 +259,21 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListPending(qname, pgn) + infos, err := i.rdb.ListPending(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) case err != nil: return nil, fmt.Errorf("asynq: %v", err) } - now := time.Now() var tasks []*TaskInfo - for _, m := range msgs { - tasks = append(tasks, newTaskInfo(m, base.TaskStatePending, now)) + for _, i := range infos { + tasks = append(tasks, newTaskInfo( + i.Message, + i.State, + i.NextProcessAt, + i.Result, + )) } return tasks, err } @@ -283,7 +287,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListActive(qname, pgn) + infos, err := i.rdb.ListActive(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -291,8 +295,13 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn return nil, fmt.Errorf("asynq: %v", err) } var tasks []*TaskInfo - for _, m := range msgs { - tasks = append(tasks, newTaskInfo(m, base.TaskStateActive, time.Time{})) + for _, i := range infos { + tasks = append(tasks, newTaskInfo( + i.Message, + i.State, + i.NextProcessAt, + i.Result, + )) } return tasks, err } @@ -307,7 +316,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListScheduled(qname, pgn) + infos, err := i.rdb.ListScheduled(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -315,11 +324,12 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas return nil, fmt.Errorf("asynq: %v", err) } var tasks []*TaskInfo - for _, z := range zs { + for _, i := range infos { tasks = append(tasks, newTaskInfo( - z.Message, - base.TaskStateScheduled, - time.Unix(z.Score, 0), + i.Message, + i.State, + i.NextProcessAt, + i.Result, )) } return tasks, nil @@ -335,7 +345,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListRetry(qname, pgn) + infos, err := i.rdb.ListRetry(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -343,11 +353,12 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf return nil, fmt.Errorf("asynq: %v", err) } var tasks []*TaskInfo - for _, z := range zs { + for _, i := range infos { tasks = append(tasks, newTaskInfo( - z.Message, - base.TaskStateRetry, - time.Unix(z.Score, 0), + i.Message, + i.State, + i.NextProcessAt, + i.Result, )) } return tasks, nil @@ -363,7 +374,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListArchived(qname, pgn) + infos, err := i.rdb.ListArchived(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -371,11 +382,12 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task return nil, fmt.Errorf("asynq: %v", err) } var tasks []*TaskInfo - for _, z := range zs { + for _, i := range infos { tasks = append(tasks, newTaskInfo( - z.Message, - base.TaskStateArchived, - time.Time{}, + i.Message, + i.State, + i.NextProcessAt, + i.Result, )) } return tasks, nil @@ -391,7 +403,7 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListCompleted(qname, pgn) + infos, err := i.rdb.ListCompleted(qname, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -399,11 +411,12 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas return nil, fmt.Errorf("asynq: %v", err) } var tasks []*TaskInfo - for _, z := range zs { + for _, i := range infos { tasks = append(tasks, newTaskInfo( - z.Message, - base.TaskStateCompleted, - time.Time{}, + i.Message, + i.State, + i.NextProcessAt, + i.Result, )) } return tasks, nil diff --git a/inspector_test.go b/inspector_test.go index d933898..af8c096 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -424,7 +424,7 @@ func TestInspectorHistory(t *testing.T) { } func createPendingTask(msg *base.TaskMessage) *TaskInfo { - return newTaskInfo(msg, base.TaskStatePending, time.Now()) + return newTaskInfo(msg, base.TaskStatePending, time.Now(), nil) } func TestInspectorGetTaskInfo(t *testing.T) { @@ -489,6 +489,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { m1, base.TaskStateActive, time.Time{}, // zero value for n/a + nil, ), }, { @@ -498,6 +499,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { m2, base.TaskStateScheduled, fiveMinsFromNow, + nil, ), }, { @@ -507,6 +509,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { m3, base.TaskStateRetry, oneHourFromNow, + nil, ), }, { @@ -516,6 +519,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { m4, base.TaskStateArchived, time.Time{}, // zero value for n/a + nil, ), }, { @@ -525,6 +529,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { m5, base.TaskStatePending, now, + nil, ), }, } @@ -722,8 +727,8 @@ func TestInspectorListActiveTasks(t *testing.T) { }, qname: "default", want: []*TaskInfo{ - newTaskInfo(m1, base.TaskStateActive, time.Time{}), - newTaskInfo(m2, base.TaskStateActive, time.Time{}), + newTaskInfo(m1, base.TaskStateActive, time.Time{}, nil), + newTaskInfo(m2, base.TaskStateActive, time.Time{}, nil), }, }, } @@ -749,6 +754,7 @@ func createScheduledTask(z base.Z) *TaskInfo { z.Message, base.TaskStateScheduled, time.Unix(z.Score, 0), + nil, ) } @@ -818,6 +824,7 @@ func createRetryTask(z base.Z) *TaskInfo { z.Message, base.TaskStateRetry, time.Unix(z.Score, 0), + nil, ) } @@ -888,6 +895,7 @@ func createArchivedTask(z base.Z) *TaskInfo { z.Message, base.TaskStateArchived, time.Time{}, // zero value for n/a + nil, ) } @@ -964,6 +972,7 @@ func createCompletedTask(z base.Z) *TaskInfo { z.Message, base.TaskStateCompleted, time.Time{}, // zero value for n/a + nil, // TODO: Test with result data ) }