From b29fe584344181091d96f690ed3716942f24a722 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 10 Mar 2022 11:06:48 -0800 Subject: [PATCH] Implement RDB.ListGroups --- internal/asynqtest/asynqtest.go | 4 ++- internal/rdb/rdb.go | 8 +++-- internal/rdb/rdb_test.go | 61 +++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 6c7a35c..1bd9c3c 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -248,7 +248,9 @@ func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z // SeedGroup initializes the group with the given entries. func SeedGroup(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname, gname string) { tb.Helper() - r.SAdd(context.Background(), base.AllQueues, qname) + ctx := context.Background() + r.SAdd(ctx, base.AllQueues, qname) + r.SAdd(ctx, base.AllGroups(qname), gname) seedRedisZSet(tb, r, base.GroupKey(qname, gname), entries, base.TaskStateAggregating) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index cb282c5..d203e37 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -985,8 +985,12 @@ func (r *RDB) forwardAll(qname string) (err error) { // ListGroups returns a list of all known groups in the given queue. func (r *RDB) ListGroups(qname string) ([]string, error) { - // TODO: Implement this with TDD - return nil, nil + var op errors.Op = "RDB.ListGroups" + groups, err := r.client.SMembers(context.Background(), base.AllGroups(qname)).Result() + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: err}) + } + return groups, nil } // TODO: Add comment describing what the script does. diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 903f04f..38a1fc6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3331,3 +3331,64 @@ func TestDeleteAggregationSet(t *testing.T) { } } } + +func TestListGroups(t *testing.T) { + r := setup(t) + defer r.Close() + + now := time.Now() + m1 := h.NewTaskMessageBuilder().SetQueue("default").SetGroup("foo").Build() + m2 := h.NewTaskMessageBuilder().SetQueue("default").SetGroup("bar").Build() + m3 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("baz").Build() + m4 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("qux").Build() + + tests := []struct { + groups map[string]map[string][]base.Z + qname string + want []string + }{ + { + groups: map[string]map[string][]base.Z{ + "default": { + "foo": {{Message: m1, Score: now.Add(-10 * time.Second).Unix()}}, + "bar": {{Message: m2, Score: now.Add(-10 * time.Second).Unix()}}, + }, + "custom": { + "baz": {{Message: m3, Score: now.Add(-10 * time.Second).Unix()}}, + "qux": {{Message: m4, Score: now.Add(-10 * time.Second).Unix()}}, + }, + }, + qname: "default", + want: []string{"foo", "bar"}, + }, + { + groups: map[string]map[string][]base.Z{ + "default": { + "foo": {{Message: m1, Score: now.Add(-10 * time.Second).Unix()}}, + "bar": {{Message: m2, Score: now.Add(-10 * time.Second).Unix()}}, + }, + "custom": { + "baz": {{Message: m3, Score: now.Add(-10 * time.Second).Unix()}}, + "qux": {{Message: m4, Score: now.Add(-10 * time.Second).Unix()}}, + }, + }, + qname: "custom", + want: []string{"baz", "qux"}, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllGroups(t, r.client, tc.groups) + + got, err := r.ListGroups(tc.qname) + if err != nil { + t.Errorf("ListGroups returned error: %v", err) + continue + } + + if diff := cmp.Diff(tc.want, got, h.SortStringSliceOpt); diff != "" { + t.Errorf("ListGroups=%v, want=%v; (-want,+got)\n%s", got, tc.want, diff) + } + } +}