2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Show number of groups and aggregating task count in QueueInfo

This commit is contained in:
Ken Hibino 2022-03-15 06:52:56 -07:00
parent 74d2eea4e0
commit efe3c74037
4 changed files with 171 additions and 83 deletions

View File

@ -56,9 +56,12 @@ type QueueInfo struct {
Latency time.Duration
// Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
// The value is the sum of Pending, Active, Scheduled, Retry, Aggregating and Archived.
Size int
// Groups is the total number of groups in the queue.
Groups int
// Number of pending tasks.
Pending int
// Number of active tasks.
@ -71,6 +74,8 @@ type QueueInfo struct {
Archived int
// Number of stored completed tasks.
Completed int
// Number of aggregating tasks.
Aggregating int
// Total number of tasks being processed within the given date (counter resets daily).
// The number includes both succeeded and failed tasks.
@ -105,12 +110,14 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency,
Size: stats.Size,
Groups: stats.Groups,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Completed: stats.Completed,
Aggregating: stats.Aggregating,
Processed: stats.Processed,
Failed: stats.Failed,
ProcessedTotal: stats.ProcessedTotal,

View File

@ -34,6 +34,10 @@ type Stats struct {
Paused bool
// Size is the total number of tasks in the queue.
Size int
// Groups is the total number of groups in the queue.
Groups int
// Number of tasks in each state.
Pending int
Active int
@ -41,6 +45,7 @@ type Stats struct {
Retry int
Archived int
Completed int
Aggregating int
// Number of tasks processed within the current date.
// The number includes both succeeded and failed tasks.
@ -83,8 +88,10 @@ type DailyStats struct {
// KEYS[9] -> asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
//
// KEYS[12] -> asynq:<qname>:groups
// --------
// ARGV[1] -> task key prefix
// ARGV[2] -> group key prefix
var currentStatsCmd = redis.NewScript(`
local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
@ -118,6 +125,15 @@ if pendingTaskCount > 0 then
else
table.insert(res, 0)
end
local group_names = redis.call("SMEMBERS", KEYS[12])
table.insert(res, "group_size")
table.insert(res, table.getn(group_names))
local aggregating_count = 0
for _, gname in ipairs(group_names) do
aggregating_count = aggregating_count + redis.call("ZCARD", ARGV[2] .. gname)
end
table.insert(res, "aggregating_count")
table.insert(res, aggregating_count)
return res`)
// CurrentStats returns a current state of the queues.
@ -131,7 +147,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
now := r.clock.Now()
res, err := currentStatsCmd.Run(context.Background(), r.client, []string{
keys := []string{
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
@ -143,7 +159,13 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.ProcessedTotalKey(qname),
base.FailedTotalKey(qname),
base.PausedKey(qname),
}, base.TaskKeyPrefix(qname)).Result()
base.AllGroups(qname),
}
argv := []interface{}{
base.TaskKeyPrefix(qname),
base.GroupKeyPrefix(qname),
}
res, err := currentStatsCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
@ -198,6 +220,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
} else {
stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val)))
}
case "group_size":
stats.Groups = val
case "aggregating_count":
stats.Aggregating = val
}
}
stats.Size = size

View File

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/go-redis/redis/v8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
@ -54,22 +55,28 @@ func TestAllQueues(t *testing.T) {
func TestCurrentStats(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/path/to/img"}))
m4 := h.NewTaskMessage("sync", nil)
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
m1 := h.NewTaskMessageBuilder().SetType("send_email").Build()
m2 := h.NewTaskMessageBuilder().SetType("reindex").Build()
m3 := h.NewTaskMessageBuilder().SetType("gen_thumbnail").Build()
m4 := h.NewTaskMessageBuilder().SetType("sync").Build()
m5 := h.NewTaskMessageBuilder().SetType("important_notification").SetQueue("critical").Build()
m6 := h.NewTaskMessageBuilder().SetType("minor_notification").SetQueue("low").Build()
m7 := h.NewTaskMessageBuilder().SetType("send_sms").Build()
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct {
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
tasks []*taskData
allQueues []string
allGroups map[string][]string
pending map[string][]string
active map[string][]string
scheduled map[string][]*redis.Z
retry map[string][]*redis.Z
archived map[string][]*redis.Z
completed map[string][]*redis.Z
groups map[string][]*redis.Z
processed map[string]int
failed map[string]int
processedTotal map[string]int
@ -80,38 +87,56 @@ func TestCurrentStats(t *testing.T) {
want *Stats
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m5},
"low": {m6},
tasks: []*taskData{
{msg: m1, state: base.TaskStatePending},
{msg: m2, state: base.TaskStateActive},
{msg: m3, state: base.TaskStateScheduled},
{msg: m4, state: base.TaskStateScheduled},
{msg: m5, state: base.TaskStatePending},
{msg: m6, state: base.TaskStatePending},
{msg: m7, state: base.TaskStateAggregating},
},
active: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
allQueues: []string{"default", "critical", "low"},
allGroups: map[string][]string{
base.AllGroups("default"): {"sms:user1"},
},
scheduled: map[string][]base.Z{
"default": {
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()},
pending: map[string][]string{
base.PendingKey("default"): {m1.ID},
base.PendingKey("critical"): {m5.ID},
base.PendingKey("low"): {m6.ID},
},
"critical": {},
"low": {},
active: map[string][]string{
base.ActiveKey("default"): {m2.ID},
base.ActiveKey("critical"): {},
base.ActiveKey("low"): {},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
scheduled: map[string][]*redis.Z{
base.ScheduledKey("default"): {
{Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())},
{Member: m4.ID, Score: float64(now.Unix())},
},
archived: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
base.ScheduledKey("critical"): {},
base.ScheduledKey("low"): {},
},
retry: map[string][]*redis.Z{
base.RetryKey("default"): {},
base.RetryKey("critical"): {},
base.RetryKey("low"): {},
},
archived: map[string][]*redis.Z{
base.ArchivedKey("default"): {},
base.ArchivedKey("critical"): {},
base.ArchivedKey("low"): {},
},
completed: map[string][]*redis.Z{
base.CompletedKey("default"): {},
base.CompletedKey("critical"): {},
base.CompletedKey("low"): {},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "sms:user1"): {
{Member: m7.ID, Score: float64(now.Add(-3 * time.Second).Unix())},
},
completed: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
@ -144,12 +169,14 @@ func TestCurrentStats(t *testing.T) {
Queue: "default",
Paused: false,
Size: 4,
Groups: 1,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Archived: 0,
Completed: 0,
Aggregating: 1,
Processed: 120,
Failed: 2,
ProcessedTotal: 11111,
@ -159,38 +186,46 @@ func TestCurrentStats(t *testing.T) {
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {},
"low": {m6},
tasks: []*taskData{
{msg: m1, state: base.TaskStatePending},
{msg: m2, state: base.TaskStateActive},
{msg: m3, state: base.TaskStateScheduled},
{msg: m4, state: base.TaskStateScheduled},
{msg: m6, state: base.TaskStatePending},
},
active: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
allQueues: []string{"default", "critical", "low"},
pending: map[string][]string{
base.PendingKey("default"): {m1.ID},
base.PendingKey("critical"): {},
base.PendingKey("low"): {m6.ID},
},
scheduled: map[string][]base.Z{
"default": {
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()},
active: map[string][]string{
base.ActiveKey("default"): {m2.ID},
base.ActiveKey("critical"): {},
base.ActiveKey("low"): {},
},
"critical": {},
"low": {},
scheduled: map[string][]*redis.Z{
base.ScheduledKey("default"): {
{Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())},
{Member: m4.ID, Score: float64(now.Unix())},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
base.ScheduledKey("critical"): {},
base.ScheduledKey("low"): {},
},
archived: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
retry: map[string][]*redis.Z{
base.RetryKey("default"): {},
base.RetryKey("critical"): {},
base.RetryKey("low"): {},
},
completed: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
archived: map[string][]*redis.Z{
base.ArchivedKey("default"): {},
base.ArchivedKey("critical"): {},
base.ArchivedKey("low"): {},
},
completed: map[string][]*redis.Z{
base.CompletedKey("default"): {},
base.CompletedKey("critical"): {},
base.CompletedKey("low"): {},
},
processed: map[string]int{
"default": 120,
@ -223,12 +258,14 @@ func TestCurrentStats(t *testing.T) {
Queue: "critical",
Paused: true,
Size: 0,
Groups: 0,
Pending: 0,
Active: 0,
Scheduled: 0,
Retry: 0,
Archived: 0,
Completed: 0,
Aggregating: 0,
Processed: 100,
Failed: 0,
ProcessedTotal: 22222,
@ -246,12 +283,16 @@ func TestCurrentStats(t *testing.T) {
t.Fatal(err)
}
}
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
SeedSet(t, r.client, base.AllQueues, tc.allQueues)
SeedSets(t, r.client, tc.allGroups)
SeedTasks(t, r.client, tc.tasks)
SeedLists(t, r.client, tc.pending)
SeedLists(t, r.client, tc.active)
SeedZSets(t, r.client, tc.scheduled)
SeedZSets(t, r.client, tc.retry)
SeedZSets(t, r.client, tc.archived)
SeedZSets(t, r.client, tc.completed)
SeedZSets(t, r.client, tc.groups)
ctx := context.Background()
for qname, n := range tc.processed {
r.client.Set(ctx, base.ProcessedKey(qname, now), n, 0)

View File

@ -3776,11 +3776,25 @@ func SeedZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis
func SeedSets(tb testing.TB, r redis.UniversalClient, sets map[string][]string) {
for key, set := range sets {
for _, mem := range set {
SeedSet(tb, r, key, set)
}
}
func SeedSet(tb testing.TB, r redis.UniversalClient, key string, members []string) {
for _, mem := range members {
if err := r.SAdd(context.Background(), key, mem).Err(); err != nil {
tb.Fatalf("Failed to seed set (key=%q): %v", key, err)
}
}
}
func SeedLists(tb testing.TB, r redis.UniversalClient, lists map[string][]string) {
for key, vals := range lists {
for _, v := range vals {
if err := r.LPush(context.Background(), key, v).Err(); err != nil {
tb.Fatalf("Failed to seed list (key=%q): %v", key, err)
}
}
}
}