From 840f7245b1fa3f30f8e9c1baceba6881eb197731 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 18 May 2021 19:13:52 -0700 Subject: [PATCH] Update List methods (expect for ListArchived) --- inspector.go | 101 +++++++++++++++------------------- inspector_test.go | 135 +++++++++++++++++++--------------------------- 2 files changed, 99 insertions(+), 137 deletions(-) diff --git a/inspector.go b/inspector.go index 797c481..9686e5f 100644 --- a/inspector.go +++ b/inspector.go @@ -352,25 +352,24 @@ func Page(n int) ListOption { // ListPendingTasks retrieves pending tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { +func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { - return nil, err + return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} msgs, err := i.rdb.ListPending(qname, pgn) if err != nil { - return nil, err + // TODO: Handle ErrQueueNotFound + return nil, fmt.Errorf("asynq: %v", err) } - var tasks []*PendingTask + now := time.Now() + var tasks []*TaskInfo for _, m := range msgs { - tasks = append(tasks, &PendingTask{ - Task: NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, - MaxRetry: m.Retry, - Retried: m.Retried, - LastError: m.ErrorMsg, + tasks = append(tasks, &TaskInfo{ + msg: m, + state: base.TaskStatePending, + nextProcessAt: now, }) } return tasks, err @@ -379,114 +378,100 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi // ListActiveTasks retrieves active tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { +func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { - return nil, err + return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} msgs, err := i.rdb.ListActive(qname, pgn) if err != nil { - return nil, err + // TODO: Handle QueueNotFound + return nil, fmt.Errorf("asynq: %v", err) } - var tasks []*ActiveTask + var tasks []*TaskInfo for _, m := range msgs { - tasks = append(tasks, &ActiveTask{ - Task: NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, - MaxRetry: m.Retry, - Retried: m.Retried, - LastError: m.ErrorMsg, + tasks = append(tasks, &TaskInfo{ + msg: m, + state: base.TaskStateActive, }) } return tasks, err } // ListScheduledTasks retrieves scheduled tasks from the specified queue. -// Tasks are sorted by NextProcessAt field in ascending order. +// Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { +func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { - return nil, err + return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListScheduled(qname, pgn) if err != nil { - return nil, err + // TODO: handle ErrQueueNotFound + return nil, fmt.Errorf("asynq: %v", err) } - var tasks []*ScheduledTask + var tasks []*TaskInfo for _, z := range zs { - processAt := time.Unix(z.Score, 0) - t := NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &ScheduledTask{ - Task: t, - ID: z.Message.ID.String(), - Queue: z.Message.Queue, - MaxRetry: z.Message.Retry, - Retried: z.Message.Retried, - LastError: z.Message.ErrorMsg, - NextProcessAt: processAt, - score: z.Score, + tasks = append(tasks, &TaskInfo{ + msg: z.Message, + state: base.TaskStateScheduled, + nextProcessAt: time.Unix(z.Score, 0), }) } return tasks, nil } // ListRetryTasks retrieves retry tasks from the specified queue. -// Tasks are sorted by NextProcessAt field in ascending order. +// Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { +func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { - return nil, err + return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListRetry(qname, pgn) if err != nil { - return nil, err + // TODO: handle ErrQueueNotFound + return nil, fmt.Errorf("asynq: %v", err) } - var tasks []*RetryTask + var tasks []*TaskInfo for _, z := range zs { - processAt := time.Unix(z.Score, 0) - t := NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &RetryTask{ - Task: t, - ID: z.Message.ID.String(), - Queue: z.Message.Queue, - NextProcessAt: processAt, - MaxRetry: z.Message.Retry, - Retried: z.Message.Retried, - // TODO: LastFailedAt: z.Message.LastFailedAt - LastError: z.Message.ErrorMsg, - score: z.Score, + tasks = append(tasks, &TaskInfo{ + msg: z.Message, + state: base.TaskStateRetry, + nextProcessAt: time.Unix(z.Score, 0), }) } return tasks, nil } // ListArchivedTasks retrieves archived tasks from the specified queue. -// Tasks are sorted by LastFailedAt field in descending order. +// Tasks are sorted by LastFailedAt in descending order. // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { if err := base.ValidateQueueName(qname); err != nil { - return nil, err + return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListArchived(qname, pgn) if err != nil { - return nil, err + // TODO: handle ErrQueueNotFound + return nil, fmt.Errorf("asynq: %v", err) } var tasks []*ArchivedTask for _, z := range zs { failedAt := time.Unix(z.Score, 0) t := NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &ArchivedTask{ + // TODO: How to handle last failed at Task: t, ID: z.Message.ID.String(), Queue: z.Message.Queue, diff --git a/inspector_test.go b/inspector_test.go index c9a3fdb..894720c 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -422,14 +422,11 @@ func TestInspectorHistory(t *testing.T) { } } -func createPendingTask(msg *base.TaskMessage) *PendingTask { - return &PendingTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, +func createPendingTask(msg *base.TaskMessage) *TaskInfo { + return &TaskInfo{ + msg: msg, + state: base.TaskStatePending, + nextProcessAt: time.Now(), } } @@ -447,7 +444,7 @@ func TestInspectorListPendingTasks(t *testing.T) { desc string pending map[string][]*base.TaskMessage qname string - want []*PendingTask + want []*TaskInfo }{ { desc: "with default queue", @@ -455,7 +452,7 @@ func TestInspectorListPendingTasks(t *testing.T) { "default": {m1, m2}, }, qname: "default", - want: []*PendingTask{ + want: []*TaskInfo{ createPendingTask(m1), createPendingTask(m2), }, @@ -468,7 +465,7 @@ func TestInspectorListPendingTasks(t *testing.T) { "low": {m4}, }, qname: "critical", - want: []*PendingTask{ + want: []*TaskInfo{ createPendingTask(m3), }, }, @@ -478,7 +475,7 @@ func TestInspectorListPendingTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*PendingTask(nil), + want: []*TaskInfo(nil), }, } @@ -494,8 +491,11 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + cmpOpts := []cmp.Option{ + cmpopts.EquateApproxTime(2 * time.Second), + cmp.AllowUnexported(TaskInfo{}), + } + if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } @@ -512,22 +512,11 @@ func TestInspectorListActiveTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) - createActiveTask := func(msg *base.TaskMessage) *ActiveTask { - return &ActiveTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, - } - } - tests := []struct { desc string active map[string][]*base.TaskMessage qname string - want []*ActiveTask + want []*TaskInfo }{ { desc: "with a few active tasks", @@ -536,9 +525,9 @@ func TestInspectorListActiveTasks(t *testing.T) { "custom": {m3, m4}, }, qname: "default", - want: []*ActiveTask{ - createActiveTask(m1), - createActiveTask(m2), + want: []*TaskInfo{ + {msg: m1, state: base.TaskStateActive, nextProcessAt: time.Time{}}, + {msg: m2, state: base.TaskStateActive, nextProcessAt: time.Time{}}, }, }, } @@ -552,25 +541,18 @@ func TestInspectorListActiveTasks(t *testing.T) { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } } -func createScheduledTask(z base.Z) *ScheduledTask { - msg := z.Message - return &ScheduledTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, - NextProcessAt: time.Unix(z.Score, 0), - score: z.Score, +func createScheduledTask(z base.Z) *TaskInfo { + return &TaskInfo{ + msg: z.Message, + state: base.TaskStateScheduled, + nextProcessAt: time.Unix(z.Score, 0), } } @@ -593,7 +575,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { desc string scheduled map[string][]base.Z qname string - want []*ScheduledTask + want []*TaskInfo }{ { desc: "with a few scheduled tasks", @@ -603,7 +585,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { }, qname: "default", // Should be sorted by NextProcessAt. - want: []*ScheduledTask{ + want: []*TaskInfo{ createScheduledTask(z3), createScheduledTask(z1), createScheduledTask(z2), @@ -615,7 +597,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*ScheduledTask(nil), + want: []*TaskInfo(nil), }, } @@ -628,25 +610,18 @@ func TestInspectorListScheduledTasks(t *testing.T) { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ScheduledTask{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } } -func createRetryTask(z base.Z) *RetryTask { - msg := z.Message - return &RetryTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - NextProcessAt: time.Unix(z.Score, 0), - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, - score: z.Score, +func createRetryTask(z base.Z) *TaskInfo { + return &TaskInfo{ + msg: z.Message, + state: base.TaskStateRetry, + nextProcessAt: time.Unix(z.Score, 0), } } @@ -669,7 +644,7 @@ func TestInspectorListRetryTasks(t *testing.T) { desc string retry map[string][]base.Z qname string - want []*RetryTask + want []*TaskInfo }{ { desc: "with a few retry tasks", @@ -679,7 +654,7 @@ func TestInspectorListRetryTasks(t *testing.T) { }, qname: "default", // Should be sorted by NextProcessAt. - want: []*RetryTask{ + want: []*TaskInfo{ createRetryTask(z3), createRetryTask(z1), createRetryTask(z2), @@ -691,7 +666,7 @@ func TestInspectorListRetryTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*RetryTask(nil), + want: []*TaskInfo(nil), }, // TODO(hibiken): ErrQueueNotFound when queue doesn't exist } @@ -705,8 +680,7 @@ func TestInspectorListRetryTasks(t *testing.T) { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Task{}, RetryTask{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } @@ -805,12 +779,12 @@ func TestInspectorListPagination(t *testing.T) { tests := []struct { page int pageSize int - want []*PendingTask + want []*TaskInfo }{ { page: 1, pageSize: 5, - want: []*PendingTask{ + want: []*TaskInfo{ createPendingTask(msgs[0]), createPendingTask(msgs[1]), createPendingTask(msgs[2]), @@ -821,7 +795,7 @@ func TestInspectorListPagination(t *testing.T) { { page: 3, pageSize: 10, - want: []*PendingTask{ + want: []*TaskInfo{ createPendingTask(msgs[20]), createPendingTask(msgs[21]), createPendingTask(msgs[22]), @@ -842,8 +816,11 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + cmpOpts := []cmp.Option{ + cmpopts.EquateApproxTime(2 * time.Second), + cmp.AllowUnexported(TaskInfo{}), + } + if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" { t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) } @@ -1841,7 +1818,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "default", - id: createPendingTask(m2).ID, + id: createPendingTask(m2).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -1853,7 +1830,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "custom", - id: createPendingTask(m3).ID, + id: createPendingTask(m3).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, @@ -1906,7 +1883,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createScheduledTask(z2).ID, + id: createScheduledTask(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -1956,7 +1933,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createRetryTask(z2).ID, + id: createRetryTask(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2127,7 +2104,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createScheduledTask(z2).ID, + id: createScheduledTask(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2197,7 +2174,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID, + id: createRetryTask(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2435,7 +2412,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createPendingTask(m1).ID, + id: createPendingTask(m1).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2, m3}, @@ -2457,7 +2434,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createPendingTask(m2).ID, + id: createPendingTask(m2).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2530,7 +2507,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createScheduledTask(z2).ID, + id: createScheduledTask(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2605,7 +2582,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID, + id: createRetryTask(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2680,7 +2657,7 @@ func TestInspectorArchiveTaskError(t *testing.T) { "custom": {}, }, qname: "nonexistent", - id: createRetryTask(z2).ID, + id: createRetryTask(z2).ID(), wantErr: ErrQueueNotFound, wantRetry: map[string][]base.Z{ "default": {z1},