diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 979d4af..155535c 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -707,14 +707,67 @@ func (r *RDB) DeleteAllScheduledTasks() error { return r.client.Del(base.ScheduledQueue).Err() } -// RemoveQueue removes the specified queue deleting any tasks in the queue. -func (r *RDB) RemoveQueue(qname string) error { - script := redis.NewScript(` - local n = redis.call("SREM", KEYS[1], KEYS[2]) - if n == 1 then - redis.call("DEL", KEYS[2]) - end - return redis.status_reply("OK") - `) - return script.Run(r.client, []string{base.AllQueues, base.QueueKey(qname)}).Err() +// ErrQueueNotFound indicates specified queue does not exist. +type ErrQueueNotFound struct { + qname string +} + +func (e *ErrQueueNotFound) Error() string { + return fmt.Sprintf("queue %q does not exist", e.qname) +} + +// ErrQueueNotEmpty indicates specified queue is not empty. +type ErrQueueNotEmpty struct { + qname string +} + +func (e *ErrQueueNotEmpty) Error() string { + return fmt.Sprintf("queue %q is not empty", e.qname) +} + +// RemoveQueue removes the specified queue. +// +// If force is set to true, it will remove the queue regardless +// of whether the queue is empty. +// If force is set to false, it will only remove the queue if +// it is empty. +func (r *RDB) RemoveQueue(qname string, force bool) error { + var script *redis.Script + if force { + script = redis.NewScript(` + local n = redis.call("SREM", KEYS[1], KEYS[2]) + if n == 0 then + return redis.error_reply("LIST NOT FOUND") + end + redis.call("DEL", KEYS[2]) + return redis.status_reply("OK") + `) + } else { + script = redis.NewScript(` + local l = redis.call("LLEN", KEYS[2]) + if l > 0 then + return redis.error_reply("LIST NOT EMPTY") + end + local n = redis.call("SREM", KEYS[1], KEYS[2]) + if n == 0 then + return redis.error_reply("LIST NOT FOUND") + end + redis.call("DEL", KEYS[2]) + return redis.status_reply("OK") + `) + } + err := script.Run(r.client, + []string{base.AllQueues, base.QueueKey(qname)}, + force).Err() + if err != nil { + switch err.Error() { + case "LIST NOT FOUND": + return &ErrQueueNotFound{qname} + case "LIST NOT EMPTY": + return &ErrQueueNotEmpty{qname} + default: + return err + } + } + return nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 28d8998..82cc52f 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1651,6 +1651,7 @@ func TestRemoveQueue(t *testing.T) { tests := []struct { enqueued map[string][]*base.TaskMessage qname string // queue to remove + force bool wantEnqueued map[string][]*base.TaskMessage }{ { @@ -1660,6 +1661,7 @@ func TestRemoveQueue(t *testing.T) { "low": {}, }, qname: "low", + force: false, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m1}, "critical": {m2, m3}, @@ -1672,6 +1674,7 @@ func TestRemoveQueue(t *testing.T) { "low": {}, }, qname: "critical", + force: true, // allow removing non-empty queue wantEnqueued: map[string][]*base.TaskMessage{ "default": {m1}, "low": {}, @@ -1685,7 +1688,7 @@ func TestRemoveQueue(t *testing.T) { h.SeedEnqueuedQueue(t, r.client, msgs, qname) } - err := r.RemoveQueue(tc.qname) + err := r.RemoveQueue(tc.qname, tc.force) if err != nil { t.Errorf("(*RDB).RemoveQueue(%q) = %v, want nil", tc.qname, err) continue @@ -1708,3 +1711,59 @@ func TestRemoveQueue(t *testing.T) { } } } + +func TestRemoveQueueError(t *testing.T) { + r := setup(t) + m1 := h.NewTaskMessage("send_email", nil) + m2 := h.NewTaskMessage("reindex", nil) + m3 := h.NewTaskMessage("gen_thumbnail", nil) + + tests := []struct { + desc string + enqueued map[string][]*base.TaskMessage + qname string // queue to remove + force bool + }{ + { + desc: "removing non-existent queue", + enqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "critical": {m2, m3}, + "low": {}, + }, + qname: "nonexistent", + force: false, + }, + { + desc: "removing non-empty queue", + enqueued: map[string][]*base.TaskMessage{ + "default": {m1}, + "critical": {m2, m3}, + "low": {}, + }, + qname: "critical", + force: false, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } + + got := r.RemoveQueue(tc.qname, tc.force) + if got == nil { + t.Errorf("%s;(*RDB).RemoveQueue(%q) = nil, want error", tc.desc, tc.qname) + continue + } + + // Make sure that nothing changed + for qname, want := range tc.enqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff) + } + } + } +} diff --git a/tools/asynqmon/cmd/rmq.go b/tools/asynqmon/cmd/rmq.go new file mode 100644 index 0000000..b8696be --- /dev/null +++ b/tools/asynqmon/cmd/rmq.go @@ -0,0 +1,52 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "fmt" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +// rmqCmd represents the rmq command +var rmqCmd = &cobra.Command{ + Use: "rmq [queue name]", + Short: "Removes the specified queue", + Long: `Rmq (asynqmon rmq) will remove the specified queue. +By default, it will remove the queue only if it's empty. +Use --force option to override this behavior. + +Example: asynqmon rmq low -> Removes "low" queue`, + Args: cobra.ExactValidArgs(1), + Run: rmq, +} + +var rmqForce bool + +func init() { + rootCmd.AddCommand(rmqCmd) + rmqCmd.Flags().BoolVarP(&rmqForce, "force", "f", false, "Remove the queue regardless of its size") +} + +func rmq(cmd *cobra.Command, args []string) { + c := redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + }) + r := rdb.NewRDB(c) + err := r.RemoveQueue(args[0], rmqForce) + if err != nil { + if _, ok := err.(*rdb.ErrQueueNotEmpty); ok { + fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynqmon rmq --force %s'\n", err, args[0]) + os.Exit(1) + } + fmt.Printf("error: %v", err) + os.Exit(1) + } + fmt.Printf("Successfully removed queue %q\n", args[0]) +}