diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 99f30a4..af46a77 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -7,6 +7,7 @@ package rdb import ( "encoding/json" "fmt" + "sort" "strings" "time" @@ -25,10 +26,17 @@ type Stats struct { Dead int Processed int Failed int - Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20) + Queues []*Queue Timestamp time.Time } +// Queue represents a task queue. +type Queue struct { + Name string + Paused bool + Size int // number of tasks in the queue +} + // DailyStats holds aggregate data for a given day. type DailyStats struct { Processed int @@ -143,8 +151,12 @@ func (r *RDB) CurrentStats() (*Stats, error) { if err != nil { return nil, err } + paused, err := r.client.SMembersMap(base.PausedQueues).Result() + if err != nil { + return nil, err + } stats := &Stats{ - Queues: make(map[string]int), + Queues: make([]*Queue, 0), Timestamp: now, } for i := 0; i < len(data); i += 2 { @@ -154,7 +166,14 @@ func (r *RDB) CurrentStats() (*Stats, error) { switch { case strings.HasPrefix(key, base.QueuePrefix): stats.Enqueued += val - stats.Queues[strings.TrimPrefix(key, base.QueuePrefix)] = val + q := Queue{ + Name: strings.TrimPrefix(key, base.QueuePrefix), + Size: val, + } + if _, exist := paused[key]; exist { + q.Paused = true + } + stats.Queues = append(stats.Queues, &q) case key == base.InProgressQueue: stats.InProgress = val case key == base.ScheduledQueue: @@ -169,6 +188,9 @@ func (r *RDB) CurrentStats() (*Stats, error) { stats.Failed = val } } + sort.Slice(stats.Queues, func(i, j int) bool { + return stats.Queues[i].Name < stats.Queues[j].Name + }) return stats, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 986e254..c9384df 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -38,6 +38,7 @@ func TestCurrentStats(t *testing.T) { processed int failed int allQueues []interface{} + paused []string want *Stats }{ { @@ -55,6 +56,7 @@ func TestCurrentStats(t *testing.T) { processed: 120, failed: 2, allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, + paused: []string{}, want: &Stats{ Enqueued: 3, InProgress: 1, @@ -64,7 +66,12 @@ func TestCurrentStats(t *testing.T) { Processed: 120, Failed: 2, Timestamp: now, - Queues: map[string]int{base.DefaultQueueName: 1, "critical": 1, "low": 1}, + // Queues should be sorted by name. + Queues: []*Queue{ + {Name: "critical", Paused: false, Size: 1}, + {Name: "default", Paused: false, Size: 1}, + {Name: "low", Paused: false, Size: 1}, + }, }, }, { @@ -82,6 +89,7 @@ func TestCurrentStats(t *testing.T) { processed: 90, failed: 10, allQueues: []interface{}{base.DefaultQueue}, + paused: []string{}, want: &Stats{ Enqueued: 0, InProgress: 0, @@ -91,13 +99,56 @@ func TestCurrentStats(t *testing.T) { Processed: 90, Failed: 10, Timestamp: now, - Queues: map[string]int{base.DefaultQueueName: 0}, + Queues: []*Queue{ + { + Name: base.DefaultQueueName, + Paused: false, + Size: 0, + }, + }, + }, + }, + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1}, + "critical": {m5}, + "low": {m6}, + }, + inProgress: []*base.TaskMessage{m2}, + scheduled: []h.ZSetEntry{ + {Msg: m3, Score: float64(now.Add(time.Hour).Unix())}, + {Msg: m4, Score: float64(now.Unix())}}, + retry: []h.ZSetEntry{}, + dead: []h.ZSetEntry{}, + processed: 120, + failed: 2, + allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, + paused: []string{"critical", "low"}, + want: &Stats{ + Enqueued: 3, + InProgress: 1, + Scheduled: 2, + Retry: 0, + Dead: 0, + Processed: 120, + Failed: 2, + Timestamp: now, + Queues: []*Queue{ + {Name: "critical", Paused: true, Size: 1}, + {Name: "default", Paused: false, Size: 1}, + {Name: "low", Paused: true, Size: 1}, + }, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case + for _, qname := range tc.paused { + if err := r.Pause(qname); err != nil { + t.Fatal(err) + } + } for qname, msgs := range tc.enqueued { h.SeedEnqueuedQueue(t, r.client, msgs, qname) } @@ -136,7 +187,7 @@ func TestCurrentStatsWithoutData(t *testing.T) { Processed: 0, Failed: 0, Timestamp: time.Now(), - Queues: map[string]int{}, + Queues: make([]*Queue, 0), } got, err := r.CurrentStats() diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index 5ffd026..8db492e 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -7,7 +7,6 @@ package cmd import ( "fmt" "os" - "sort" "strconv" "strings" "text/tabwriter" @@ -96,24 +95,31 @@ func printStates(s *rdb.Stats) { tw.Flush() } -func printQueues(queues map[string]int) { - var qnames, seps, counts []string - for q := range queues { - qnames = append(qnames, strings.Title(q)) +func printQueues(queues []*rdb.Queue) { + var headers, seps, counts []string + for _, q := range queues { + title := queueTitle(q) + headers = append(headers, title) + seps = append(seps, strings.Repeat("-", len(title))) + counts = append(counts, strconv.Itoa(q.Size)) } - sort.Strings(qnames) // sort for stable order - for _, q := range qnames { - seps = append(seps, strings.Repeat("-", len(q))) - counts = append(counts, strconv.Itoa(queues[strings.ToLower(q)])) - } - format := strings.Repeat("%v\t", len(qnames)) + "\n" + format := strings.Repeat("%v\t", len(headers)) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) - fmt.Fprintf(tw, format, toInterfaceSlice(qnames)...) + fmt.Fprintf(tw, format, toInterfaceSlice(headers)...) fmt.Fprintf(tw, format, toInterfaceSlice(seps)...) fmt.Fprintf(tw, format, toInterfaceSlice(counts)...) tw.Flush() } +func queueTitle(q *rdb.Queue) string { + var b strings.Builder + b.WriteString(strings.Title(q.Name)) + if q.Paused { + b.WriteString(" (Paused)") + } + return b.String() +} + func printStats(s *rdb.Stats) { format := strings.Repeat("%v\t", 3) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)