diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 937782f..397c216 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -25,6 +25,7 @@ type Stats struct { Dead int Processed int Failed int + Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20) Timestamp time.Time } @@ -83,7 +84,7 @@ type DeadTask struct { // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { - // KEYS[1] -> asynq:queues:default + // KEYS[1] -> asynq:queues // KEYS[2] -> asynq:in_progress // KEYS[3] -> asynq:scheduled // KEYS[4] -> asynq:retry @@ -91,27 +92,40 @@ func (r *RDB) CurrentStats() (*Stats, error) { // KEYS[6] -> asynq:processed: // KEYS[7] -> asynq:failure: script := redis.NewScript(` - local qlen = redis.call("LLEN", KEYS[1]) - local plen = redis.call("LLEN", KEYS[2]) - local slen = redis.call("ZCARD", KEYS[3]) - local rlen = redis.call("ZCARD", KEYS[4]) - local dlen = redis.call("ZCARD", KEYS[5]) + local res = {} + local queues = redis.call("SMEMBERS", KEYS[1]) + for _, qkey in ipairs(queues) do + table.insert(res, qkey) + table.insert(res, redis.call("LLEN", qkey)) + end + table.insert(res, KEYS[2]) + table.insert(res, redis.call("LLEN", KEYS[2])) + table.insert(res, KEYS[3]) + table.insert(res, redis.call("ZCARD", KEYS[3])) + table.insert(res, KEYS[4]) + table.insert(res, redis.call("ZCARD", KEYS[4])) + table.insert(res, KEYS[5]) + table.insert(res, redis.call("ZCARD", KEYS[5])) local pcount = 0 local p = redis.call("GET", KEYS[6]) if p then pcount = tonumber(p) end + table.insert(res, "processed") + table.insert(res, pcount) local fcount = 0 local f = redis.call("GET", KEYS[7]) if f then fcount = tonumber(f) end - return {qlen, plen, slen, rlen, dlen, pcount, fcount} + table.insert(res, "failed") + table.insert(res, fcount) + return res `) now := time.Now() res, err := script.Run(r.client, []string{ - base.DefaultQueue, + base.AllQueues, base.InProgressQueue, base.ScheduledQueue, base.RetryQueue, @@ -122,20 +136,37 @@ func (r *RDB) CurrentStats() (*Stats, error) { if err != nil { return nil, err } - nums, err := cast.ToIntSliceE(res) + data, err := cast.ToSliceE(res) if err != nil { return nil, err } - return &Stats{ - Enqueued: nums[0], - InProgress: nums[1], - Scheduled: nums[2], - Retry: nums[3], - Dead: nums[4], - Processed: nums[5], - Failed: nums[6], - Timestamp: now, - }, nil + stats := &Stats{ + Queues: make(map[string]int), + Timestamp: now, + } + for i := 0; i < len(data); i += 2 { + key := cast.ToString(data[i]) + val := cast.ToInt(data[i+1]) + + switch { + case strings.HasPrefix(key, base.QueuePrefix): + stats.Enqueued += val + stats.Queues[strings.TrimPrefix(key, base.QueuePrefix)] = val + case key == base.InProgressQueue: + stats.InProgress = val + case key == base.ScheduledQueue: + stats.Scheduled = val + case key == base.RetryQueue: + stats.Retry = val + case key == base.DeadQueue: + stats.Dead = val + case key == "processed": + stats.Processed = val + case key == "failed": + stats.Failed = val + } + } + return stats, nil } // HistoricalStats returns a list of stats from the last n days. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 146d461..13a415c 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -22,20 +22,29 @@ func TestCurrentStats(t *testing.T) { m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m4 := h.NewTaskMessage("sync", nil) + m5 := h.NewTaskMessage("important_notification", nil) + m5.Queue = "critical" + m6 := h.NewTaskMessage("minor_notification", nil) + m6.Queue = "low" now := time.Now() tests := []struct { - enqueued []*base.TaskMessage + enqueued map[string][]*base.TaskMessage inProgress []*base.TaskMessage scheduled []h.ZSetEntry retry []h.ZSetEntry dead []h.ZSetEntry processed int failed int + allQueues []interface{} want *Stats }{ { - enqueued: []*base.TaskMessage{m1}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1}, + "critical": {m5}, + "low": {m6}, + }, inProgress: []*base.TaskMessage{m2}, scheduled: []h.ZSetEntry{ {Msg: m3, Score: now.Add(time.Hour).Unix()}, @@ -44,8 +53,9 @@ func TestCurrentStats(t *testing.T) { dead: []h.ZSetEntry{}, processed: 120, failed: 2, + allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, want: &Stats{ - Enqueued: 1, + Enqueued: 3, InProgress: 1, Scheduled: 2, Retry: 0, @@ -53,10 +63,13 @@ func TestCurrentStats(t *testing.T) { Processed: 120, Failed: 2, Timestamp: now, + Queues: map[string]int{base.DefaultQueueName: 1, "critical": 1, "low": 1}, }, }, { - enqueued: []*base.TaskMessage{}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {}, + }, inProgress: []*base.TaskMessage{}, scheduled: []h.ZSetEntry{ {Msg: m3, Score: now.Unix()}, @@ -67,6 +80,7 @@ func TestCurrentStats(t *testing.T) { {Msg: m2, Score: now.Add(-time.Hour).Unix()}}, processed: 90, failed: 10, + allQueues: []interface{}{base.DefaultQueue}, want: &Stats{ Enqueued: 0, InProgress: 0, @@ -76,13 +90,16 @@ func TestCurrentStats(t *testing.T) { Processed: 90, Failed: 10, Timestamp: now, + Queues: map[string]int{base.DefaultQueueName: 0}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedEnqueuedQueue(t, r.client, tc.enqueued) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedRetryQueue(t, r.client, tc.retry) @@ -91,6 +108,7 @@ func TestCurrentStats(t *testing.T) { failedKey := base.FailureKey(now) r.client.Set(processedKey, tc.processed, 0) r.client.Set(failedKey, tc.failed, 0) + r.client.SAdd(base.AllQueues, tc.allQueues...) got, err := r.CurrentStats() if err != nil {