diff --git a/inspector.go b/inspector.go index d636b34..8a364a2 100644 --- a/inspector.go +++ b/inspector.go @@ -56,9 +56,12 @@ type QueueInfo struct { Latency time.Duration // Size is the total number of tasks in the queue. - // The value is the sum of Pending, Active, Scheduled, Retry, and Archived. + // The value is the sum of Pending, Active, Scheduled, Retry, Aggregating and Archived. Size int + // Groups is the total number of groups in the queue. + Groups int + // Number of pending tasks. Pending int // Number of active tasks. @@ -71,6 +74,8 @@ type QueueInfo struct { Archived int // Number of stored completed tasks. Completed int + // Number of aggregating tasks. + Aggregating int // Total number of tasks being processed within the given date (counter resets daily). // The number includes both succeeded and failed tasks. @@ -105,12 +110,14 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) { MemoryUsage: stats.MemoryUsage, Latency: stats.Latency, Size: stats.Size, + Groups: stats.Groups, Pending: stats.Pending, Active: stats.Active, Scheduled: stats.Scheduled, Retry: stats.Retry, Archived: stats.Archived, Completed: stats.Completed, + Aggregating: stats.Aggregating, Processed: stats.Processed, Failed: stats.Failed, ProcessedTotal: stats.ProcessedTotal, diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 2143421..5fad096 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -34,13 +34,18 @@ type Stats struct { Paused bool // Size is the total number of tasks in the queue. Size int + + // Groups is the total number of groups in the queue. + Groups int + // Number of tasks in each state. - Pending int - Active int - Scheduled int - Retry int - Archived int - Completed int + Pending int + Active int + Scheduled int + Retry int + Archived int + Completed int + Aggregating int // Number of tasks processed within the current date. // The number includes both succeeded and failed tasks. @@ -83,8 +88,10 @@ type DailyStats struct { // KEYS[9] -> asynq::processed // KEYS[10] -> asynq::failed // KEYS[11] -> asynq::paused -// +// KEYS[12] -> asynq::groups +// -------- // ARGV[1] -> task key prefix +// ARGV[2] -> group key prefix var currentStatsCmd = redis.NewScript(` local res = {} local pendingTaskCount = redis.call("LLEN", KEYS[1]) @@ -118,6 +125,15 @@ if pendingTaskCount > 0 then else table.insert(res, 0) end +local group_names = redis.call("SMEMBERS", KEYS[12]) +table.insert(res, "group_size") +table.insert(res, table.getn(group_names)) +local aggregating_count = 0 +for _, gname in ipairs(group_names) do + aggregating_count = aggregating_count + redis.call("ZCARD", ARGV[2] .. gname) +end +table.insert(res, "aggregating_count") +table.insert(res, aggregating_count) return res`) // CurrentStats returns a current state of the queues. @@ -131,7 +147,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } now := r.clock.Now() - res, err := currentStatsCmd.Run(context.Background(), r.client, []string{ + keys := []string{ base.PendingKey(qname), base.ActiveKey(qname), base.ScheduledKey(qname), @@ -143,7 +159,13 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { base.ProcessedTotalKey(qname), base.FailedTotalKey(qname), base.PausedKey(qname), - }, base.TaskKeyPrefix(qname)).Result() + base.AllGroups(qname), + } + argv := []interface{}{ + base.TaskKeyPrefix(qname), + base.GroupKeyPrefix(qname), + } + res, err := currentStatsCmd.Run(context.Background(), r.client, keys, argv...).Result() if err != nil { return nil, errors.E(op, errors.Unknown, err) } @@ -198,6 +220,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { } else { stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val))) } + case "group_size": + stats.Groups = val + case "aggregating_count": + stats.Aggregating = val } } stats.Size = size diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0a99f37..864a6a4 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v8" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" @@ -54,22 +55,28 @@ func TestAllQueues(t *testing.T) { func TestCurrentStats(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/path/to/img"})) - m4 := h.NewTaskMessage("sync", nil) - m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") - m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") + + m1 := h.NewTaskMessageBuilder().SetType("send_email").Build() + m2 := h.NewTaskMessageBuilder().SetType("reindex").Build() + m3 := h.NewTaskMessageBuilder().SetType("gen_thumbnail").Build() + m4 := h.NewTaskMessageBuilder().SetType("sync").Build() + m5 := h.NewTaskMessageBuilder().SetType("important_notification").SetQueue("critical").Build() + m6 := h.NewTaskMessageBuilder().SetType("minor_notification").SetQueue("low").Build() + m7 := h.NewTaskMessageBuilder().SetType("send_sms").Build() now := time.Now() r.SetClock(timeutil.NewSimulatedClock(now)) tests := []struct { - pending map[string][]*base.TaskMessage - active map[string][]*base.TaskMessage - scheduled map[string][]base.Z - retry map[string][]base.Z - archived map[string][]base.Z - completed map[string][]base.Z + tasks []*taskData + allQueues []string + allGroups map[string][]string + pending map[string][]string + active map[string][]string + scheduled map[string][]*redis.Z + retry map[string][]*redis.Z + archived map[string][]*redis.Z + completed map[string][]*redis.Z + groups map[string][]*redis.Z processed map[string]int failed map[string]int processedTotal map[string]int @@ -80,38 +87,56 @@ func TestCurrentStats(t *testing.T) { want *Stats }{ { - pending: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m5}, - "low": {m6}, + tasks: []*taskData{ + {msg: m1, state: base.TaskStatePending}, + {msg: m2, state: base.TaskStateActive}, + {msg: m3, state: base.TaskStateScheduled}, + {msg: m4, state: base.TaskStateScheduled}, + {msg: m5, state: base.TaskStatePending}, + {msg: m6, state: base.TaskStatePending}, + {msg: m7, state: base.TaskStateAggregating}, }, - active: map[string][]*base.TaskMessage{ - "default": {m2}, - "critical": {}, - "low": {}, + allQueues: []string{"default", "critical", "low"}, + allGroups: map[string][]string{ + base.AllGroups("default"): {"sms:user1"}, }, - scheduled: map[string][]base.Z{ - "default": { - {Message: m3, Score: now.Add(time.Hour).Unix()}, - {Message: m4, Score: now.Unix()}, + pending: map[string][]string{ + base.PendingKey("default"): {m1.ID}, + base.PendingKey("critical"): {m5.ID}, + base.PendingKey("low"): {m6.ID}, + }, + active: map[string][]string{ + base.ActiveKey("default"): {m2.ID}, + base.ActiveKey("critical"): {}, + base.ActiveKey("low"): {}, + }, + scheduled: map[string][]*redis.Z{ + base.ScheduledKey("default"): { + {Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())}, + {Member: m4.ID, Score: float64(now.Unix())}, }, - "critical": {}, - "low": {}, + base.ScheduledKey("critical"): {}, + base.ScheduledKey("low"): {}, }, - retry: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + retry: map[string][]*redis.Z{ + base.RetryKey("default"): {}, + base.RetryKey("critical"): {}, + base.RetryKey("low"): {}, }, - archived: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + archived: map[string][]*redis.Z{ + base.ArchivedKey("default"): {}, + base.ArchivedKey("critical"): {}, + base.ArchivedKey("low"): {}, }, - completed: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + completed: map[string][]*redis.Z{ + base.CompletedKey("default"): {}, + base.CompletedKey("critical"): {}, + base.CompletedKey("low"): {}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "sms:user1"): { + {Member: m7.ID, Score: float64(now.Add(-3 * time.Second).Unix())}, + }, }, processed: map[string]int{ "default": 120, @@ -144,12 +169,14 @@ func TestCurrentStats(t *testing.T) { Queue: "default", Paused: false, Size: 4, + Groups: 1, Pending: 1, Active: 1, Scheduled: 2, Retry: 0, Archived: 0, Completed: 0, + Aggregating: 1, Processed: 120, Failed: 2, ProcessedTotal: 11111, @@ -159,38 +186,46 @@ func TestCurrentStats(t *testing.T) { }, }, { - pending: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {}, - "low": {m6}, + tasks: []*taskData{ + {msg: m1, state: base.TaskStatePending}, + {msg: m2, state: base.TaskStateActive}, + {msg: m3, state: base.TaskStateScheduled}, + {msg: m4, state: base.TaskStateScheduled}, + {msg: m6, state: base.TaskStatePending}, }, - active: map[string][]*base.TaskMessage{ - "default": {m2}, - "critical": {}, - "low": {}, + allQueues: []string{"default", "critical", "low"}, + pending: map[string][]string{ + base.PendingKey("default"): {m1.ID}, + base.PendingKey("critical"): {}, + base.PendingKey("low"): {m6.ID}, }, - scheduled: map[string][]base.Z{ - "default": { - {Message: m3, Score: now.Add(time.Hour).Unix()}, - {Message: m4, Score: now.Unix()}, + active: map[string][]string{ + base.ActiveKey("default"): {m2.ID}, + base.ActiveKey("critical"): {}, + base.ActiveKey("low"): {}, + }, + scheduled: map[string][]*redis.Z{ + base.ScheduledKey("default"): { + {Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())}, + {Member: m4.ID, Score: float64(now.Unix())}, }, - "critical": {}, - "low": {}, + base.ScheduledKey("critical"): {}, + base.ScheduledKey("low"): {}, }, - retry: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + retry: map[string][]*redis.Z{ + base.RetryKey("default"): {}, + base.RetryKey("critical"): {}, + base.RetryKey("low"): {}, }, - archived: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + archived: map[string][]*redis.Z{ + base.ArchivedKey("default"): {}, + base.ArchivedKey("critical"): {}, + base.ArchivedKey("low"): {}, }, - completed: map[string][]base.Z{ - "default": {}, - "critical": {}, - "low": {}, + completed: map[string][]*redis.Z{ + base.CompletedKey("default"): {}, + base.CompletedKey("critical"): {}, + base.CompletedKey("low"): {}, }, processed: map[string]int{ "default": 120, @@ -223,12 +258,14 @@ func TestCurrentStats(t *testing.T) { Queue: "critical", Paused: true, Size: 0, + Groups: 0, Pending: 0, Active: 0, Scheduled: 0, Retry: 0, Archived: 0, Completed: 0, + Aggregating: 0, Processed: 100, Failed: 0, ProcessedTotal: 22222, @@ -246,12 +283,16 @@ func TestCurrentStats(t *testing.T) { t.Fatal(err) } } - h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllActiveQueues(t, r.client, tc.active) - h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllArchivedQueues(t, r.client, tc.archived) - h.SeedAllCompletedQueues(t, r.client, tc.completed) + SeedSet(t, r.client, base.AllQueues, tc.allQueues) + SeedSets(t, r.client, tc.allGroups) + SeedTasks(t, r.client, tc.tasks) + SeedLists(t, r.client, tc.pending) + SeedLists(t, r.client, tc.active) + SeedZSets(t, r.client, tc.scheduled) + SeedZSets(t, r.client, tc.retry) + SeedZSets(t, r.client, tc.archived) + SeedZSets(t, r.client, tc.completed) + SeedZSets(t, r.client, tc.groups) ctx := context.Background() for qname, n := range tc.processed { r.client.Set(ctx, base.ProcessedKey(qname, now), n, 0) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index cbddab7..6951ab6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3776,9 +3776,23 @@ func SeedZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis func SeedSets(tb testing.TB, r redis.UniversalClient, sets map[string][]string) { for key, set := range sets { - for _, mem := range set { - if err := r.SAdd(context.Background(), key, mem).Err(); err != nil { - tb.Fatalf("Failed to seed set (key=%q): %v", key, err) + SeedSet(tb, r, key, set) + } +} + +func SeedSet(tb testing.TB, r redis.UniversalClient, key string, members []string) { + for _, mem := range members { + if err := r.SAdd(context.Background(), key, mem).Err(); err != nil { + tb.Fatalf("Failed to seed set (key=%q): %v", key, err) + } + } +} + +func SeedLists(tb testing.TB, r redis.UniversalClient, lists map[string][]string) { + for key, vals := range lists { + for _, v := range vals { + if err := r.LPush(context.Background(), key, v).Err(); err != nil { + tb.Fatalf("Failed to seed list (key=%q): %v", key, err) } } }