From 859e2656d8b10b83bbaa16751e5f959bb07e4708 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 11 Aug 2020 05:35:06 -0700 Subject: [PATCH] Add AllQueues method to RDB --- internal/rdb/inspect.go | 37 ++++++++++++++++++------------------ internal/rdb/inspect_test.go | 28 +++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c7e5862..bebc94a 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -17,31 +17,32 @@ import ( "github.com/spf13/cast" ) +// AllQueues returns a list of all queue names. +func (r *RDB) AllQueues() ([]string, error) { + return r.client.SMembers(base.AllQueues).Result() +} + // Stats represents a state of queues at a certain time. type Stats struct { + // Name of the queue (e.g. "default", "critical"). + // Note: It doesn't include the prefix "asynq:queues:". + Name string + // Paused indicates whether the queue is paused. + // If true, tasks in the queue should not be processed. + Paused bool + // Number of tasks in each state. Enqueued int InProgress int Scheduled int Retry int Dead int - Processed int - Failed int - Queues []*Queue - Timestamp time.Time -} - -// Queue represents a task queue. -type Queue struct { - // Name of the queue (e.g. "default", "critical"). - // Note: It doesn't include the prefix "asynq:queues:". - Name string - - // Paused indicates whether the queue is paused. - // If true, tasks in the queue should not be processed. - Paused bool - - // Size is the number of tasks in the queue. - Size int + // Total number of tasks processed during the current date. + // The number includes both succeeded and failed tasks. + Processed int + // Total number of tasks failed during the current date. + Failed int + // Time this stats was taken. + Timestamp time.Time } // DailyStats holds aggregate data for a given day. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index dbb50e3..ff496af 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -16,6 +16,34 @@ import ( "github.com/hibiken/asynq/internal/base" ) +func TestAllQueues(t *testing.T) { + r := setup(t) + + tests := []struct { + queues []string + }{ + {queues: []string{"default"}}, + {queues: []string{"custom1", "custom2"}}, + {queues: []string{"default", "custom1", "custom2"}}, + {queues: []string{}}, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + if err := r.client.SAdd(base.AllQueues, tc.queues...).Err(); err != nil { + t.Fatal("could not initialize all queue set") + } + got, err := r.AllQueues() + if err != nil { + t.Errorf("AllQueues() returned an error: %v", err) + continue + } + if diff := cmp.Diff(tc.queues, got, h.SortStringSliceOpt); diff != nil { + t.Errorf("AllQueues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff) + } + } +} + func TestCurrentStats(t *testing.T) { r := setup(t) m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})