2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

Update RDB.DeleteAll methods

This commit is contained in:
Ken Hibino 2021-02-27 22:24:15 -08:00
parent 290650ab23
commit de226f3654
2 changed files with 80 additions and 12 deletions

View File

@ -793,31 +793,36 @@ func (r *RDB) deleteTask(key, qname, id string) error {
} }
// KEYS[1] -> queue to delete // KEYS[1] -> queue to delete
// ARGV[1] -> task key prefix
var deleteAllCmd = redis.NewScript(` var deleteAllCmd = redis.NewScript(`
local n = redis.call("ZCARD", KEYS[1]) local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
return n`) return table.getn(ids)`)
// DeleteAllArchivedTasks deletes all archived tasks from the given queue // DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) { func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
return r.deleteAll(base.ArchivedKey(qname)) return r.deleteAll(base.ArchivedKey(qname), qname)
} }
// DeleteAllRetryTasks deletes all retry tasks from the given queue // DeleteAllRetryTasks deletes all retry tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.
func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) { func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) {
return r.deleteAll(base.RetryKey(qname)) return r.deleteAll(base.RetryKey(qname), qname)
} }
// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue // DeleteAllScheduledTasks deletes all scheduled tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.
func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) {
return r.deleteAll(base.ScheduledKey(qname)) return r.deleteAll(base.ScheduledKey(qname), qname)
} }
func (r *RDB) deleteAll(key string) (int64, error) { func (r *RDB) deleteAll(key, qname string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result() res, err := deleteAllCmd.Run(r.client, []string{key}, base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -828,22 +833,28 @@ func (r *RDB) deleteAll(key string) (int64, error) {
return n, nil return n, nil
} }
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}:pending
// ARGV[1] -> task key prefix
var deleteAllPendingCmd = redis.NewScript(` var deleteAllPendingCmd = redis.NewScript(`
local n = redis.call("LLEN", KEYS[1]) local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
return n`) return table.getn(ids)`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue // DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted. // and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
res, err := deleteAllPendingCmd.Run(r.client, []string{base.PendingKey(qname)}).Result() res, err := deleteAllPendingCmd.Run(r.client,
[]string{base.PendingKey(qname)}, base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
n, ok := res.(int64) n, ok := res.(int64)
if !ok { if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res) return 0, fmt.Errorf("command error: unexpected return value %v", res)
} }
return n, nil return n, nil
} }

View File

@ -2905,6 +2905,63 @@ func TestDeleteAllScheduledTasks(t *testing.T) {
} }
} }
func TestDeleteAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
qname string
want int64
wantPending map[string][]*base.TaskMessage
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
qname: "default",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m3},
},
},
{
pending: map[string][]*base.TaskMessage{
"custom": {},
},
qname: "custom",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
got, err := r.DeleteAllPendingTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllPendingTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllPendingTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
}
}
func TestRemoveQueue(t *testing.T) { func TestRemoveQueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()