From d6f389e63f8163db36aa8715d4a6f9db04ac1574 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 20 Aug 2020 21:17:44 -0700 Subject: [PATCH] Add Queues method to Inspector --- inspector.go | 5 +++++ inspector_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/inspector.go b/inspector.go index 9f4bc91..8630606 100644 --- a/inspector.go +++ b/inspector.go @@ -27,6 +27,11 @@ func NewInspector(r RedisConnOpt) *Inspector { } } +// Queues returns a list of all queue names. +func (i *Inspector) Queues() ([]string, error) { + return i.rdb.AllQueues() +} + // Stats represents a state of queues at a certain time. type Stats struct { // Name of the queue. diff --git a/inspector_test.go b/inspector_test.go index 22ffb9a..fce3251 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -17,6 +17,41 @@ import ( "github.com/hibiken/asynq/internal/base" ) +func TestInspectorQueues(t *testing.T) { + r := setup(t) + inspector := NewInspector(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + tests := []struct { + queues []string + }{ + {queues: []string{"default"}}, + {queues: []string{"custom1", "custom2"}}, + {queues: []string{"default", "custom1", "custom2"}}, + {queues: []string{}}, + } + + for _, tc := range tests { + h.FlushDB(t, r) + for _, qname := range tc.queues { + if err := r.SAdd(base.AllQueues, qname).Err(); err != nil { + t.Fatalf("could not initialize all queue set: %v", err) + } + } + got, err := inspector.Queues() + if err != nil { + t.Errorf("Queues() returned an error: %v", err) + continue + } + if diff := cmp.Diff(tc.queues, got, h.SortStringSliceOpt); diff != "" { + t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff) + } + } + +} + func TestInspectorCurrentStats(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil)