diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index dec8ec6..f73476b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -25,8 +25,7 @@ func (r *RDB) AllQueues() ([]string, error) { // Stats represents a state of queues at a certain time. type Stats struct { // Name of the queue (e.g. "default", "critical"). - // Note: It doesn't include the prefix "asynq:queues:". - Name string + Queue string // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool @@ -47,9 +46,15 @@ type Stats struct { // DailyStats holds aggregate data for a given day. type DailyStats struct { + // Name of the queue (e.g. "default", "critical"). + Queue string + // Total number of tasks processed during the given day. + // The number includes both succeeded and failed tasks. Processed int - Failed int - Time time.Time + // Total number of tasks failed during the given day. + Failed int + // Date this stats was taken. + Time time.Time } var ErrQueueNotFound = errors.New("rdb: queue does not exist") @@ -120,7 +125,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { return nil, err } stats := &Stats{ - Name: qname, + Queue: qname, Timestamp: now, } for i := 0; i < len(data); i += 2 { @@ -157,14 +162,14 @@ local res = {} for _, key in ipairs(KEYS) do local n = redis.call("GET", key) if not n then - n = 0 + n = 0 end table.insert(res, tonumber(n)) end return res`) -// HistoricalStats returns a list of stats from the last n days. -func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { +// HistoricalStats returns a list of stats from the last n days for the given queue. +func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) { if n < 1 { return []*DailyStats{}, nil } @@ -175,10 +180,10 @@ func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { for i := 0; i < n; i++ { ts := now.Add(-time.Duration(i) * day) days = append(days, ts) - keys = append(keys, base.ProcessedKey(ts)) - keys = append(keys, base.FailureKey(ts)) + keys = append(keys, base.ProcessedKey(qname, ts)) + keys = append(keys, base.FailureKey(qname, ts)) } - res, err := historicalStatsCmd.Run(r.client, keys, len(keys)).Result() + res, err := historicalStatsCmd.Run(r.client, keys).Result() if err != nil { return nil, err } @@ -189,6 +194,7 @@ func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { var stats []*DailyStats for i := 0; i < len(data); i += 2 { stats = append(stats, &DailyStats{ + Queue: qname, Processed: data[i], Failed: data[i+1], Time: days[i/2], diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 2be16c9..02f2a0d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -108,7 +108,7 @@ func TestCurrentStats(t *testing.T) { paused: []string{}, qname: "default", want: &Stats{ - Name: "default", + Queue: "default", Paused: false, Enqueued: 1, InProgress: 1, @@ -162,7 +162,7 @@ func TestCurrentStats(t *testing.T) { paused: []string{"critical", "low"}, qname: "critical", want: &Stats{ - Name: "critical", + Queue: "critical", Paused: true, Enqueued: 1, InProgress: 0, @@ -225,11 +225,12 @@ func TestHistoricalStats(t *testing.T) { now := time.Now().UTC() tests := []struct { - n int // number of days + qname // queue of interest + n int // number of days }{ - {90}, - {7}, - {0}, + {"default", 90}, + {"custom", 7}, + {"default", 0}, } for _, tc := range tests { @@ -238,31 +239,32 @@ func TestHistoricalStats(t *testing.T) { // populate last n days data for i := 0; i < tc.n; i++ { ts := now.Add(-time.Duration(i) * 24 * time.Hour) - processedKey := base.ProcessedKey(ts) - failedKey := base.FailureKey(ts) + processedKey := base.ProcessedKey(tc.qname, ts) + failedKey := base.FailureKey(tc.qname, ts) r.client.Set(processedKey, (i+1)*1000, 0) r.client.Set(failedKey, (i+1)*10, 0) } - got, err := r.HistoricalStats(tc.n) + got, err := r.HistoricalStats(tc.qname, tc.n) if err != nil { - t.Errorf("RDB.HistoricalStats(%v) returned error: %v", tc.n, err) + t.Errorf("RDB.HistoricalStats(%q, %d) returned error: %v", tc.qname, tc.n, err) continue } if len(got) != tc.n { - t.Errorf("RDB.HistorycalStats(%v) returned %d daily stats, want %d", tc.n, len(got), tc.n) + t.Errorf("RDB.HistorycalStats(%q, %d) returned %d daily stats, want %d", tc.qname, tc.n, len(got), tc.n) continue } for i := 0; i < tc.n; i++ { want := &DailyStats{ + Queue: tc.qname, Processed: (i + 1) * 1000, Failed: (i + 1) * 10, Time: now.Add(-time.Duration(i) * 24 * time.Hour), } if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { - t.Errorf("RDB.HistoricalStats %d days ago data; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) + t.Errorf("RDB.HistoricalStats for the last %d days; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) } } }