diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a85a391..50d9c96 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -735,6 +735,11 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { if n == 0 { return ErrTaskNotFound } + if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() { + if err := r.client.Del(msg.UniqueKey).Err(); err != nil { + return err + } + } return nil } } @@ -747,6 +752,9 @@ for _, msg in ipairs(msgs) do local decoded = cjson.decode(msg) if decoded["ID"] == ARGV[2] then redis.call("ZREM", KEYS[1], msg) + if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then + redis.call("DEL", decoded["UniqueKey"]) + end return 1 end end @@ -769,9 +777,15 @@ func (r *RDB) deleteTask(key, id string, score float64) error { // KEYS[1] -> queue to delete var deleteAllCmd = redis.NewScript(` -local n = redis.call("ZCARD", KEYS[1]) +local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then + redis.call("DEL", decoded["UniqueKey"]) + end +end redis.call("DEL", KEYS[1]) -return n`) +return table.getn(msgs)`) // DeleteAllArchivedTasks deletes all archived tasks from the given queue // and returns the number of tasks deleted. @@ -805,9 +819,15 @@ func (r *RDB) deleteAll(key string) (int64, error) { // KEYS[1] -> asynq:{} var deleteAllPendingCmd = redis.NewScript(` -local n = redis.call("LLEN", KEYS[1]) +local msgs = redis.call("LRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then + redis.call("DEL", decoded["UniqueKey"]) + end +end redis.call("DEL", KEYS[1]) -return n`) +return table.getn(msgs)`) // DeleteAllPendingTasks deletes all pending tasks from the given queue // and returns the number of tasks deleted. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0e278ef..723424d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2445,7 +2445,68 @@ func TestDeleteScheduledTask(t *testing.T) { } } -func TestDeleteAllDeadTasks(t *testing.T) { +func TestDeleteUniqueTask(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := &base.TaskMessage{ + ID: uuid.New(), + Type: "reindex", + Payload: nil, + Timeout: 1800, + Deadline: 0, + UniqueKey: "asynq:{default}:unique:reindex:nil", + Queue: "default", + } + t1 := time.Now().Add(5 * time.Minute) + + tests := []struct { + scheduled map[string][]base.Z + qname string + id uuid.UUID + score int64 + uniqueKey string + wantScheduled map[string][]*base.TaskMessage + }{ + { + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + }, + }, + qname: "default", + id: m1.ID, + score: t1.Unix(), + uniqueKey: m1.UniqueKey, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + if err := r.client.SetNX(tc.uniqueKey, tc.id.String(), time.Minute).Err(); err != nil { + t.Fatalf("Could not set unique lock in redis: %v", err) + } + + if err := r.DeleteScheduledTask(tc.qname, tc.id, tc.score); err != nil { + t.Errorf("r.DeleteScheduledTask(%q, %v, %v) returned error: %v", tc.qname, tc.id, tc.score, err) + continue + } + + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } + } + if r.client.Exists(tc.uniqueKey).Val() != 0 { + t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey) + } + } +} +func TestDeleteAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2507,6 +2568,89 @@ func TestDeleteAllDeadTasks(t *testing.T) { } } +func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := &base.TaskMessage{ + ID: uuid.New(), + Type: "task1", + Payload: nil, + Timeout: 1800, + Deadline: 0, + UniqueKey: "asynq:{default}:unique:task1:nil", + Queue: "default", + } + m2 := &base.TaskMessage{ + ID: uuid.New(), + Type: "task2", + Payload: nil, + Timeout: 1800, + Deadline: 0, + UniqueKey: "asynq:{default}:unique:task2:nil", + Queue: "default", + } + m3 := h.NewTaskMessage("task3", nil) + + tests := []struct { + archived map[string][]base.Z + qname string + want int64 + wantArchived map[string][]*base.TaskMessage + }{ + { + archived: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + {Message: m3, Score: time.Now().Unix()}, + }, + }, + qname: "default", + want: 3, + wantArchived: map[string][]*base.TaskMessage{ + "default": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllArchivedQueues(t, r.client, tc.archived) + var uniqueKeys []string // list of unique keys set in redis + for _, zs := range tc.archived { + for _, z := range zs { + if len(z.Message.UniqueKey) > 0 { + err := r.client.SetNX(z.Message.UniqueKey, z.Message.ID.String(), time.Minute).Err() + if err != nil { + t.Fatalf("Failed to set unique lock in redis: %v", err) + } + uniqueKeys = append(uniqueKeys, z.Message.UniqueKey) + } + } + } + + got, err := r.DeleteAllArchivedTasks(tc.qname) + if err != nil { + t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) + } + if got != tc.want { + t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) + } + for qname, want := range tc.wantArchived { + gotArchived := h.GetArchivedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff) + } + } + + for _, uniqueKey := range uniqueKeys { + if r.client.Exists(uniqueKey).Val() != 0 { + t.Errorf("Uniqueness lock %q still exists", uniqueKey) + } + } + } +} + func TestDeleteAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close()