mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add RDB.GroupStats for inspecting groups
This commit is contained in:
parent
45ed560708
commit
0149396bae
@ -550,6 +550,56 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type GroupStat struct {
|
||||||
|
// Name of the group.
|
||||||
|
Group string
|
||||||
|
|
||||||
|
// Size of the group.
|
||||||
|
Size int
|
||||||
|
}
|
||||||
|
|
||||||
|
// KEYS[1] -> asynq:{<qname>}:groups
|
||||||
|
// -------
|
||||||
|
// ARGV[1] -> group key prefix
|
||||||
|
//
|
||||||
|
// Output:
|
||||||
|
// list of group name and size (e.g. group1 size1 group2 size2 ...)
|
||||||
|
//
|
||||||
|
// Time Complexity:
|
||||||
|
// O(N) where N being the number of groups in the given queue.
|
||||||
|
var groupStatsCmd = redis.NewScript(`
|
||||||
|
local res = {}
|
||||||
|
local group_names = redis.call("SMEMBERS", KEYS[1])
|
||||||
|
for _, gname in ipairs(group_names) do
|
||||||
|
local size = redis.call("ZCARD", ARGV[1] .. gname)
|
||||||
|
table.insert(res, gname)
|
||||||
|
table.insert(res, size)
|
||||||
|
end
|
||||||
|
return res
|
||||||
|
`)
|
||||||
|
|
||||||
|
func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
|
||||||
|
var op errors.Op = "RDB.GroupStats"
|
||||||
|
keys := []string{base.AllGroups(qname)}
|
||||||
|
argv := []interface{}{base.GroupKeyPrefix(qname)}
|
||||||
|
res, err := groupStatsCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.E(op, errors.Unknown, err)
|
||||||
|
}
|
||||||
|
data, err := cast.ToSliceE(res)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script")
|
||||||
|
}
|
||||||
|
var stats []*GroupStat
|
||||||
|
for i := 0; i < len(data); i += 2 {
|
||||||
|
stats = append(stats, &GroupStat{
|
||||||
|
Group: data[i].(string),
|
||||||
|
Size: int(data[i+1].(int64)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Pagination specifies the page size and page number
|
// Pagination specifies the page size and page number
|
||||||
// for the list operation.
|
// for the list operation.
|
||||||
type Pagination struct {
|
type Pagination struct {
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -419,6 +420,102 @@ func TestRedisInfo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGroupStats(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
m1 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m2 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m3 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m4 := h.NewTaskMessageBuilder().SetGroup("group2").Build()
|
||||||
|
m5 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()
|
||||||
|
m6 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
fixtures := struct {
|
||||||
|
tasks []*taskData
|
||||||
|
allGroups map[string][]string
|
||||||
|
groups map[string][]*redis.Z
|
||||||
|
}{
|
||||||
|
tasks: []*taskData{
|
||||||
|
{msg: m1, state: base.TaskStateAggregating},
|
||||||
|
{msg: m2, state: base.TaskStateAggregating},
|
||||||
|
{msg: m3, state: base.TaskStateAggregating},
|
||||||
|
{msg: m4, state: base.TaskStateAggregating},
|
||||||
|
{msg: m5, state: base.TaskStateAggregating},
|
||||||
|
},
|
||||||
|
allGroups: map[string][]string{
|
||||||
|
base.AllGroups("default"): {"group1", "group2"},
|
||||||
|
base.AllGroups("custom"): {"group1"},
|
||||||
|
},
|
||||||
|
groups: map[string][]*redis.Z{
|
||||||
|
base.GroupKey("default", "group1"): {
|
||||||
|
{Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
|
||||||
|
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
{Member: m3.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
base.GroupKey("default", "group2"): {
|
||||||
|
{Member: m4.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
base.GroupKey("custom", "group1"): {
|
||||||
|
{Member: m5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
|
||||||
|
{Member: m6.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
qname string
|
||||||
|
want []*GroupStat
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default queue groups",
|
||||||
|
qname: "default",
|
||||||
|
want: []*GroupStat{
|
||||||
|
{Group: "group1", Size: 3},
|
||||||
|
{Group: "group2", Size: 1},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "custom queue groups",
|
||||||
|
qname: "custom",
|
||||||
|
want: []*GroupStat{
|
||||||
|
{Group: "group1", Size: 2},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var sortGroupStatsOpt = cmp.Transformer(
|
||||||
|
"SortGroupStats",
|
||||||
|
func(in []*GroupStat) []*GroupStat {
|
||||||
|
out := append([]*GroupStat(nil), in...)
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
return out[i].Group < out[j].Group
|
||||||
|
})
|
||||||
|
return out
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
SeedTasks(t, r.client, fixtures.tasks)
|
||||||
|
SeedSets(t, r.client, fixtures.allGroups)
|
||||||
|
SeedZSets(t, r.client, fixtures.groups)
|
||||||
|
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
got, err := r.GroupStats(tc.qname)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GroupStats returned error: %v", err)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(tc.want, got, sortGroupStatsOpt); diff != "" {
|
||||||
|
t.Errorf("GroupStats = %v, want %v; (-want,+got)\n%s", got, tc.want, diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetTaskInfo(t *testing.T) {
|
func TestGetTaskInfo(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
Loading…
Reference in New Issue
Block a user