mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Add Inspector.Groups method
This commit is contained in:
parent
e939b5d166
commit
4c8432e0ce
45
inspector.go
45
inspector.go
@ -43,7 +43,32 @@ func (i *Inspector) Queues() ([]string, error) {
|
|||||||
return i.rdb.AllQueues()
|
return i.rdb.AllQueues()
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueueInfo represents a state of queues at a certain time.
|
// Groups returns a list of all groups within the given queue.
|
||||||
|
func (i *Inspector) Groups(qname string) ([]*GroupInfo, error) {
|
||||||
|
stats, err := i.rdb.GroupStats(qname)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var res []*GroupInfo
|
||||||
|
for _, s := range stats {
|
||||||
|
res = append(res, &GroupInfo{
|
||||||
|
Group: s.Group,
|
||||||
|
Size: s.Size,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GroupInfo represents a state of a group at a cerntain time.
|
||||||
|
type GroupInfo struct {
|
||||||
|
// Name of the group.
|
||||||
|
Group string
|
||||||
|
|
||||||
|
// Size is the total number of tasks in the group.
|
||||||
|
Size int
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueInfo represents a state of a queue at a certain time.
|
||||||
type QueueInfo struct {
|
type QueueInfo struct {
|
||||||
// Name of the queue.
|
// Name of the queue.
|
||||||
Queue string
|
Queue string
|
||||||
@ -339,6 +364,12 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
|
|||||||
return tasks, nil
|
return tasks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: comment
|
||||||
|
func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) {
|
||||||
|
// TODO: Implement this with TDD
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListScheduledTasks retrieves scheduled tasks from the specified queue.
|
// ListScheduledTasks retrieves scheduled tasks from the specified queue.
|
||||||
// Tasks are sorted by NextProcessAt in ascending order.
|
// Tasks are sorted by NextProcessAt in ascending order.
|
||||||
//
|
//
|
||||||
@ -505,6 +536,12 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) {
|
|||||||
return int(n), err
|
return int(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: comment
|
||||||
|
func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) {
|
||||||
|
// TODO: implement this
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteTask deletes a task with the given id from the given queue.
|
// DeleteTask deletes a task with the given id from the given queue.
|
||||||
// The task needs to be in pending, scheduled, retry, or archived state,
|
// The task needs to be in pending, scheduled, retry, or archived state,
|
||||||
// otherwise DeleteTask will return an error.
|
// otherwise DeleteTask will return an error.
|
||||||
@ -612,6 +649,12 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
|
|||||||
return int(n), err
|
return int(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: comment
|
||||||
|
func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) {
|
||||||
|
// TODO: implement this
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ArchiveTask archives a task with the given id in the given queue.
|
// ArchiveTask archives a task with the given id in the given queue.
|
||||||
// The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask
|
// The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask
|
||||||
// will return an error.
|
// will return an error.
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@ -3323,3 +3324,99 @@ func TestParseOption(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInspectorGroups(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
inspector := NewInspector(getRedisConnOpt(t))
|
||||||
|
|
||||||
|
m1 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m2 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m3 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
|
||||||
|
m4 := h.NewTaskMessageBuilder().SetGroup("group2").Build()
|
||||||
|
m5 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()
|
||||||
|
m6 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
fixtures := struct {
|
||||||
|
tasks []*h.TaskSeedData
|
||||||
|
allGroups map[string][]string
|
||||||
|
groups map[string][]*redis.Z
|
||||||
|
}{
|
||||||
|
tasks: []*h.TaskSeedData{
|
||||||
|
{Msg: m1, State: base.TaskStateAggregating},
|
||||||
|
{Msg: m2, State: base.TaskStateAggregating},
|
||||||
|
{Msg: m3, State: base.TaskStateAggregating},
|
||||||
|
{Msg: m4, State: base.TaskStateAggregating},
|
||||||
|
{Msg: m5, State: base.TaskStateAggregating},
|
||||||
|
},
|
||||||
|
allGroups: map[string][]string{
|
||||||
|
base.AllGroups("default"): {"group1", "group2"},
|
||||||
|
base.AllGroups("custom"): {"group1"},
|
||||||
|
},
|
||||||
|
groups: map[string][]*redis.Z{
|
||||||
|
base.GroupKey("default", "group1"): {
|
||||||
|
{Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
|
||||||
|
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
{Member: m3.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
base.GroupKey("default", "group2"): {
|
||||||
|
{Member: m4.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
base.GroupKey("custom", "group1"): {
|
||||||
|
{Member: m5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
|
||||||
|
{Member: m6.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
qname string
|
||||||
|
want []*GroupInfo
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default queue groups",
|
||||||
|
qname: "default",
|
||||||
|
want: []*GroupInfo{
|
||||||
|
{Group: "group1", Size: 3},
|
||||||
|
{Group: "group2", Size: 1},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "custom queue groups",
|
||||||
|
qname: "custom",
|
||||||
|
want: []*GroupInfo{
|
||||||
|
{Group: "group1", Size: 2},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
var sortGroupInfosOpt = cmp.Transformer(
|
||||||
|
"SortGroupInfos",
|
||||||
|
func(in []*GroupInfo) []*GroupInfo {
|
||||||
|
out := append([]*GroupInfo(nil), in...)
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
return out[i].Group < out[j].Group
|
||||||
|
})
|
||||||
|
return out
|
||||||
|
})
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r)
|
||||||
|
h.SeedTasks(t, r, fixtures.tasks)
|
||||||
|
h.SeedRedisSets(t, r, fixtures.allGroups)
|
||||||
|
h.SeedRedisZSets(t, r, fixtures.groups)
|
||||||
|
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
got, err := inspector.Groups(tc.qname)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Groups returned error: %v", err)
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(tc.want, got, sortGroupInfosOpt); diff != "" {
|
||||||
|
t.Errorf("Groups = %v, want %v; (-want,+got)\n%s", got, tc.want, diff)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -513,7 +513,6 @@ func TestGroupStats(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetTaskInfo(t *testing.T) {
|
func TestGetTaskInfo(t *testing.T) {
|
||||||
|
Loading…
Reference in New Issue
Block a user