diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 042f574..ddc9c50 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -615,66 +615,84 @@ func (e *ErrQueueNotFound) Error() string { return fmt.Sprintf("queue %q does not exist", e.qname) } -// ErrQueueNotEmpty indicates specified queue is not empty. -type ErrQueueNotEmpty struct { - qname string -} - -func (e *ErrQueueNotEmpty) Error() string { - return fmt.Sprintf("queue %q is not empty", e.qname) -} - -/* -// Skip checking whether queue is empty before removing. +// Only check whether in-progress queue is empty before removing. +// KEYS[1] -> asynq:{} +// KEYS[2] -> asynq:{}:in_progress +// KEYS[3] -> asynq:{}:scheduled +// KEYS[4] -> asynq:{}:retry +// KEYS[5] -> asynq:{}:dead +// KEYS[6] -> asynq:{}:deadlines var removeQueueForceCmd = redis.NewScript(` -local n = redis.call("SREM", KEYS[1], KEYS[2]) -if n == 0 then - return redis.error_reply("LIST NOT FOUND") +local inprogress = redis.call("LLEN", KEYS[2]) +if inprogress > 0 then + return redis.error_reply("Queue has tasks in-progress") end +redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) +redis.call("DEL", KEYS[3]) +redis.call("DEL", KEYS[4]) +redis.call("DEL", KEYS[5]) +redis.call("DEL", KEYS[6]) return redis.status_reply("OK")`) // Checks whether queue is empty before removing. +// KEYS[1] -> asynq:{} +// KEYS[2] -> asynq:{}:in_progress +// KEYS[3] -> asynq:{}:scheduled +// KEYS[4] -> asynq:{}:retry +// KEYS[5] -> asynq:{}:dead +// KEYS[6] -> asynq:{}:deadlines var removeQueueCmd = redis.NewScript(` -local l = redis.call("LLEN", KEYS[2]) if l > 0 then - return redis.error_reply("LIST NOT EMPTY") -end -local n = redis.call("SREM", KEYS[1], KEYS[2]) -if n == 0 then - return redis.error_reply("LIST NOT FOUND") +local enqueued = redis.call("LLEN", KEYS[1]) +local inprogress = redis.call("LLEN", KEYS[2]) +local scheduled = redis.call("SCARD", KEYS[3]) +local retry = redis.call("SCARD", KEYS[4]) +local dead = redis.call("SCARD", KEYS[5]) +local total = enqueued + inprogress + scheduled + retry + dead +if total > 0 then + return redis.error_reply("Queue is not empty") end +redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) +redis.call("DEL", KEYS[3]) +redis.call("DEL", KEYS[4]) +redis.call("DEL", KEYS[5]) +redis.call("DEL", KEYS[6]) return redis.status_reply("OK")`) // RemoveQueue removes the specified queue. // // If force is set to true, it will remove the queue regardless -// of whether the queue is empty. +// as long as no tasks are in-progress for the queue. // If force is set to false, it will only remove the queue if -// it is empty. +// the queue is empty. func (r *RDB) RemoveQueue(qname string, force bool) error { + exists, err := r.client.SIsMember(base.AllQueues, qname).Result() + if err != nil { + return err + } + if !exists { + return &ErrQueueNotFound{qname} + } var script *redis.Script if force { script = removeQueueForceCmd } else { script = removeQueueCmd } - err := script.Run(r.client, - []string{base.AllQueues, base.QueueKey(qname)}, - force).Err() - if err != nil { - switch err.Error() { - case "LIST NOT FOUND": - return &ErrQueueNotFound{qname} - case "LIST NOT EMPTY": - return &ErrQueueNotEmpty{qname} - default: - return err - } + keys := []string{ + base.QueueKey(qname), + base.InProgressKey(qname), + base.ScheduledKey(qname), + base.RetryKey(qname), + base.DeadKey(qname), + base.DeadlinesKey(qname), } - return nil + if err := script.Run(r.client, keys).Err(); err != nil { + return err + } + return r.client.SRem(base.AllQueues, qname).Err() } -*/ // Note: Script also removes stale keys. var listServerKeysCmd = redis.NewScript(` diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 318bdd0..50985e6 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2591,52 +2591,79 @@ func TestDeleteAllScheduledTasks(t *testing.T) { } } -/* func TestRemoveQueue(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") tests := []struct { - enqueued map[string][]*base.TaskMessage - qname string // queue to remove - force bool - wantEnqueued map[string][]*base.TaskMessage + enqueued map[string][]*base.TaskMessage + inProgress map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string // queue to remove + force bool }{ { enqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2, m3}, - "low": {}, + "default": {m1, m2}, + "custom": {}, }, - qname: "low", + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", force: false, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2, m3}, - }, }, { enqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2, m3}, - "low": {}, + "default": {m1, m2}, + "custom": {m3}, }, - qname: "critical", + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: time.Now().Unix()}}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", force: true, // allow removing non-empty queue - wantEnqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "low": {}, - }, }, } for _, tc := range tests { h.FlushDB(t, r.client) - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, qname) - } + h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllRetryQueues(t, r.client, tc.retry) + h.SeedAllDeadQueues(t, r.client, tc.dead) err := r.RemoveQueue(tc.qname, tc.force) if err != nil { @@ -2644,19 +2671,21 @@ func TestRemoveQueue(t *testing.T) { continue } - qkey := base.QueueKey(tc.qname) - if r.client.SIsMember(base.AllQueues, qkey).Val() { - t.Errorf("%q is a member of %q", qkey, base.AllQueues) + if r.client.SIsMember(base.AllQueues, tc.qname).Val() { + t.Errorf("%q is a member of %q", tc.qname, base.AllQueues) } - if r.client.LLen(qkey).Val() != 0 { - t.Errorf("queue %q is not empty", qkey) + keys := []string{ + base.QueueKey(tc.qname), + base.InProgressKey(tc.qname), + base.DeadlinesKey(tc.qname), + base.ScheduledKey(tc.qname), + base.RetryKey(tc.qname), + base.DeadKey(tc.qname), } - - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got):\n%s", base.QueueKey(qname), diff) + for _, key := range keys { + if r.client.Exists(key).Val() != 0 { + t.Errorf("key %q still exists", key) } } } @@ -2664,22 +2693,42 @@ func TestRemoveQueue(t *testing.T) { func TestRemoveQueueError(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") tests := []struct { - desc string - enqueued map[string][]*base.TaskMessage - qname string // queue to remove - force bool + desc string + enqueued map[string][]*base.TaskMessage + inProgress map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string // queue to remove + force bool }{ { desc: "removing non-existent queue", enqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2, m3}, - "low": {}, + "default": {m1, m2}, + "custom": {m3}, + }, + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, }, qname: "nonexistent", force: false, @@ -2687,20 +2736,63 @@ func TestRemoveQueueError(t *testing.T) { { desc: "removing non-empty queue", enqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2, m3}, - "low": {}, + "default": {m1, m2}, + "custom": {m3}, }, - qname: "critical", + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: time.Now().Unix()}}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", force: false, }, + { + desc: "force removing queue with tasks in-progress", + enqueued: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m4}, + }, + scheduled: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + // Even with force=true, it should error if there are tasks in-progress. + force: true, + }, } for _, tc := range tests { h.FlushDB(t, r.client) - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r.client, msgs, qname) - } + h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllRetryQueues(t, r.client, tc.retry) + h.SeedAllDeadQueues(t, r.client, tc.dead) got := r.RemoveQueue(tc.qname, tc.force) if got == nil { @@ -2715,9 +2807,32 @@ func TestRemoveQueueError(t *testing.T) { t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff) } } + for qname, want := range tc.inProgress { + gotInProgress := h.GetInProgressMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.InProgressKey(qname), diff) + } + } + for qname, want := range tc.scheduled { + gotScheduled := h.GetScheduledEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ScheduledKey(qname), diff) + } + } + for qname, want := range tc.retry { + gotRetry := h.GetRetryEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.RetryKey(qname), diff) + } + } + for qname, want := range tc.dead { + gotDead := h.GetDeadEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.DeadKey(qname), diff) + } + } } } -*/ func TestListServers(t *testing.T) { r := setup(t)