From 874d8e8843adec60e902b592f16254bde7e6c318 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 13 Jan 2020 06:50:03 -0800 Subject: [PATCH] Add RDB.RemoveQueue method --- internal/rdb/inspect.go | 12 +++++++ internal/rdb/inspect_test.go | 67 ++++++++++++++++++++++++++++++++++++ processor.go | 2 +- 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index f279254..979d4af 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -706,3 +706,15 @@ func (r *RDB) DeleteAllRetryTasks() error { func (r *RDB) DeleteAllScheduledTasks() error { return r.client.Del(base.ScheduledQueue).Err() } + +// RemoveQueue removes the specified queue deleting any tasks in the queue. +func (r *RDB) RemoveQueue(qname string) error { + script := redis.NewScript(` + local n = redis.call("SREM", KEYS[1], KEYS[2]) + if n == 1 then + redis.call("DEL", KEYS[2]) + end + return redis.status_reply("OK") + `) + return script.Run(r.client, []string{base.AllQueues, base.QueueKey(qname)}).Err() +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 278b608..28d8998 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1641,3 +1641,70 @@ func TestDeleteAllScheduledTasks(t *testing.T) { } } } + +func TestRemoveQueue(t *testing.T) { + r := setup(t) + m1 := h.NewTaskMessage("send_email", nil) + m2 := h.NewTaskMessage("reindex", nil) + m3 := h.NewTaskMessage("gen_thumbnail", nil) + + tests := []struct { + enqueued map[string][]*base.TaskMessage + qname string // queue to remove + wantEnqueued map[string][]*base.TaskMessage + }{ + { + enqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "critical": {m2, m3}, + "low": {}, + }, + qname: "low", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "critical": {m2, m3}, + }, + }, + { + enqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "critical": {m2, m3}, + "low": {}, + }, + qname: "critical", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "low": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } + + err := r.RemoveQueue(tc.qname) + if err != nil { + t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err) + continue + } + + qkey := base.QueueKey(tc.qname) + if r.client.SIsMember(base.AllQueues, qkey).Val() { + t.Errorf("%q is a member of %q", qkey, base.AllQueues) + } + + if r.client.LLen(qkey).Val() != 0 { + t.Errorf("queue %q is not empty", qkey) + } + + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got):\n%s", base.QueueKey(qname), diff) + } + } + } +} diff --git a/processor.go b/processor.go index 5f2fa57..577d4b4 100644 --- a/processor.go +++ b/processor.go @@ -264,7 +264,7 @@ func uniq(names []string, l int) []string { return res } -// sortByPriority returns the list of queue names sorted by +// sortByPriority returns a list of queue names sorted by // their priority level in descending order. func sortByPriority(qcfg map[string]uint) []string { var queues []*queue