diff --git a/inspector.go b/inspector.go index 61a6a48..c48ec83 100644 --- a/inspector.go +++ b/inspector.go @@ -364,10 +364,32 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn return tasks, nil } -// TODO: comment +// ListAggregatingTasks retrieves scheduled tasks from the specified group. +// +// By default, it retrieves the first 30 tasks. func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) { - // TODO: Implement this with TDD - return nil, nil + if err := base.ValidateQueueName(qname); err != nil { + return nil, fmt.Errorf("asynq: %v", err) + } + opt := composeListOptions(opts...) + pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} + infos, err := i.rdb.ListAggregating(qname, gname, pgn) + switch { + case errors.IsQueueNotFound(err): + return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) + case err != nil: + return nil, fmt.Errorf("asynq: %v", err) + } + var tasks []*TaskInfo + for _, i := range infos { + tasks = append(tasks, newTaskInfo( + i.Message, + i.State, + i.NextProcessAt, + i.Result, + )) + } + return tasks, nil } // ListScheduledTasks retrieves scheduled tasks from the specified queue. diff --git a/inspector_test.go b/inspector_test.go index 25ce538..c406528 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -1121,6 +1121,106 @@ func TestInspectorListCompletedTasks(t *testing.T) { } } +func TestInspectorListAggregatingTasks(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + + m1 := h.NewTaskMessageBuilder().SetType("task1").SetQueue("default").SetGroup("group1").Build() + m2 := h.NewTaskMessageBuilder().SetType("task2").SetQueue("default").SetGroup("group1").Build() + m3 := h.NewTaskMessageBuilder().SetType("task3").SetQueue("default").SetGroup("group1").Build() + m4 := h.NewTaskMessageBuilder().SetType("task4").SetQueue("default").SetGroup("group2").Build() + m5 := h.NewTaskMessageBuilder().SetType("task5").SetQueue("custom").SetGroup("group1").Build() + + inspector := NewInspector(getRedisConnOpt(t)) + + fxt := struct { + tasks []*h.TaskSeedData + allQueues []string + allGroups map[string][]string + groups map[string][]*redis.Z + }{ + tasks: []*h.TaskSeedData{ + {Msg: m1, State: base.TaskStateAggregating}, + {Msg: m2, State: base.TaskStateAggregating}, + {Msg: m3, State: base.TaskStateAggregating}, + {Msg: m4, State: base.TaskStateAggregating}, + {Msg: m5, State: base.TaskStateAggregating}, + }, + allQueues: []string{"default", "custom"}, + allGroups: map[string][]string{ + base.AllGroups("default"): {"group1", "group2"}, + base.AllGroups("custom"): {"group1"}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-30 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m3.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, + }, + base.GroupKey("default", "group2"): { + {Member: m4.ID, Score: float64(now.Add(-30 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m5.ID, Score: float64(now.Add(-30 * time.Second).Unix())}, + }, + }, + } + + tests := []struct { + desc string + qname string + gname string + want []*TaskInfo + }{ + { + desc: "default queue group1", + qname: "default", + gname: "group1", + want: []*TaskInfo{ + createAggregatingTaskInfo(m1), + createAggregatingTaskInfo(m2), + createAggregatingTaskInfo(m3), + }, + }, + { + desc: "custom queue group1", + qname: "custom", + gname: "group1", + want: []*TaskInfo{ + createAggregatingTaskInfo(m5), + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedTasks(t, r, fxt.tasks) + h.SeedRedisSet(t, r, base.AllQueues, fxt.allQueues) + h.SeedRedisSets(t, r, fxt.allGroups) + h.SeedRedisZSets(t, r, fxt.groups) + + t.Run(tc.desc, func(t *testing.T) { + got, err := inspector.ListAggregatingTasks(tc.qname, tc.gname) + if err != nil { + t.Fatalf("ListAggregatingTasks returned error: %v", err) + } + + cmpOpts := []cmp.Option{ + cmpopts.EquateApproxTime(2 * time.Second), + cmp.AllowUnexported(TaskInfo{}), + } + if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" { + t.Errorf("ListAggregatingTasks = %v, want = %v; (-want,+got)\n%s", got, tc.want, diff) + } + }) + } +} + +func createAggregatingTaskInfo(msg *base.TaskMessage) *TaskInfo { + return newTaskInfo(msg, base.TaskStateAggregating, time.Time{}, nil) +} + func TestInspectorListPagination(t *testing.T) { // Create 100 tasks. var msgs []*base.TaskMessage @@ -1222,6 +1322,9 @@ func TestInspectorListTasksQueueNotFoundError(t *testing.T) { if _, err := inspector.ListCompletedTasks(tc.qname); !errors.Is(err, tc.wantErr) { t.Errorf("ListCompletedTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr) } + if _, err := inspector.ListAggregatingTasks(tc.qname, "mygroup"); !errors.Is(err, tc.wantErr) { + t.Errorf("ListAggregatingTasks(%q, \"mygroup\") returned error %v, want %v", tc.qname, err, tc.wantErr) + } } }