2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Add DeleteQueue method to Inspector

- Added ErrQueueNotFound and ErrQueueNotEmpty type to indicate the kind
  of an error returned from the method.
This commit is contained in:
Ken Hibino 2020-11-27 22:27:54 -08:00
parent 8f9d5a3352
commit e9239260ae
4 changed files with 259 additions and 20 deletions

View File

@ -124,6 +124,45 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
return res, nil
}
// ErrQueueNotFound indicates that the specified queue does not exist.
type ErrQueueNotFound struct {
qname string
}
func (e *ErrQueueNotFound) Error() string {
return fmt.Sprintf("queue %q does not exist", e.qname)
}
// ErrQueueNotEmpty indicates that the specified queue is not empty.
type ErrQueueNotEmpty struct {
qname string
}
func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
// DeleteQueue removes the specified queue.
//
// If force is set to true, DeleteQueue will remove the queue regardless of
// the queue size as long as no tasks are active in the queue.
// If force is set to false, DeleteQueue will remove the queue only if
// the queue is empty.
//
// If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
// If force is set to false and the specified queue is not empty, DeleteQueue
// returns ErrQueueNotEmpty.
func (i *Inspector) DeleteQueue(qname string, force bool) error {
err := i.rdb.RemoveQueue(qname, force)
if _, ok := err.(*rdb.ErrQueueNotFound); ok {
return &ErrQueueNotFound{qname}
}
if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
return &ErrQueueNotEmpty{qname}
}
return err
}
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*Task

View File

@ -50,6 +50,207 @@ func TestInspectorQueues(t *testing.T) {
}
func TestInspectorDeleteQueue(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
qname string // queue to remove
force bool
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
active: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
force: false,
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
active: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {{Message: m4, Score: time.Now().Unix()}},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
force: true, // allow removing non-empty queue
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllPendingQueues(t, r, tc.pending)
h.SeedAllActiveQueues(t, r, tc.active)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllDeadQueues(t, r, tc.dead)
err := inspector.DeleteQueue(tc.qname, tc.force)
if err != nil {
t.Errorf("DeleteQueue(%q, %t) = %v, want nil",
tc.qname, tc.force, err)
continue
}
if r.SIsMember(base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
}
}
}
func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
qname string // queue to remove
force bool
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
active: map[string][]*base.TaskMessage{
"default": {m3, m4},
},
scheduled: map[string][]base.Z{
"default": {},
},
retry: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
force: false,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllPendingQueues(t, r, tc.pending)
h.SeedAllActiveQueues(t, r, tc.active)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllDeadQueues(t, r, tc.dead)
err := inspector.DeleteQueue(tc.qname, tc.force)
if _, ok := err.(*ErrQueueNotEmpty); !ok {
t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotEmpty",
tc.qname, tc.force)
}
}
}
func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
qname string // queue to remove
force bool
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
active: map[string][]*base.TaskMessage{
"default": {m3, m4},
},
scheduled: map[string][]base.Z{
"default": {},
},
retry: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "nonexistent",
force: false,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllPendingQueues(t, r, tc.pending)
h.SeedAllActiveQueues(t, r, tc.active)
h.SeedAllScheduledQueues(t, r, tc.scheduled)
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllDeadQueues(t, r, tc.dead)
err := inspector.DeleteQueue(tc.qname, tc.force)
if _, ok := err.(*ErrQueueNotFound); !ok {
t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotFound",
tc.qname, tc.force)
}
}
}
func TestInspectorCurrentStats(t *testing.T) {
r := setup(t)
defer r.Close()
@ -65,15 +266,15 @@ func TestInspectorCurrentStats(t *testing.T) {
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
processed map[string]int
failed map[string]int
qname string
want *QueueStats
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
processed map[string]int
failed map[string]int
qname string
want *QueueStats
}{
{
pending: map[string][]*base.TaskMessage{
@ -81,7 +282,7 @@ func TestInspectorCurrentStats(t *testing.T) {
"critical": {m5},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
@ -134,7 +335,7 @@ func TestInspectorCurrentStats(t *testing.T) {
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
asynqtest.SeedAllActiveQueues(t, r, tc.inProgress)
asynqtest.SeedAllActiveQueues(t, r, tc.active)
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
@ -313,14 +514,14 @@ func TestInspectorListActiveTasks(t *testing.T) {
}
tests := []struct {
desc string
inProgress map[string][]*base.TaskMessage
qname string
want []*ActiveTask
desc string
active map[string][]*base.TaskMessage
qname string
want []*ActiveTask
}{
{
desc: "with a few active tasks",
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
@ -334,7 +535,7 @@ func TestInspectorListActiveTasks(t *testing.T) {
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllActiveQueues(t, r, tc.inProgress)
asynqtest.SeedAllActiveQueues(t, r, tc.active)
got, err := inspector.ListActiveTasks(tc.qname)
if err != nil {

View File

@ -745,7 +745,6 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
return err
}
}
return r.client.SRem(base.AllQueues, qname).Err()
}

View File

@ -2707,10 +2707,10 @@ func TestRemoveQueue(t *testing.T) {
err := r.RemoveQueue(tc.qname, tc.force)
if err != nil {
t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err)
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil",
tc.qname, tc.force, err)
continue
}
if r.client.SIsMember(base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
}