2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Implement RDB.ListGroups

This commit is contained in:
Ken Hibino 2022-03-10 11:06:48 -08:00
parent 7849b1114c
commit b29fe58434
3 changed files with 70 additions and 3 deletions

View File

@ -248,7 +248,9 @@ func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z
// SeedGroup initializes the group with the given entries.
func SeedGroup(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname, gname string) {
tb.Helper()
r.SAdd(context.Background(), base.AllQueues, qname)
ctx := context.Background()
r.SAdd(ctx, base.AllQueues, qname)
r.SAdd(ctx, base.AllGroups(qname), gname)
seedRedisZSet(tb, r, base.GroupKey(qname, gname), entries, base.TaskStateAggregating)
}

View File

@ -985,8 +985,12 @@ func (r *RDB) forwardAll(qname string) (err error) {
// ListGroups returns a list of all known groups in the given queue.
func (r *RDB) ListGroups(qname string) ([]string, error) {
// TODO: Implement this with TDD
return nil, nil
var op errors.Op = "RDB.ListGroups"
groups, err := r.client.SMembers(context.Background(), base.AllGroups(qname)).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: err})
}
return groups, nil
}
// TODO: Add comment describing what the script does.

View File

@ -3331,3 +3331,64 @@ func TestDeleteAggregationSet(t *testing.T) {
}
}
}
func TestListGroups(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
m1 := h.NewTaskMessageBuilder().SetQueue("default").SetGroup("foo").Build()
m2 := h.NewTaskMessageBuilder().SetQueue("default").SetGroup("bar").Build()
m3 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("baz").Build()
m4 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("qux").Build()
tests := []struct {
groups map[string]map[string][]base.Z
qname string
want []string
}{
{
groups: map[string]map[string][]base.Z{
"default": {
"foo": {{Message: m1, Score: now.Add(-10 * time.Second).Unix()}},
"bar": {{Message: m2, Score: now.Add(-10 * time.Second).Unix()}},
},
"custom": {
"baz": {{Message: m3, Score: now.Add(-10 * time.Second).Unix()}},
"qux": {{Message: m4, Score: now.Add(-10 * time.Second).Unix()}},
},
},
qname: "default",
want: []string{"foo", "bar"},
},
{
groups: map[string]map[string][]base.Z{
"default": {
"foo": {{Message: m1, Score: now.Add(-10 * time.Second).Unix()}},
"bar": {{Message: m2, Score: now.Add(-10 * time.Second).Unix()}},
},
"custom": {
"baz": {{Message: m3, Score: now.Add(-10 * time.Second).Unix()}},
"qux": {{Message: m4, Score: now.Add(-10 * time.Second).Unix()}},
},
},
qname: "custom",
want: []string{"baz", "qux"},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllGroups(t, r.client, tc.groups)
got, err := r.ListGroups(tc.qname)
if err != nil {
t.Errorf("ListGroups returned error: %v", err)
continue
}
if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("ListGroups=%v, want=%v; (-want,+got)\n%s", got, tc.want, diff)
}
}
}