mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 06:42:16 +08:00
Add RDB.RemoveQueue method
This commit is contained in:
parent
84eef4ed0b
commit
874d8e8843
@ -706,3 +706,15 @@ func (r *RDB) DeleteAllRetryTasks() error {
|
||||
func (r *RDB) DeleteAllScheduledTasks() error {
|
||||
return r.client.Del(base.ScheduledQueue).Err()
|
||||
}
|
||||
|
||||
// RemoveQueue removes the specified queue deleting any tasks in the queue.
|
||||
func (r *RDB) RemoveQueue(qname string) error {
|
||||
script := redis.NewScript(`
|
||||
local n = redis.call("SREM", KEYS[1], KEYS[2])
|
||||
if n == 1 then
|
||||
redis.call("DEL", KEYS[2])
|
||||
end
|
||||
return redis.status_reply("OK")
|
||||
`)
|
||||
return script.Run(r.client, []string{base.AllQueues, base.QueueKey(qname)}).Err()
|
||||
}
|
||||
|
@ -1641,3 +1641,70 @@ func TestDeleteAllScheduledTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveQueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := h.NewTaskMessage("send_email", nil)
|
||||
m2 := h.NewTaskMessage("reindex", nil)
|
||||
m3 := h.NewTaskMessage("gen_thumbnail", nil)
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
qname string // queue to remove
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m2, m3},
|
||||
"low": {},
|
||||
},
|
||||
qname: "low",
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m2, m3},
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m2, m3},
|
||||
"low": {},
|
||||
},
|
||||
qname: "critical",
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"low": {},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
for qname, msgs := range tc.enqueued {
|
||||
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
|
||||
}
|
||||
|
||||
err := r.RemoveQueue(tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err)
|
||||
continue
|
||||
}
|
||||
|
||||
qkey := base.QueueKey(tc.qname)
|
||||
if r.client.SIsMember(base.AllQueues, qkey).Val() {
|
||||
t.Errorf("%q is a member of %q", qkey, base.AllQueues)
|
||||
}
|
||||
|
||||
if r.client.LLen(qkey).Val() != 0 {
|
||||
t.Errorf("queue %q is not empty", qkey)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want,+got):\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +264,7 @@ func uniq(names []string, l int) []string {
|
||||
return res
|
||||
}
|
||||
|
||||
// sortByPriority returns the list of queue names sorted by
|
||||
// sortByPriority returns a list of queue names sorted by
|
||||
// their priority level in descending order.
|
||||
func sortByPriority(qcfg map[string]uint) []string {
|
||||
var queues []*queue
|
||||
|
Loading…
Reference in New Issue
Block a user