Add Inspector.ListAggregatingTasks

This commit is contained in:
Ken Hibino
2022-03-22 06:23:22 -07:00
parent 71bd8f0535
commit 7fb5b25944
2 changed files with 128 additions and 3 deletions

View File

@@ -1121,6 +1121,106 @@ func TestInspectorListCompletedTasks(t *testing.T) {
}
}
func TestInspectorListAggregatingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
m1 := h.NewTaskMessageBuilder().SetType("task1").SetQueue("default").SetGroup("group1").Build()
m2 := h.NewTaskMessageBuilder().SetType("task2").SetQueue("default").SetGroup("group1").Build()
m3 := h.NewTaskMessageBuilder().SetType("task3").SetQueue("default").SetGroup("group1").Build()
m4 := h.NewTaskMessageBuilder().SetType("task4").SetQueue("default").SetGroup("group2").Build()
m5 := h.NewTaskMessageBuilder().SetType("task5").SetQueue("custom").SetGroup("group1").Build()
inspector := NewInspector(getRedisConnOpt(t))
fxt := struct {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
{Msg: m2, State: base.TaskStateAggregating},
{Msg: m3, State: base.TaskStateAggregating},
{Msg: m4, State: base.TaskStateAggregating},
{Msg: m5, State: base.TaskStateAggregating},
},
allQueues: []string{"default", "custom"},
allGroups: map[string][]string{
base.AllGroups("default"): {"group1", "group2"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m3.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
base.GroupKey("default", "group2"): {
{Member: m4.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
},
base.GroupKey("custom", "group1"): {
{Member: m5.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
},
},
}
tests := []struct {
desc string
qname string
gname string
want []*TaskInfo
}{
{
desc: "default queue group1",
qname: "default",
gname: "group1",
want: []*TaskInfo{
createAggregatingTaskInfo(m1),
createAggregatingTaskInfo(m2),
createAggregatingTaskInfo(m3),
},
},
{
desc: "custom queue group1",
qname: "custom",
gname: "group1",
want: []*TaskInfo{
createAggregatingTaskInfo(m5),
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedTasks(t, r, fxt.tasks)
h.SeedRedisSet(t, r, base.AllQueues, fxt.allQueues)
h.SeedRedisSets(t, r, fxt.allGroups)
h.SeedRedisZSets(t, r, fxt.groups)
t.Run(tc.desc, func(t *testing.T) {
got, err := inspector.ListAggregatingTasks(tc.qname, tc.gname)
if err != nil {
t.Fatalf("ListAggregatingTasks returned error: %v", err)
}
cmpOpts := []cmp.Option{
cmpopts.EquateApproxTime(2 * time.Second),
cmp.AllowUnexported(TaskInfo{}),
}
if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" {
t.Errorf("ListAggregatingTasks = %v, want = %v; (-want,+got)\n%s", got, tc.want, diff)
}
})
}
}
func createAggregatingTaskInfo(msg *base.TaskMessage) *TaskInfo {
return newTaskInfo(msg, base.TaskStateAggregating, time.Time{}, nil)
}
func TestInspectorListPagination(t *testing.T) {
// Create 100 tasks.
var msgs []*base.TaskMessage
@@ -1222,6 +1322,9 @@ func TestInspectorListTasksQueueNotFoundError(t *testing.T) {
if _, err := inspector.ListCompletedTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListCompletedTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
if _, err := inspector.ListAggregatingTasks(tc.qname, "mygroup"); !errors.Is(err, tc.wantErr) {
t.Errorf("ListAggregatingTasks(%q, \"mygroup\") returned error %v, want %v", tc.qname, err, tc.wantErr)
}
}
}