diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 44ae569..0056f86 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -550,6 +550,56 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { }, nil } +type GroupStat struct { + // Name of the group. + Group string + + // Size of the group. + Size int +} + +// KEYS[1] -> asynq:{}:groups +// ------- +// ARGV[1] -> group key prefix +// +// Output: +// list of group name and size (e.g. group1 size1 group2 size2 ...) +// +// Time Complexity: +// O(N) where N being the number of groups in the given queue. +var groupStatsCmd = redis.NewScript(` +local res = {} +local group_names = redis.call("SMEMBERS", KEYS[1]) +for _, gname in ipairs(group_names) do + local size = redis.call("ZCARD", ARGV[1] .. gname) + table.insert(res, gname) + table.insert(res, size) +end +return res +`) + +func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) { + var op errors.Op = "RDB.GroupStats" + keys := []string{base.AllGroups(qname)} + argv := []interface{}{base.GroupKeyPrefix(qname)} + res, err := groupStatsCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err != nil { + return nil, errors.E(op, errors.Unknown, err) + } + data, err := cast.ToSliceE(res) + if err != nil { + return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script") + } + var stats []*GroupStat + for i := 0; i < len(data); i += 2 { + stats = append(stats, &GroupStat{ + Group: data[i].(string), + Size: int(data[i+1].(int64)), + }) + } + return stats, nil +} + // Pagination specifies the page size and page number // for the list operation. type Pagination struct { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 9fd08b4..3df09a5 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "testing" "time" @@ -419,6 +420,102 @@ func TestRedisInfo(t *testing.T) { } } +func TestGroupStats(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m2 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m3 := h.NewTaskMessageBuilder().SetGroup("group1").Build() + m4 := h.NewTaskMessageBuilder().SetGroup("group2").Build() + m5 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build() + m6 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build() + + now := time.Now() + + fixtures := struct { + tasks []*taskData + allGroups map[string][]string + groups map[string][]*redis.Z + }{ + tasks: []*taskData{ + {msg: m1, state: base.TaskStateAggregating}, + {msg: m2, state: base.TaskStateAggregating}, + {msg: m3, state: base.TaskStateAggregating}, + {msg: m4, state: base.TaskStateAggregating}, + {msg: m5, state: base.TaskStateAggregating}, + }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"group1", "group2"}, + base.AllGroups("custom"): {"group1"}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m3.ID, Score: float64(now.Add(-30 * time.Second).Unix())}, + }, + base.GroupKey("default", "group2"): { + {Member: m4.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, + {Member: m6.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + } + + tests := []struct { + desc string + qname string + want []*GroupStat + }{ + { + desc: "default queue groups", + qname: "default", + want: []*GroupStat{ + {Group: "group1", Size: 3}, + {Group: "group2", Size: 1}, + }, + }, + { + desc: "custom queue groups", + qname: "custom", + want: []*GroupStat{ + {Group: "group1", Size: 2}, + }, + }, + } + + var sortGroupStatsOpt = cmp.Transformer( + "SortGroupStats", + func(in []*GroupStat) []*GroupStat { + out := append([]*GroupStat(nil), in...) + sort.Slice(out, func(i, j int) bool { + return out[i].Group < out[j].Group + }) + return out + }) + + for _, tc := range tests { + h.FlushDB(t, r.client) + SeedTasks(t, r.client, fixtures.tasks) + SeedSets(t, r.client, fixtures.allGroups) + SeedZSets(t, r.client, fixtures.groups) + + t.Run(tc.desc, func(t *testing.T) { + got, err := r.GroupStats(tc.qname) + if err != nil { + t.Fatalf("GroupStats returned error: %v", err) + } + if diff := cmp.Diff(tc.want, got, sortGroupStatsOpt); diff != "" { + t.Errorf("GroupStats = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) + } + }) + } + +} + func TestGetTaskInfo(t *testing.T) { r := setup(t) defer r.Close()