From 290650ab2309146b04f7b6e7e4d9941d9624401e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 27 Feb 2021 19:56:57 -0800 Subject: [PATCH] Update RDB.Delete task methods --- internal/rdb/inspect.go | 92 +++++++++++++++---------------- internal/rdb/inspect_test.go | 101 +++++++++++++++++++++++++++-------- 2 files changed, 126 insertions(+), 67 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index e6fc62f..77cf262 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -721,68 +721,70 @@ func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) { // DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error { - return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score)) +func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error { + return r.deleteTask(base.ArchivedKey(qname), qname, id.String()) } // DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error { - return r.deleteTask(base.RetryKey(qname), id.String(), float64(score)) +func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error { + return r.deleteTask(base.RetryKey(qname), qname, id.String()) } // DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error { - return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score)) +func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error { + return r.deleteTask(base.ScheduledKey(qname), qname, id.String()) } +// KEYS[1] -> asynq:{}:pending +// KEYS[2] -> asynq:{}:t: +// ARGV[1] -> task ID +var deletePendingTaskCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return 0 +end +return redis.call("DEL", KEYS[2]) +`) + // DeletePendingTask deletes a pending tasks that matches the given id from the given queue. -// If a task that matches the id does not exist, it returns ErrTaskNotFound. +// If there's no match, it returns ErrTaskNotFound. func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { - qkey := base.PendingKey(qname) - data, err := r.client.LRange(qkey, 0, -1).Result() - if err != nil { - return err - } - for _, s := range data { - msg, err := base.DecodeMessage(s) - if err != nil { - return err - } - if msg.ID == id { - n, err := r.client.LRem(qkey, 1, s).Result() - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil - } - } - return ErrTaskNotFound -} - -var deleteTaskCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) -for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("ZREM", KEYS[1], msg) - return 1 - end -end -return 0`) - -func (r *RDB) deleteTask(key, id string, score float64) error { - res, err := deleteTaskCmd.Run(r.client, []string{key}, score, id).Result() + keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())} + res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result() if err != nil { return err } n, ok := res.(int64) if !ok { - return fmt.Errorf("could not cast %v to int64", res) + return fmt.Errorf("command error: unexpected return value %v", res) + } + if n == 0 { + return ErrTaskNotFound + } + return nil +} + +// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{}:retry) +// KEYS[2] -> asynq:{}:t: +// ARGV[1] -> task ID +var deleteTaskCmd = redis.NewScript(` +if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then + return 0 +end +return redis.call("DEL", KEYS[2]) +`) + +func (r *RDB) deleteTask(key, qname, id string) error { + keys := []string{key, base.TaskKey(qname, id)} + argv := []interface{}{id} + res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return fmt.Errorf("command error: unexpected return value %v", res) } if n == 0 { return ErrTaskNotFound diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index f8a0667..3e9596e 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2406,7 +2406,6 @@ func TestDeleteArchivedTask(t *testing.T) { archived map[string][]base.Z qname string id uuid.UUID - score int64 want error wantArchived map[string][]*base.TaskMessage }{ @@ -2419,7 +2418,6 @@ func TestDeleteArchivedTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: nil, wantArchived: map[string][]*base.TaskMessage{ "default": {m2}, @@ -2437,7 +2435,6 @@ func TestDeleteArchivedTask(t *testing.T) { }, qname: "custom", id: m3.ID, - score: t3.Unix(), want: nil, wantArchived: map[string][]*base.TaskMessage{ "default": {m1, m2}, @@ -2452,8 +2449,7 @@ func TestDeleteArchivedTask(t *testing.T) { }, }, qname: "default", - id: m1.ID, - score: t2.Unix(), // id and score mismatch + id: uuid.New(), want: ErrTaskNotFound, wantArchived: map[string][]*base.TaskMessage{ "default": {m1, m2}, @@ -2465,7 +2461,6 @@ func TestDeleteArchivedTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: ErrTaskNotFound, wantArchived: map[string][]*base.TaskMessage{ "default": {}, @@ -2477,9 +2472,9 @@ func TestDeleteArchivedTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.DeleteArchivedTask(tc.qname, tc.id, tc.score) + got := r.DeleteArchivedTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteArchivedTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2506,7 +2501,6 @@ func TestDeleteRetryTask(t *testing.T) { retry map[string][]base.Z qname string id uuid.UUID - score int64 want error wantRetry map[string][]*base.TaskMessage }{ @@ -2519,7 +2513,6 @@ func TestDeleteRetryTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: nil, wantRetry: map[string][]*base.TaskMessage{ "default": {m2}, @@ -2537,7 +2530,6 @@ func TestDeleteRetryTask(t *testing.T) { }, qname: "custom", id: m3.ID, - score: t3.Unix(), want: nil, wantRetry: map[string][]*base.TaskMessage{ "default": {m1, m2}, @@ -2549,8 +2541,7 @@ func TestDeleteRetryTask(t *testing.T) { "default": {{Message: m1, Score: t1.Unix()}}, }, qname: "default", - id: m2.ID, - score: t2.Unix(), + id: uuid.New(), want: ErrTaskNotFound, wantRetry: map[string][]*base.TaskMessage{ "default": {m1}, @@ -2562,9 +2553,9 @@ func TestDeleteRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) - got := r.DeleteRetryTask(tc.qname, tc.id, tc.score) + got := r.DeleteRetryTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.DeleteRetryTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteRetryTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2591,7 +2582,6 @@ func TestDeleteScheduledTask(t *testing.T) { scheduled map[string][]base.Z qname string id uuid.UUID - score int64 want error wantScheduled map[string][]*base.TaskMessage }{ @@ -2604,7 +2594,6 @@ func TestDeleteScheduledTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: nil, wantScheduled: map[string][]*base.TaskMessage{ "default": {m2}, @@ -2622,7 +2611,6 @@ func TestDeleteScheduledTask(t *testing.T) { }, qname: "custom", id: m3.ID, - score: t3.Unix(), want: nil, wantScheduled: map[string][]*base.TaskMessage{ "default": {m1, m2}, @@ -2634,8 +2622,7 @@ func TestDeleteScheduledTask(t *testing.T) { "default": {{Message: m1, Score: t1.Unix()}}, }, qname: "default", - id: m2.ID, - score: t2.Unix(), + id: uuid.New(), want: ErrTaskNotFound, wantScheduled: map[string][]*base.TaskMessage{ "default": {m1}, @@ -2647,9 +2634,9 @@ func TestDeleteScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.DeleteScheduledTask(tc.qname, tc.id, tc.score) + got := r.DeleteScheduledTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.DeleteScheduledTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteScheduledTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2662,6 +2649,76 @@ func TestDeleteScheduledTask(t *testing.T) { } } +func TestDeletePendingTask(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + + tests := []struct { + pending map[string][]*base.TaskMessage + qname string + id uuid.UUID + want error + wantPending map[string][]*base.TaskMessage + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + qname: "default", + id: m1.ID, + want: nil, + wantPending: map[string][]*base.TaskMessage{ + "default": {m2}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + qname: "custom", + id: m3.ID, + want: nil, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + qname: "default", + id: uuid.New(), + want: ErrTaskNotFound, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllPendingQueues(t, r.client, tc.pending) + + got := r.DeletePendingTask(tc.qname, tc.id) + if got != tc.want { + t.Errorf("r.DeletePendingTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) + continue + } + + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) + } + } + } +} + func TestDeleteAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close()