diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index bebc94a..693c4f3 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -6,8 +6,8 @@ package rdb import ( "encoding/json" + "errors" "fmt" - "sort" "strings" "time" @@ -52,20 +52,20 @@ type DailyStats struct { Time time.Time } -// KEYS[1] -> asynq:queues -// KEYS[2] -> asynq:in_progress -// KEYS[3] -> asynq:scheduled -// KEYS[4] -> asynq:retry -// KEYS[5] -> asynq:dead -// KEYS[6] -> asynq:processed: -// KEYS[7] -> asynq:failure: +var ErrQueueNotFound = errors.New("rdb: queue does not exist") + +// KEYS[1] -> asynq: +// KEYS[2] -> asynq::in_progress +// KEYS[3] -> asynq::scheduled +// KEYS[4] -> asynq::retry +// KEYS[5] -> asynq::dead +// KEYS[6] -> asynq::processed: +// KEYS[7] -> asynq::failed: +// KEYS[8] -> asynq::paused var currentStatsCmd = redis.NewScript(` local res = {} -local queues = redis.call("SMEMBERS", KEYS[1]) -for _, qkey in ipairs(queues) do - table.insert(res, qkey) - table.insert(res, redis.call("LLEN", qkey)) -end +table.insert(res, KEYS[1]) +table.insert(res, redis.call("LLEN", KEYS[1])) table.insert(res, KEYS[2]) table.insert(res, redis.call("LLEN", KEYS[2])) table.insert(res, KEYS[3]) @@ -79,28 +79,38 @@ local p = redis.call("GET", KEYS[6]) if p then pcount = tonumber(p) end -table.insert(res, "processed") +table.insert(res, KEYS[6]) table.insert(res, pcount) local fcount = 0 local f = redis.call("GET", KEYS[7]) if f then fcount = tonumber(f) end -table.insert(res, "failed") +table.insert(res, KEYS[7]) table.insert(res, fcount) +table.insert(res, KEYS[8]) +table.insert(res, redis.call("EXISTS", KEYS[8])) return res`) // CurrentStats returns a current state of the queues. -func (r *RDB) CurrentStats() (*Stats, error) { +func (r *RDB) CurrentStats(qname string) (*Stats, error) { + exists, err := r.client.SIsMember(base.AllQueues, qname).Result() + if err != nil { + return nil, err + } + if !exists { + return nil, ErrQueueNotFound + } now := time.Now() res, err := currentStatsCmd.Run(r.client, []string{ - base.AllQueues, - base.InProgressQueue, - base.ScheduledQueue, - base.RetryQueue, - base.DeadQueue, - base.ProcessedKey(now), - base.FailureKey(now), + base.QueueKey(qname), + base.InProgressKey(qname), + base.ScheduledKey(qname), + base.RetryKey(qname), + base.DeadKey(qname), + base.ProcessedKey(qname, now), + base.FailedKey(qname, now), + base.PausedKey(qname), }).Result() if err != nil { return nil, err @@ -109,46 +119,36 @@ func (r *RDB) CurrentStats() (*Stats, error) { if err != nil { return nil, err } - paused, err := r.client.SMembersMap(base.PausedQueues).Result() - if err != nil { - return nil, err - } stats := &Stats{ - Queues: make([]*Queue, 0), + Name: qname, Timestamp: now, } for i := 0; i < len(data); i += 2 { key := cast.ToString(data[i]) val := cast.ToInt(data[i+1]) - - switch { - case strings.HasPrefix(key, base.QueuePrefix): - stats.Enqueued += val - q := Queue{ - Name: strings.TrimPrefix(key, base.QueuePrefix), - Size: val, - } - if _, exist := paused[key]; exist { - q.Paused = true - } - stats.Queues = append(stats.Queues, &q) - case key == base.InProgressQueue: + switch key { + case base.QueueKey(qname): + stats.Enqueued = val + case base.InProgressKey(qname): stats.InProgress = val - case key == base.ScheduledQueue: + case base.ScheduledKey(qname): stats.Scheduled = val - case key == base.RetryQueue: + case ase.RetryKey(qname): stats.Retry = val - case key == base.DeadQueue: + case base.DeadKey(qname): stats.Dead = val - case key == "processed": + case base.ProcessedKey(qname, now): stats.Processed = val - case key == "failed": + case base.FailedKey(qname, now): stats.Failed = val + case base.PausedKey(qname): + if val == 0 { + stats.Paused = false + } else { + stats.Paused = true + } } } - sort.Slice(stats.Queues, func(i, j int) bool { - return stats.Queues[i].Name < stats.Queues[j].Name - }) return stats, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index ff496af..b954758 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -50,42 +50,67 @@ func TestCurrentStats(t *testing.T) { m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m4 := h.NewTaskMessage("sync", nil) - m5 := h.NewTaskMessage("important_notification", nil) - m5.Queue = "critical" - m6 := h.NewTaskMessage("minor_notification", nil) - m6.Queue = "low" + m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") + m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") now := time.Now() tests := []struct { enqueued map[string][]*base.TaskMessage - inProgress []*base.TaskMessage - scheduled []base.Z - retry []base.Z - dead []base.Z - processed int - failed int - allQueues []interface{} + inProgress map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + processed map[string]int + failed map[string]int paused []string + qname string want *Stats }{ { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {m1}, - "critical": {m5}, - "low": {m6}, + "default": {m1}, + "critical": {m5}, + "low": {m6}, }, - inProgress: []*base.TaskMessage{m2}, - scheduled: []base.Z{ - {Message: m3, Score: now.Add(time.Hour).Unix()}, - {Message: m4, Score: now.Unix()}}, - retry: []base.Z{}, - dead: []base.Z{}, - processed: 120, - failed: 2, - allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, - paused: []string{}, + inProgress: map[string][]*base.TaskMessage{ + "default": {m2}, + "critical": {}, + "low": {}, + }, + scheduled: map[string][]base.Z{ + "default": { + {Message: m3, Score: now.Add(time.Hour).Unix()}, + {Message: m4, Score: now.Unix()}, + }, + "critical": {}, + "low": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + processed: map[string]int{ + "default": 120, + "critical": 100, + "low": 50, + }, + failed: map[string]int{ + "default": 2, + "critical": 0, + "low": 1, + }, + paused: []string{}, + qname: "default", want: &Stats{ - Enqueued: 3, + Name: "default", + Paused: false, + Enqueued: 1, InProgress: 1, Scheduled: 2, Retry: 0, @@ -93,74 +118,60 @@ func TestCurrentStats(t *testing.T) { Processed: 120, Failed: 2, Timestamp: now, - // Queues should be sorted by name. - Queues: []*Queue{ - {Name: "critical", Paused: false, Size: 1}, - {Name: "default", Paused: false, Size: 1}, - {Name: "low", Paused: false, Size: 1}, - }, }, }, { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {}, + "default": {m1}, + "critical": {m5}, + "low": {m6}, }, - inProgress: []*base.TaskMessage{}, - scheduled: []base.Z{ - {Message: m3, Score: now.Unix()}, - {Message: m4, Score: now.Unix()}}, - retry: []base.Z{ - {Message: m1, Score: now.Add(time.Minute).Unix()}}, - dead: []base.Z{ - {Message: m2, Score: now.Add(-time.Hour).Unix()}}, - processed: 90, - failed: 10, - allQueues: []interface{}{base.DefaultQueue}, - paused: []string{}, + inProgress: map[string][]*base.TaskMessage{ + "default": {m2}, + "critical": {}, + "low": {}, + }, + scheduled: map[string][]base.Z{ + "default": { + {Message: m3, Score: now.Add(time.Hour).Unix()}, + {Message: m4, Score: now.Unix()}, + }, + "critical": {}, + "low": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + processed: map[string]int{ + "default": 120, + "critical": 100, + "low": 50, + }, + failed: map[string]int{ + "default": 2, + "critical": 0, + "low": 1, + }, + paused: []string{"critical", "low"}, + qname: "critical", want: &Stats{ - Enqueued: 0, + Name: "critical", + Paused: true, + Enqueued: 1, InProgress: 0, - Scheduled: 2, - Retry: 1, - Dead: 1, - Processed: 90, - Failed: 10, - Timestamp: now, - Queues: []*Queue{ - {Name: base.DefaultQueueName, Paused: false, Size: 0}, - }, - }, - }, - { - enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {m1}, - "critical": {m5}, - "low": {m6}, - }, - inProgress: []*base.TaskMessage{m2}, - scheduled: []base.Z{ - {Message: m3, Score: now.Add(time.Hour).Unix()}, - {Message: m4, Score: now.Unix()}}, - retry: []base.Z{}, - dead: []base.Z{}, - processed: 120, - failed: 2, - allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, - paused: []string{"critical", "low"}, - want: &Stats{ - Enqueued: 3, - InProgress: 1, - Scheduled: 2, + Scheduled: 0, Retry: 0, Dead: 0, - Processed: 120, - Failed: 2, + Processed: 100, + Failed: 0, Timestamp: now, - Queues: []*Queue{ - {Name: "critical", Paused: true, Size: 1}, - {Name: "default", Paused: false, Size: 1}, - {Name: "low", Paused: true, Size: 1}, - }, }, }, } @@ -172,54 +183,40 @@ func TestCurrentStats(t *testing.T) { t.Fatal(err) } } - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, qname) + h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllRetryQueues(t, r.client, tc.retry) + h.SeedAllDeadQueues(t, r.client, tc.dead) + for qname, n := range tc.processed { + processedKey := base.ProcessedKey(qname, now) + r.client.Set(processedKey, n, 0) + } + for qname, n := range tc.failed { + failedKey := base.FailedKey(now) + r.client.Set(failedKey, n, 0) } - h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedScheduledQueue(t, r.client, tc.scheduled) - h.SeedRetryQueue(t, r.client, tc.retry) - h.SeedDeadQueue(t, r.client, tc.dead) - processedKey := base.ProcessedKey(now) - failedKey := base.FailureKey(now) - r.client.Set(processedKey, tc.processed, 0) - r.client.Set(failedKey, tc.failed, 0) - r.client.SAdd(base.AllQueues, tc.allQueues...) - got, err := r.CurrentStats() + got, err := r.CurrentStats(tc.qname) if err != nil { - t.Errorf("r.CurrentStats() = %v, %v, want %v, nil", got, err, tc.want) + t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil", tc.qname, got, err, tc.want) continue } if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { - t.Errorf("r.CurrentStats() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff) + t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff) continue } } } -func TestCurrentStatsWithoutData(t *testing.T) { +func TestCurrentStatsWithNonExistentQueue(t *testing.T) { r := setup(t) - want := &Stats{ - Enqueued: 0, - InProgress: 0, - Scheduled: 0, - Retry: 0, - Dead: 0, - Processed: 0, - Failed: 0, - Timestamp: time.Now(), - Queues: make([]*Queue, 0), - } - - got, err := r.CurrentStats() - if err != nil { - t.Fatalf("r.CurrentStats() = %v, %v, want %+v, nil", got, err, want) - } - - if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" { - t.Errorf("r.CurrentStats() = %v, %v, want %+v, nil; (-want, +got)\n%s", got, err, want, diff) + qname := "non-existent" + got, err := r.CurrentStats(qname) + if err != ErrQueueNotFound { + t.Fatalf("r.CurrentStats(%q) = %v, %v, want nil, %v", qname, got, err, ErrQueueNotFound) } }