diff --git a/inspector.go b/inspector.go index 8a364a2..61a6a48 100644 --- a/inspector.go +++ b/inspector.go @@ -43,7 +43,32 @@ func (i *Inspector) Queues() ([]string, error) { return i.rdb.AllQueues() } -// QueueInfo represents a state of queues at a certain time. +// Groups returns a list of all groups within the given queue. +func (i *Inspector) Groups(qname string) ([]*GroupInfo, error) { + stats, err := i.rdb.GroupStats(qname) + if err != nil { + return nil, err + } + var res []*GroupInfo + for _, s := range stats { + res = append(res, &GroupInfo{ + Group: s.Group, + Size: s.Size, + }) + } + return res, nil +} + +// GroupInfo represents a state of a group at a cerntain time. +type GroupInfo struct { + // Name of the group. + Group string + + // Size is the total number of tasks in the group. + Size int +} + +// QueueInfo represents a state of a queue at a certain time. type QueueInfo struct { // Name of the queue. Queue string @@ -339,6 +364,12 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn return tasks, nil } +// TODO: comment +func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) { + // TODO: Implement this with TDD + return nil, nil +} + // ListScheduledTasks retrieves scheduled tasks from the specified queue. // Tasks are sorted by NextProcessAt in ascending order. // @@ -505,6 +536,12 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { return int(n), err } +// TODO: comment +func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) { + // TODO: implement this + return 0, nil +} + // DeleteTask deletes a task with the given id from the given queue. // The task needs to be in pending, scheduled, retry, or archived state, // otherwise DeleteTask will return an error. @@ -612,6 +649,12 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { return int(n), err } +// TODO: comment +func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) { + // TODO: implement this + return 0, nil +} + // ArchiveTask archives a task with the given id in the given queue. // The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask // will return an error. diff --git a/inspector_test.go b/inspector_test.go index ff8add0..25ce538 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v8" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" @@ -3323,3 +3324,99 @@ func TestParseOption(t *testing.T) { }) } } + +func TestInspectorGroups(t *testing.T) { + r := setup(t) + defer r.Close() + inspector := NewInspector(getRedisConnOpt(t)) + + m1 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m2 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m3 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m4 := h.NewTaskMessageBuilder().SetGroup("group2").Build() + m5 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build() + m6 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build() + + now := time.Now() + + fixtures := struct { + tasks []*h.TaskSeedData + allGroups map[string][]string + groups map[string][]*redis.Z + }{ + tasks: []*h.TaskSeedData{ + {Msg: m1, State: base.TaskStateAggregating}, + {Msg: m2, State: base.TaskStateAggregating}, + {Msg: m3, State: base.TaskStateAggregating}, + {Msg: m4, State: base.TaskStateAggregating}, + {Msg: m5, State: base.TaskStateAggregating}, + }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"group1", "group2"}, + base.AllGroups("custom"): {"group1"}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m3.ID, Score: float64(now.Add(-30 * time.Second).Unix())}, + }, + base.GroupKey("default", "group2"): { + {Member: m4.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, + {Member: m6.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + } + + tests := []struct { + desc string + qname string + want []*GroupInfo + }{ + { + desc: "default queue groups", + qname: "default", + want: []*GroupInfo{ + {Group: "group1", Size: 3}, + {Group: "group2", Size: 1}, + }, + }, + { + desc: "custom queue groups", + qname: "custom", + want: []*GroupInfo{ + {Group: "group1", Size: 2}, + }, + }, + } + + var sortGroupInfosOpt = cmp.Transformer( + "SortGroupInfos", + func(in []*GroupInfo) []*GroupInfo { + out := append([]*GroupInfo(nil), in...) + sort.Slice(out, func(i, j int) bool { + return out[i].Group < out[j].Group + }) + return out + }) + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedTasks(t, r, fixtures.tasks) + h.SeedRedisSets(t, r, fixtures.allGroups) + h.SeedRedisZSets(t, r, fixtures.groups) + + t.Run(tc.desc, func(t *testing.T) { + got, err := inspector.Groups(tc.qname) + if err != nil { + t.Fatalf("Groups returned error: %v", err) + } + if diff := cmp.Diff(tc.want, got, sortGroupInfosOpt); diff != "" { + t.Errorf("Groups = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) + } + }) + } +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0da4915..ae3c608 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -513,7 +513,6 @@ func TestGroupStats(t *testing.T) { } }) } - } func TestGetTaskInfo(t *testing.T) {