From e9239260ae25879752d3f15340aa7d54a66101df Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 27 Nov 2020 22:27:54 -0800 Subject: [PATCH] Add DeleteQueue method to Inspector - Added ErrQueueNotFound and ErrQueueNotEmpty type to indicate the kind of an error returned from the method. --- inspector.go | 39 ++++++ inspector_test.go | 235 ++++++++++++++++++++++++++++++++--- internal/rdb/inspect.go | 1 - internal/rdb/inspect_test.go | 4 +- 4 files changed, 259 insertions(+), 20 deletions(-) diff --git a/inspector.go b/inspector.go index e24f6c6..94a26b1 100644 --- a/inspector.go +++ b/inspector.go @@ -124,6 +124,45 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { return res, nil } +// ErrQueueNotFound indicates that the specified queue does not exist. +type ErrQueueNotFound struct { + qname string +} + +func (e *ErrQueueNotFound) Error() string { + return fmt.Sprintf("queue %q does not exist", e.qname) +} + +// ErrQueueNotEmpty indicates that the specified queue is not empty. +type ErrQueueNotEmpty struct { + qname string +} + +func (e *ErrQueueNotEmpty) Error() string { + return fmt.Sprintf("queue %q is not empty", e.qname) +} + +// DeleteQueue removes the specified queue. +// +// If force is set to true, DeleteQueue will remove the queue regardless of +// the queue size as long as no tasks are active in the queue. +// If force is set to false, DeleteQueue will remove the queue only if +// the queue is empty. +// +// If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound. +// If force is set to false and the specified queue is not empty, DeleteQueue +// returns ErrQueueNotEmpty. +func (i *Inspector) DeleteQueue(qname string, force bool) error { + err := i.rdb.RemoveQueue(qname, force) + if _, ok := err.(*rdb.ErrQueueNotFound); ok { + return &ErrQueueNotFound{qname} + } + if _, ok := err.(*rdb.ErrQueueNotEmpty); ok { + return &ErrQueueNotEmpty{qname} + } + return err +} + // PendingTask is a task in a queue and is ready to be processed. type PendingTask struct { *Task diff --git a/inspector_test.go b/inspector_test.go index 214a96e..ff91d5f 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -50,6 +50,207 @@ func TestInspectorQueues(t *testing.T) { } +func TestInspectorDeleteQueue(t *testing.T) { + r := setup(t) + defer r.Close() + inspector := NewInspector(getRedisConnOpt(t)) + defer inspector.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + + tests := []struct { + pending map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string // queue to remove + force bool + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + active: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + force: false, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + active: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: time.Now().Unix()}}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + force: true, // allow removing non-empty queue + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllDeadQueues(t, r, tc.dead) + + err := inspector.DeleteQueue(tc.qname, tc.force) + if err != nil { + t.Errorf("DeleteQueue(%q, %t) = %v, want nil", + tc.qname, tc.force, err) + continue + } + if r.SIsMember(base.AllQueues, tc.qname).Val() { + t.Errorf("%q is a member of %q", tc.qname, base.AllQueues) + } + } +} + +func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { + r := setup(t) + defer r.Close() + inspector := NewInspector(getRedisConnOpt(t)) + defer inspector.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + + tests := []struct { + pending map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string // queue to remove + force bool + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + active: map[string][]*base.TaskMessage{ + "default": {m3, m4}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + force: false, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllDeadQueues(t, r, tc.dead) + + err := inspector.DeleteQueue(tc.qname, tc.force) + if _, ok := err.(*ErrQueueNotEmpty); !ok { + t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotEmpty", + tc.qname, tc.force) + } + } +} + +func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { + r := setup(t) + defer r.Close() + inspector := NewInspector(getRedisConnOpt(t)) + defer inspector.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + + tests := []struct { + pending map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string // queue to remove + force bool + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + active: map[string][]*base.TaskMessage{ + "default": {m3, m4}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "nonexistent", + force: false, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllDeadQueues(t, r, tc.dead) + + err := inspector.DeleteQueue(tc.qname, tc.force) + if _, ok := err.(*ErrQueueNotFound); !ok { + t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotFound", + tc.qname, tc.force) + } + } +} + func TestInspectorCurrentStats(t *testing.T) { r := setup(t) defer r.Close() @@ -65,15 +266,15 @@ func TestInspectorCurrentStats(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - pending map[string][]*base.TaskMessage - inProgress map[string][]*base.TaskMessage - scheduled map[string][]base.Z - retry map[string][]base.Z - dead map[string][]base.Z - processed map[string]int - failed map[string]int - qname string - want *QueueStats + pending map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + processed map[string]int + failed map[string]int + qname string + want *QueueStats }{ { pending: map[string][]*base.TaskMessage{ @@ -81,7 +282,7 @@ func TestInspectorCurrentStats(t *testing.T) { "critical": {m5}, "low": {m6}, }, - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {m2}, "critical": {}, "low": {}, @@ -134,7 +335,7 @@ func TestInspectorCurrentStats(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - asynqtest.SeedAllActiveQueues(t, r, tc.inProgress) + asynqtest.SeedAllActiveQueues(t, r, tc.active) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllRetryQueues(t, r, tc.retry) asynqtest.SeedAllDeadQueues(t, r, tc.dead) @@ -313,14 +514,14 @@ func TestInspectorListActiveTasks(t *testing.T) { } tests := []struct { - desc string - inProgress map[string][]*base.TaskMessage - qname string - want []*ActiveTask + desc string + active map[string][]*base.TaskMessage + qname string + want []*ActiveTask }{ { desc: "with a few active tasks", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3, m4}, }, @@ -334,7 +535,7 @@ func TestInspectorListActiveTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllActiveQueues(t, r, tc.inProgress) + asynqtest.SeedAllActiveQueues(t, r, tc.active) got, err := inspector.ListActiveTasks(tc.qname) if err != nil { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index daf1008..db62d44 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -745,7 +745,6 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { return err } } - return r.client.SRem(base.AllQueues, qname).Err() } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 5abcd78..fe831c4 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2707,10 +2707,10 @@ func TestRemoveQueue(t *testing.T) { err := r.RemoveQueue(tc.qname, tc.force) if err != nil { - t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err) + t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil", + tc.qname, tc.force, err) continue } - if r.client.SIsMember(base.AllQueues, tc.qname).Val() { t.Errorf("%q is a member of %q", tc.qname, base.AllQueues) }