2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-03 05:12:01 +08:00

Add rmq command to asynqmon

This commit is contained in:
Ken Hibino
2020-01-13 07:03:07 -08:00
parent 874d8e8843
commit 858b0325bd
3 changed files with 175 additions and 11 deletions

View File

@@ -707,14 +707,67 @@ func (r *RDB) DeleteAllScheduledTasks() error {
return r.client.Del(base.ScheduledQueue).Err()
}
// RemoveQueue removes the specified queue deleting any tasks in the queue.
func (r *RDB) RemoveQueue(qname string) error {
script := redis.NewScript(`
local n = redis.call("SREM", KEYS[1], KEYS[2])
if n == 1 then
redis.call("DEL", KEYS[2])
end
return redis.status_reply("OK")
`)
return script.Run(r.client, []string{base.AllQueues, base.QueueKey(qname)}).Err()
// ErrQueueNotFound indicates specified queue does not exist.
type ErrQueueNotFound struct {
qname string
}
func (e *ErrQueueNotFound) Error() string {
return fmt.Sprintf("queue %q does not exist", e.qname)
}
// ErrQueueNotEmpty indicates specified queue is not empty.
type ErrQueueNotEmpty struct {
qname string
}
func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
// RemoveQueue removes the specified queue.
//
// If force is set to true, it will remove the queue regardless
// of whether the queue is empty.
// If force is set to false, it will only remove the queue if
// it is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
var script *redis.Script
if force {
script = redis.NewScript(`
local n = redis.call("SREM", KEYS[1], KEYS[2])
if n == 0 then
return redis.error_reply("LIST NOT FOUND")
end
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
} else {
script = redis.NewScript(`
local l = redis.call("LLEN", KEYS[2])
if l > 0 then
return redis.error_reply("LIST NOT EMPTY")
end
local n = redis.call("SREM", KEYS[1], KEYS[2])
if n == 0 then
return redis.error_reply("LIST NOT FOUND")
end
redis.call("DEL", KEYS[2])
return redis.status_reply("OK")
`)
}
err := script.Run(r.client,
[]string{base.AllQueues, base.QueueKey(qname)},
force).Err()
if err != nil {
switch err.Error() {
case "LIST NOT FOUND":
return &ErrQueueNotFound{qname}
case "LIST NOT EMPTY":
return &ErrQueueNotEmpty{qname}
default:
return err
}
}
return nil
}

View File

@@ -1651,6 +1651,7 @@ func TestRemoveQueue(t *testing.T) {
tests := []struct {
enqueued map[string][]*base.TaskMessage
qname string // queue to remove
force bool
wantEnqueued map[string][]*base.TaskMessage
}{
{
@@ -1660,6 +1661,7 @@ func TestRemoveQueue(t *testing.T) {
"low": {},
},
qname: "low",
force: false,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m2, m3},
@@ -1672,6 +1674,7 @@ func TestRemoveQueue(t *testing.T) {
"low": {},
},
qname: "critical",
force: true, // allow removing non-empty queue
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m1},
"low": {},
@@ -1685,7 +1688,7 @@ func TestRemoveQueue(t *testing.T) {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
err := r.RemoveQueue(tc.qname)
err := r.RemoveQueue(tc.qname, tc.force)
if err != nil {
t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err)
continue
@@ -1708,3 +1711,59 @@ func TestRemoveQueue(t *testing.T) {
}
}
}
func TestRemoveQueueError(t *testing.T) {
r := setup(t)
m1 := h.NewTaskMessage("send_email", nil)
m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", nil)
tests := []struct {
desc string
enqueued map[string][]*base.TaskMessage
qname string // queue to remove
force bool
}{
{
desc: "removing non-existent queue",
enqueued: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m2, m3},
"low": {},
},
qname: "nonexistent",
force: false,
},
{
desc: "removing non-empty queue",
enqueued: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m2, m3},
"low": {},
},
qname: "critical",
force: false,
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
}
got := r.RemoveQueue(tc.qname, tc.force)
if got == nil {
t.Errorf("%s;(*RDB).RemoveQueue(%q) = nil, want error", tc.desc, tc.qname)
continue
}
// Make sure that nothing changed
for qname, want := range tc.enqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
}
}