2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 06:42:16 +08:00

Update stats command to show queue paused status

This commit is contained in:
Ken Hibino 2020-06-05 06:42:27 -07:00
parent d6a5c84dc6
commit 4e800a7f68
3 changed files with 97 additions and 18 deletions

View File

@ -7,6 +7,7 @@ package rdb
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
@ -25,10 +26,17 @@ type Stats struct {
Dead int
Processed int
Failed int
Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20)
Queues []*Queue
Timestamp time.Time
}
// Queue represents a task queue.
type Queue struct {
Name string
Paused bool
Size int // number of tasks in the queue
}
// DailyStats holds aggregate data for a given day.
type DailyStats struct {
Processed int
@ -143,8 +151,12 @@ func (r *RDB) CurrentStats() (*Stats, error) {
if err != nil {
return nil, err
}
paused, err := r.client.SMembersMap(base.PausedQueues).Result()
if err != nil {
return nil, err
}
stats := &Stats{
Queues: make(map[string]int),
Queues: make([]*Queue, 0),
Timestamp: now,
}
for i := 0; i < len(data); i += 2 {
@ -154,7 +166,14 @@ func (r *RDB) CurrentStats() (*Stats, error) {
switch {
case strings.HasPrefix(key, base.QueuePrefix):
stats.Enqueued += val
stats.Queues[strings.TrimPrefix(key, base.QueuePrefix)] = val
q := Queue{
Name: strings.TrimPrefix(key, base.QueuePrefix),
Size: val,
}
if _, exist := paused[key]; exist {
q.Paused = true
}
stats.Queues = append(stats.Queues, &q)
case key == base.InProgressQueue:
stats.InProgress = val
case key == base.ScheduledQueue:
@ -169,6 +188,9 @@ func (r *RDB) CurrentStats() (*Stats, error) {
stats.Failed = val
}
}
sort.Slice(stats.Queues, func(i, j int) bool {
return stats.Queues[i].Name < stats.Queues[j].Name
})
return stats, nil
}

View File

@ -38,6 +38,7 @@ func TestCurrentStats(t *testing.T) {
processed int
failed int
allQueues []interface{}
paused []string
want *Stats
}{
{
@ -55,6 +56,7 @@ func TestCurrentStats(t *testing.T) {
processed: 120,
failed: 2,
allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")},
paused: []string{},
want: &Stats{
Enqueued: 3,
InProgress: 1,
@ -64,7 +66,12 @@ func TestCurrentStats(t *testing.T) {
Processed: 120,
Failed: 2,
Timestamp: now,
Queues: map[string]int{base.DefaultQueueName: 1, "critical": 1, "low": 1},
// Queues should be sorted by name.
Queues: []*Queue{
{Name: "critical", Paused: false, Size: 1},
{Name: "default", Paused: false, Size: 1},
{Name: "low", Paused: false, Size: 1},
},
},
},
{
@ -82,6 +89,7 @@ func TestCurrentStats(t *testing.T) {
processed: 90,
failed: 10,
allQueues: []interface{}{base.DefaultQueue},
paused: []string{},
want: &Stats{
Enqueued: 0,
InProgress: 0,
@ -91,13 +99,56 @@ func TestCurrentStats(t *testing.T) {
Processed: 90,
Failed: 10,
Timestamp: now,
Queues: map[string]int{base.DefaultQueueName: 0},
Queues: []*Queue{
{
Name: base.DefaultQueueName,
Paused: false,
Size: 0,
},
},
},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1},
"critical": {m5},
"low": {m6},
},
inProgress: []*base.TaskMessage{m2},
scheduled: []h.ZSetEntry{
{Msg: m3, Score: float64(now.Add(time.Hour).Unix())},
{Msg: m4, Score: float64(now.Unix())}},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
processed: 120,
failed: 2,
allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")},
paused: []string{"critical", "low"},
want: &Stats{
Enqueued: 3,
InProgress: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Timestamp: now,
Queues: []*Queue{
{Name: "critical", Paused: true, Size: 1},
{Name: "default", Paused: false, Size: 1},
{Name: "low", Paused: true, Size: 1},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
for _, qname := range tc.paused {
if err := r.Pause(qname); err != nil {
t.Fatal(err)
}
}
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
@ -136,7 +187,7 @@ func TestCurrentStatsWithoutData(t *testing.T) {
Processed: 0,
Failed: 0,
Timestamp: time.Now(),
Queues: map[string]int{},
Queues: make([]*Queue, 0),
}
got, err := r.CurrentStats()

View File

@ -7,7 +7,6 @@ package cmd
import (
"fmt"
"os"
"sort"
"strconv"
"strings"
"text/tabwriter"
@ -96,24 +95,31 @@ func printStates(s *rdb.Stats) {
tw.Flush()
}
func printQueues(queues map[string]int) {
var qnames, seps, counts []string
for q := range queues {
qnames = append(qnames, strings.Title(q))
func printQueues(queues []*rdb.Queue) {
var headers, seps, counts []string
for _, q := range queues {
title := queueTitle(q)
headers = append(headers, title)
seps = append(seps, strings.Repeat("-", len(title)))
counts = append(counts, strconv.Itoa(q.Size))
}
sort.Strings(qnames) // sort for stable order
for _, q := range qnames {
seps = append(seps, strings.Repeat("-", len(q)))
counts = append(counts, strconv.Itoa(queues[strings.ToLower(q)]))
}
format := strings.Repeat("%v\t", len(qnames)) + "\n"
format := strings.Repeat("%v\t", len(headers)) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, toInterfaceSlice(qnames)...)
fmt.Fprintf(tw, format, toInterfaceSlice(headers)...)
fmt.Fprintf(tw, format, toInterfaceSlice(seps)...)
fmt.Fprintf(tw, format, toInterfaceSlice(counts)...)
tw.Flush()
}
func queueTitle(q *rdb.Queue) string {
var b strings.Builder
b.WriteString(strings.Title(q.Name))
if q.Paused {
b.WriteString(" (Paused)")
}
return b.String()
}
func printStats(s *rdb.Stats) {
format := strings.Repeat("%v\t", 3) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)