2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Add support for delete and archive action on PendingTask

This commit is contained in:
Ken Hibino 2021-01-18 07:22:26 -08:00
parent 2884044e75
commit af920be7ee
3 changed files with 260 additions and 14 deletions

View File

@ -217,17 +217,24 @@ type ArchivedTask struct {
score int64
}
// Key returns a key used to delete, run, and archive the task.
// Key returns a key used to delete, and archive the pending task.
func (t *PendingTask) Key() string {
// Note: Pending tasks are stored in redis LIST, therefore no score.
// Use zero for the score to preserve the same key format.
return fmt.Sprintf("p:%v:%v", t.ID, 0)
}
// Key returns a key used to delete, run, and archive the scheduled task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, run, and archive the task.
// Key returns a key used to delete, run, and archive the retry task.
func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, run, and archive the task.
// Key returns a key used to delete and run the archived task.
func (t *ArchivedTask) Key() string {
return fmt.Sprintf("a:%v:%v", t.ID, t.score)
}
@ -248,7 +255,7 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err erro
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
state = parts[0]
if len(state) != 1 || !strings.Contains("sra", state) {
if len(state) != 1 || !strings.Contains("psra", state) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, state, nil
@ -497,6 +504,8 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
return err
}
switch state {
case "p":
return i.rdb.DeletePendingTask(qname, id)
case "s":
return i.rdb.DeleteScheduledTask(qname, id, score)
case "r":
@ -589,6 +598,8 @@ func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
return err
}
switch state {
case "p":
return i.rdb.ArchivePendingTask(qname, id)
case "s":
return i.rdb.ArchiveScheduledTask(qname, id, score)
case "r":

View File

@ -1037,7 +1037,7 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) {
}
}
func TestInspectorKillAllScheduledTasks(t *testing.T) {
func TestInspectorArchiveAllScheduledTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
@ -1145,11 +1145,11 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
got, err := inspector.ArchiveAllScheduledTasks(tc.qname)
if err != nil {
t.Errorf("KillAllScheduledTasks(%q) returned error: %v", tc.qname, err)
t.Errorf("ArchiveAllScheduledTasks(%q) returned error: %v", tc.qname, err)
continue
}
if got != tc.want {
t.Errorf("KillAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want)
t.Errorf("ArchiveAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want)
}
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
@ -1170,7 +1170,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
}
}
func TestInspectorKillAllRetryTasks(t *testing.T) {
func TestInspectorArchiveAllRetryTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
@ -1262,11 +1262,11 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
got, err := inspector.ArchiveAllRetryTasks(tc.qname)
if err != nil {
t.Errorf("KillAllRetryTasks(%q) returned error: %v", tc.qname, err)
t.Errorf("ArchiveAllRetryTasks(%q) returned error: %v", tc.qname, err)
continue
}
if got != tc.want {
t.Errorf("KillAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want)
t.Errorf("ArchiveAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want)
}
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
@ -1632,6 +1632,67 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
}
}
func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
qname string
key string
wantPending map[string][]*base.TaskMessage
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
qname: "default",
key: createPendingTask(m2).Key(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m3},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
qname: "custom",
key: createPendingTask(m3).Key(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v",
tc.qname, tc.key, err)
continue
}
for qname, want := range tc.wantPending {
got := asynqtest.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, asynqtest.SortMsgOpt); diff != "" {
t.Errorf("unspected pending tasks in queue %q: (-want,+got):\n%s",
qname, diff)
continue
}
}
}
}
func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
@ -1994,7 +2055,98 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
}
}
func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := NewInspector(getRedisConnOpt(t))
now := time.Now()
tests := []struct {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
key string
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m2, m3},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "default",
key: createPendingTask(m1).Key(),
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m2, m3},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: now.Unix()},
},
"custom": {},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m2, m3},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
key: createPendingTask(m2).Key(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m3},
},
wantArchived: map[string][]base.Z{
"default": {},
"custom": {
{Message: m2, Score: now.Unix()},
},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
asynqtest.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v",
tc.qname, tc.key, err)
continue
}
for qname, want := range tc.wantPending {
gotPending := asynqtest.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
t.Errorf("unexpected pending tasks in queue %q: (-want,+got)\n%s",
qname, diff)
}
}
for qname, want := range tc.wantArchived {
wantArchived := asynqtest.GetArchivedEntries(t, r, qname)
if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected archived tasks in queue %q: (-want,+got)\n%s",
qname, diff)
}
}
}
}
func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
@ -2049,7 +2201,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
asynqtest.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
continue
}
for qname, want := range tc.wantScheduled {
@ -2070,7 +2222,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
}
}
func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := asynqtest.NewTaskMessage("task1", nil)
@ -2124,7 +2276,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
asynqtest.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
continue
}
for qname, want := range tc.wantRetry {

View File

@ -488,6 +488,66 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) erro
return nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> task message to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archive (e.g., 100)
var archivePendingCmd = redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 1, ARGV[1])
if x == 0 then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) archivePending(qname, msg string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
res, err := archivePendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.archivePending(qname, s)
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
}
return ErrTaskNotFound
}
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
// returns the number of tasks that were moved.
func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) {
@ -585,6 +645,29 @@ func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error
return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score))
}
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
if err := r.client.LRem(qkey, 1, s).Err(); err != nil {
return err
}
return nil
}
}
return ErrTaskNotFound
}
var deleteTaskCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do