From c02409c97444d5db9cb22ef4a6ebb467f924d057 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 2 Apr 2021 07:01:06 -0700 Subject: [PATCH] Update Inspector to return TaskInfo from list methods --- inspeq/inspector.go | 163 +++++--------------------- inspeq/inspector_test.go | 242 ++++++++++++++++++--------------------- 2 files changed, 142 insertions(+), 263 deletions(-) diff --git a/inspeq/inspector.go b/inspeq/inspector.go index dc9b40a..2105299 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -255,9 +255,9 @@ func (t *TaskInfo) LastFailedAt() time.Time { return time.Unix(t.info.LastFailedAt, 0) } -// LastErr returns the error message from the last failure. +// LastError returns the error message from the last failure. // Empty string is returned if the task has not failed. -func (t *TaskInfo) LastErr() string { +func (t *TaskInfo) LastError() string { return t.info.ErrorMsg } @@ -280,17 +280,8 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { return &TaskInfo{info}, nil } -// PendingTask is a task in a queue and is ready to be processed. -type PendingTask struct { - *asynq.Task - ID string - Queue string - MaxRetry int - Retried int - LastError string -} - // ActiveTask is a task that's currently being processed. +// TODO: remove this type type ActiveTask struct { *asynq.Task ID string @@ -300,49 +291,6 @@ type ActiveTask struct { LastError string } -// ScheduledTask is a task scheduled to be processed in the future. -type ScheduledTask struct { - *asynq.Task - ID string - Queue string - MaxRetry int - Retried int - LastError string - NextProcessAt time.Time - - score int64 -} - -// RetryTask is a task scheduled to be retried in the future. -type RetryTask struct { - *asynq.Task - ID string - Queue string - NextProcessAt time.Time - MaxRetry int - Retried int - LastError string - // TODO: LastFailedAt time.Time - - score int64 -} - -// ArchivedTask is a task archived for debugging and inspection purposes, and -// it won't be retried automatically. -// A task can be archived when the task exhausts its retry counts or manually -// archived by a user via the CLI or Inspector. -type ArchivedTask struct { - *asynq.Task - ID string - Queue string - MaxRetry int - Retried int - LastFailedAt time.Time - LastError string - - score int64 -} - // ListOption specifies behavior of list operation. type ListOption interface{} @@ -407,26 +355,19 @@ func Page(n int) ListOption { // ListPendingTasks retrieves pending tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { +func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListPending(qname, pgn) + infos, err := i.rdb.ListPending(qname, pgn) if err != nil { return nil, err } - var tasks []*PendingTask - for _, m := range msgs { - tasks = append(tasks, &PendingTask{ - Task: asynq.NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, - MaxRetry: m.Retry, - Retried: m.Retried, - LastError: m.ErrorMsg, - }) + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, &TaskInfo{info: i}) } return tasks, err } @@ -434,124 +375,82 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi // ListActiveTasks retrieves active tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { +func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListActive(qname, pgn) + infos, err := i.rdb.ListActive(qname, pgn) if err != nil { return nil, err } - var tasks []*ActiveTask - for _, m := range msgs { - - tasks = append(tasks, &ActiveTask{ - Task: asynq.NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, - MaxRetry: m.Retry, - Retried: m.Retried, - LastError: m.ErrorMsg, - }) + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, &TaskInfo{info: i}) } return tasks, err } // ListScheduledTasks retrieves scheduled tasks from the specified queue. -// Tasks are sorted by NextProcessAt field in ascending order. +// Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { +func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListScheduled(qname, pgn) + infos, err := i.rdb.ListScheduled(qname, pgn) if err != nil { return nil, err } - var tasks []*ScheduledTask - for _, z := range zs { - processAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &ScheduledTask{ - Task: t, - ID: z.Message.ID.String(), - Queue: z.Message.Queue, - MaxRetry: z.Message.Retry, - Retried: z.Message.Retried, - LastError: z.Message.ErrorMsg, - NextProcessAt: processAt, - score: z.Score, - }) + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, &TaskInfo{info: i}) } return tasks, nil } // ListRetryTasks retrieves retry tasks from the specified queue. -// Tasks are sorted by NextProcessAt field in ascending order. +// Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { +func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListRetry(qname, pgn) + infos, err := i.rdb.ListRetry(qname, pgn) if err != nil { return nil, err } - var tasks []*RetryTask - for _, z := range zs { - processAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &RetryTask{ - Task: t, - ID: z.Message.ID.String(), - Queue: z.Message.Queue, - NextProcessAt: processAt, - MaxRetry: z.Message.Retry, - Retried: z.Message.Retried, - // TODO: LastFailedAt: z.Message.LastFailedAt - LastError: z.Message.ErrorMsg, - score: z.Score, - }) + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, &TaskInfo{info: i}) } return tasks, nil } // ListArchivedTasks retrieves archived tasks from the specified queue. -// Tasks are sorted by LastFailedAt field in descending order. +// Tasks are sorted by LastFailedAt in descending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { +func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListArchived(qname, pgn) + infos, err := i.rdb.ListArchived(qname, pgn) if err != nil { return nil, err } - var tasks []*ArchivedTask - for _, z := range zs { - failedAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &ArchivedTask{ - Task: t, - ID: z.Message.ID.String(), - Queue: z.Message.Queue, - MaxRetry: z.Message.Retry, - Retried: z.Message.Retried, - LastFailedAt: failedAt, - LastError: z.Message.ErrorMsg, - score: z.Score, - }) + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, &TaskInfo{info: i}) } return tasks, nil } diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index af9baf0..a94ac4d 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -229,8 +229,8 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) - m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") - m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) tests := []struct { pending map[string][]*base.TaskMessage @@ -285,8 +285,8 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) - m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") - m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) tests := []struct { pending map[string][]*base.TaskMessage @@ -502,15 +502,14 @@ func TestInspectorHistory(t *testing.T) { } } -func createPendingTask(msg *base.TaskMessage) *PendingTask { - return &PendingTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, +func createPendingTaskInfo(msg *base.TaskMessage) *TaskInfo { + info := &base.TaskInfo{ + TaskMessage: msg, + State: "pending", + NextProcessAt: time.Now().Unix(), + LastFailedAt: 0, } + return &TaskInfo{info} } func TestInspectorListPendingTasks(t *testing.T) { @@ -527,7 +526,7 @@ func TestInspectorListPendingTasks(t *testing.T) { desc string pending map[string][]*base.TaskMessage qname string - want []*PendingTask + want []*TaskInfo }{ { desc: "with default queue", @@ -535,9 +534,9 @@ func TestInspectorListPendingTasks(t *testing.T) { "default": {m1, m2}, }, qname: "default", - want: []*PendingTask{ - createPendingTask(m1), - createPendingTask(m2), + want: []*TaskInfo{ + createPendingTaskInfo(m1), + createPendingTaskInfo(m2), }, }, { @@ -548,8 +547,8 @@ func TestInspectorListPendingTasks(t *testing.T) { "low": {m4}, }, qname: "critical", - want: []*PendingTask{ - createPendingTask(m3), + want: []*TaskInfo{ + createPendingTaskInfo(m3), }, }, { @@ -558,7 +557,7 @@ func TestInspectorListPendingTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*PendingTask(nil), + want: []*TaskInfo(nil), }, } @@ -574,8 +573,7 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } @@ -592,22 +590,21 @@ func TestInspectorListActiveTasks(t *testing.T) { inspector := New(getRedisConnOpt(t)) - createActiveTask := func(msg *base.TaskMessage) *ActiveTask { - return &ActiveTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, + createActiveTaskInfo := func(msg *base.TaskMessage) *TaskInfo { + info := &base.TaskInfo{ + TaskMessage: msg, + State: "active", + NextProcessAt: 0, + LastFailedAt: 0, } + return &TaskInfo{info} } tests := []struct { desc string active map[string][]*base.TaskMessage qname string - want []*ActiveTask + want []*TaskInfo }{ { desc: "with a few active tasks", @@ -616,9 +613,9 @@ func TestInspectorListActiveTasks(t *testing.T) { "custom": {m3, m4}, }, qname: "default", - want: []*ActiveTask{ - createActiveTask(m1), - createActiveTask(m2), + want: []*TaskInfo{ + createActiveTaskInfo(m1), + createActiveTaskInfo(m2), }, }, } @@ -632,26 +629,21 @@ func TestInspectorListActiveTasks(t *testing.T) { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } } -func createScheduledTask(z base.Z) *ScheduledTask { - msg := z.Message - return &ScheduledTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, - NextProcessAt: time.Unix(z.Score, 0), - score: z.Score, +func createScheduledTaskInfo(z base.Z) *TaskInfo { + info := &base.TaskInfo{ + TaskMessage: z.Message, + State: "scheduled", + NextProcessAt: z.Score, + LastFailedAt: 0, } + return &TaskInfo{info} } func TestInspectorListScheduledTasks(t *testing.T) { @@ -673,7 +665,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { desc string scheduled map[string][]base.Z qname string - want []*ScheduledTask + want []*TaskInfo }{ { desc: "with a few scheduled tasks", @@ -683,10 +675,10 @@ func TestInspectorListScheduledTasks(t *testing.T) { }, qname: "default", // Should be sorted by NextProcessAt. - want: []*ScheduledTask{ - createScheduledTask(z3), - createScheduledTask(z1), - createScheduledTask(z2), + want: []*TaskInfo{ + createScheduledTaskInfo(z3), + createScheduledTaskInfo(z1), + createScheduledTaskInfo(z2), }, }, { @@ -695,7 +687,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*ScheduledTask(nil), + want: []*TaskInfo(nil), }, } @@ -708,26 +700,21 @@ func TestInspectorListScheduledTasks(t *testing.T) { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } } -func createRetryTask(z base.Z) *RetryTask { - msg := z.Message - return &RetryTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - NextProcessAt: time.Unix(z.Score, 0), - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastError: msg.ErrorMsg, - score: z.Score, +func createRetryTaskInfo(z base.Z) *TaskInfo { + info := &base.TaskInfo{ + TaskMessage: z.Message, + State: "retry", + NextProcessAt: z.Score, + LastFailedAt: time.Now().Unix(), } + return &TaskInfo{info} } func TestInspectorListRetryTasks(t *testing.T) { @@ -749,7 +736,7 @@ func TestInspectorListRetryTasks(t *testing.T) { desc string retry map[string][]base.Z qname string - want []*RetryTask + want []*TaskInfo }{ { desc: "with a few retry tasks", @@ -759,10 +746,10 @@ func TestInspectorListRetryTasks(t *testing.T) { }, qname: "default", // Should be sorted by NextProcessAt. - want: []*RetryTask{ - createRetryTask(z3), - createRetryTask(z1), - createRetryTask(z2), + want: []*TaskInfo{ + createRetryTaskInfo(z3), + createRetryTaskInfo(z1), + createRetryTaskInfo(z2), }, }, { @@ -771,7 +758,7 @@ func TestInspectorListRetryTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*RetryTask(nil), + want: []*TaskInfo(nil), }, // TODO(hibiken): ErrQueueNotFound when queue doesn't exist } @@ -785,26 +772,21 @@ func TestInspectorListRetryTasks(t *testing.T) { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { + t.Errorf("%s; ListRetryTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } } -func createArchivedTask(z base.Z) *ArchivedTask { - msg := z.Message - return &ArchivedTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastFailedAt: time.Unix(z.Score, 0), - LastError: msg.ErrorMsg, - score: z.Score, +func createArchivedTaskInfo(z base.Z) *TaskInfo { + info := &base.TaskInfo{ + TaskMessage: z.Message, + State: "archived", + NextProcessAt: 0, + LastFailedAt: z.Score, } + return &TaskInfo{info} } func TestInspectorListArchivedTasks(t *testing.T) { @@ -826,7 +808,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { desc string archived map[string][]base.Z qname string - want []*ArchivedTask + want []*TaskInfo }{ { desc: "with a few archived tasks", @@ -836,10 +818,10 @@ func TestInspectorListArchivedTasks(t *testing.T) { }, qname: "default", // Should be sorted by LastFailedAt. - want: []*ArchivedTask{ - createArchivedTask(z2), - createArchivedTask(z1), - createArchivedTask(z3), + want: []*TaskInfo{ + createArchivedTaskInfo(z2), + createArchivedTaskInfo(z1), + createArchivedTaskInfo(z3), }, }, { @@ -848,7 +830,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { "default": {}, }, qname: "default", - want: []*ArchivedTask(nil), + want: []*TaskInfo(nil), }, } @@ -861,9 +843,8 @@ func TestInspectorListArchivedTasks(t *testing.T) { t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { + t.Errorf("%s; ListArchivedTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } @@ -885,33 +866,33 @@ func TestInspectorListPagination(t *testing.T) { tests := []struct { page int pageSize int - want []*PendingTask + want []*TaskInfo }{ { page: 1, pageSize: 5, - want: []*PendingTask{ - createPendingTask(msgs[0]), - createPendingTask(msgs[1]), - createPendingTask(msgs[2]), - createPendingTask(msgs[3]), - createPendingTask(msgs[4]), + want: []*TaskInfo{ + createPendingTaskInfo(msgs[0]), + createPendingTaskInfo(msgs[1]), + createPendingTaskInfo(msgs[2]), + createPendingTaskInfo(msgs[3]), + createPendingTaskInfo(msgs[4]), }, }, { page: 3, pageSize: 10, - want: []*PendingTask{ - createPendingTask(msgs[20]), - createPendingTask(msgs[21]), - createPendingTask(msgs[22]), - createPendingTask(msgs[23]), - createPendingTask(msgs[24]), - createPendingTask(msgs[25]), - createPendingTask(msgs[26]), - createPendingTask(msgs[27]), - createPendingTask(msgs[28]), - createPendingTask(msgs[29]), + want: []*TaskInfo{ + createPendingTaskInfo(msgs[20]), + createPendingTaskInfo(msgs[21]), + createPendingTaskInfo(msgs[22]), + createPendingTaskInfo(msgs[23]), + createPendingTaskInfo(msgs[24]), + createPendingTaskInfo(msgs[25]), + createPendingTaskInfo(msgs[26]), + createPendingTaskInfo(msgs[27]), + createPendingTaskInfo(msgs[28]), + createPendingTaskInfo(msgs[29]), }, }, } @@ -922,9 +903,8 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { + t.Errorf("ListPendingTasks('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) } } @@ -1901,7 +1881,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1921,7 +1901,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "default", - id: createPendingTask(m2).ID, + id: createPendingTaskInfo(m2).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -1933,7 +1913,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "custom", - id: createPendingTask(m3).ID, + id: createPendingTaskInfo(m3).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, @@ -1961,7 +1941,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1986,7 +1966,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createScheduledTask(z2).ID, + id: createScheduledTaskInfo(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2011,7 +1991,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2036,7 +2016,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createRetryTask(z2).ID, + id: createRetryTaskInfo(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2061,7 +2041,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { +func TestInspectorDeleteTaskyDeletesArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2086,7 +2066,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createArchivedTask(z2).ID, + id: createArchivedTaskInfo(z2).ID(), wantArchived: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2142,7 +2122,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createScheduledTask(z2).ID, + id: createScheduledTaskInfo(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2212,7 +2192,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID, + id: createRetryTaskInfo(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2283,7 +2263,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) { "low": {}, }, qname: "critical", - id: createArchivedTask(z2).ID, + id: createArchivedTaskInfo(z2).ID(), wantArchived: map[string][]base.Z{ "default": {z1}, "critical": {}, @@ -2350,7 +2330,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createPendingTask(m1).ID, + id: createPendingTaskInfo(m1).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2, m3}, @@ -2372,7 +2352,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createPendingTask(m2).ID, + id: createPendingTaskInfo(m2).ID(), wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2446,7 +2426,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createScheduledTask(z2).ID, + id: createScheduledTaskInfo(z2).ID(), wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2521,7 +2501,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID, + id: createRetryTaskInfo(z2).ID(), wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3},