2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 22:06:12 +08:00

Add initial implementation of aggregator

This commit is contained in:
Ken Hibino
2022-03-08 06:38:35 -08:00
parent 68f0be934e
commit af9ac9aa71
4 changed files with 216 additions and 9 deletions

View File

@@ -421,6 +421,35 @@ func TestGroupKey(t *testing.T) {
}
}
func TestAggregationSetKey(t *testing.T) {
tests := []struct {
qname string
gname string
setID string
want string
}{
{
qname: "default",
gname: "mygroup",
setID: "12345",
want: "asynq:{default}:g:mygroup:12345",
},
{
qname: "custom",
gname: "foo",
setID: "98765",
want: "asynq:{custom}:g:foo:98765",
},
}
for _, tc := range tests {
got := AggregationSetKey(tc.qname, tc.gname, tc.setID)
if got != tc.want {
t.Errorf("AggregationSetKey(%q, %q, %q) = %q, want %q", tc.qname, tc.gname, tc.setID, got, tc.want)
}
}
}
func TestAllGroups(t *testing.T) {
tests := []struct {
qname string
@@ -444,25 +473,25 @@ func TestAllGroups(t *testing.T) {
}
}
func TestAllStagedGroups(t *testing.T) {
func TestAllAggregationSets(t *testing.T) {
tests := []struct {
qname string
want string
}{
{
qname: "default",
want: "asynq:{default}:staged_groups",
want: "asynq:{default}:aggregation_sets",
},
{
qname: "custom",
want: "asynq:{custom}:staged_groups",
want: "asynq:{custom}:aggregation_sets",
},
}
for _, tc := range tests {
got := AllStagedGroups(tc.qname)
got := AllAggregationSets(tc.qname)
if got != tc.want {
t.Errorf("AllStagedGroups(%q) = %q, want %q", tc.qname, got, tc.want)
t.Errorf("AllAggregationSets(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}