2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Add AllQueues method to RDB

This commit is contained in:
Ken Hibino 2020-08-11 05:35:06 -07:00
parent b5caefd663
commit d25090c669
2 changed files with 47 additions and 18 deletions

View File

@ -17,33 +17,34 @@ import (
"github.com/spf13/cast" "github.com/spf13/cast"
) )
// AllQueues returns a list of all queue names.
func (r *RDB) AllQueues() ([]string, error) {
return r.client.SMembers(base.AllQueues).Result()
}
// Stats represents a state of queues at a certain time. // Stats represents a state of queues at a certain time.
type Stats struct { type Stats struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Number of tasks in each state.
Enqueued int Enqueued int
InProgress int InProgress int
Scheduled int Scheduled int
Retry int Retry int
Dead int Dead int
// Total number of tasks processed during the current date.
// The number includes both succeeded and failed tasks.
Processed int Processed int
// Total number of tasks failed during the current date.
Failed int Failed int
Queues []*Queue // Time this stats was taken.
Timestamp time.Time Timestamp time.Time
} }
// Queue represents a task queue.
type Queue struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
}
// DailyStats holds aggregate data for a given day. // DailyStats holds aggregate data for a given day.
type DailyStats struct { type DailyStats struct {
Processed int Processed int

View File

@ -16,6 +16,34 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
func TestAllQueues(t *testing.T) {
r := setup(t)
tests := []struct {
queues []string
}{
{queues: []string{"default"}},
{queues: []string{"custom1", "custom2"}},
{queues: []string{"default", "custom1", "custom2"}},
{queues: []string{}},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
if err := r.client.SAdd(base.AllQueues, tc.queues...).Err(); err != nil {
t.Fatal("could not initialize all queue set")
}
got, err := r.AllQueues()
if err != nil {
t.Errorf("AllQueues() returned an error: %v", err)
continue
}
if diff := cmp.Diff(tc.queues, got, h.SortStringSliceOpt); diff != nil {
t.Errorf("AllQueues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
}
}
}
func TestCurrentStats(t *testing.T) { func TestCurrentStats(t *testing.T) {
r := setup(t) r := setup(t)
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})