2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Add AllQueues method to RDB

This commit is contained in:
Ken Hibino
2020-08-11 05:35:06 -07:00
parent a37afc6062
commit 859e2656d8
2 changed files with 47 additions and 18 deletions

View File

@@ -17,31 +17,32 @@ import (
"github.com/spf13/cast"
)
// AllQueues returns a list of all queue names.
func (r *RDB) AllQueues() ([]string, error) {
return r.client.SMembers(base.AllQueues).Result()
}
// Stats represents a state of queues at a certain time.
type Stats struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Number of tasks in each state.
Enqueued int
InProgress int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Queues []*Queue
Timestamp time.Time
}
// Queue represents a task queue.
type Queue struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
// Total number of tasks processed during the current date.
// The number includes both succeeded and failed tasks.
Processed int
// Total number of tasks failed during the current date.
Failed int
// Time this stats was taken.
Timestamp time.Time
}
// DailyStats holds aggregate data for a given day.

View File

@@ -16,6 +16,34 @@ import (
"github.com/hibiken/asynq/internal/base"
)
func TestAllQueues(t *testing.T) {
r := setup(t)
tests := []struct {
queues []string
}{
{queues: []string{"default"}},
{queues: []string{"custom1", "custom2"}},
{queues: []string{"default", "custom1", "custom2"}},
{queues: []string{}},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
if err := r.client.SAdd(base.AllQueues, tc.queues...).Err(); err != nil {
t.Fatal("could not initialize all queue set")
}
got, err := r.AllQueues()
if err != nil {
t.Errorf("AllQueues() returned an error: %v", err)
continue
}
if diff := cmp.Diff(tc.queues, got, h.SortStringSliceOpt); diff != nil {
t.Errorf("AllQueues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
}
}
}
func TestCurrentStats(t *testing.T) {
r := setup(t)
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})