From a2296fd5cfc89f61272fbd9d25f57921c06292a7 Mon Sep 17 00:00:00 2001 From: Harrison Date: Thu, 31 Aug 2023 22:18:43 -0500 Subject: [PATCH] fixed trimmed archive tasks not being deleted. --- internal/rdb/rdb.go | 24 +++++++++++- internal/rdb/rdb_test.go | 80 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c79a9c5..2d293eb 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -829,6 +829,7 @@ const ( // KEYS[6] -> asynq:{}:failed: // KEYS[7] -> asynq:{}:processed // KEYS[8] -> asynq:{}:failed +// KEYS[9] -> asynq:{}:t: // ------- // ARGV[1] -> task ID // ARGV[2] -> updated base.TaskMessage value @@ -845,8 +846,26 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) -redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) -redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) +local old = redis.call("ZRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) +if #old > 0 then + redis.log(redis.LOG_NOTICE, "archive: deleting old tasks", unpack(old)) + for _, id in ipairs(old) do + redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id) + redis.call("DEL", KEYS[9] .. id) + end + redis.call("ZREM", KEYS[4], unpack(old)) +end + +local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5]) +if #extra > 0 then + redis.log(redis.LOG_NOTICE, "archive: deleting extra tasks") + for _, id in ipairs(extra) do + redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id) + redis.call("DEL", KEYS[9] .. id) + end + redis.call("ZREM", KEYS[4], unpack(extra)) +end + redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived") local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then @@ -889,6 +908,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) base.FailedKey(msg.Queue, now), base.ProcessedTotalKey(msg.Queue), base.FailedTotalKey(msg.Queue), + base.TaskKeyPrefix(msg.Queue), } argv := []interface{}{ msg.ID, diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3bd9eda..5e5dea6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2171,6 +2171,86 @@ func TestArchive(t *testing.T) { } } +func TestArchiveTrim(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) + + errMsg := "SMTP server not responding" + + // create 10k archived tasks + taskCount := maxArchiveSize - 1 + archivedTasks := make([]base.Z, 0) + for i := 0; i < taskCount; i++ { + + id := uuid.NewString() + task := base.TaskMessage{ + ID: id, + Type: "send_email", + Payload: nil, + Queue: "default", + } + archivedTasks = append(archivedTasks, base.Z{ + Message: h.TaskMessageWithError(task, errMsg, now), + Score: now.Add(-1 * time.Hour).Unix(), + }) + } + + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{ + "default": archivedTasks, + }) + + archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default") + if len(archivedEntriesBefore) != taskCount { + t.Errorf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), maxArchiveSize-1) + return + } + + // set up task that will cause archive queue to be trimmed + id := uuid.NewString() + target := &base.TaskMessage{ + ID: id, + Type: "send_email", + Payload: nil, + Queue: "default", + } + + h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{ + "default": {target}, + }) + h.SeedAllLease(t, r.client, map[string][]base.Z{ + "default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}}, + }) + + err := r.Archive(context.Background(), target, errMsg) + if err != nil { + t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err) + return + } + + archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default") + if len(archivedEntriesInSet) != taskCount { + t.Errorf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount) + return + } + + // check that the target task is where we expect it + newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message + if newestTask.ID != target.ID { + t.Errorf("newest task in archive set = %v, want %v", newestTask.ID, target.ID) + return + } + + // now check if trim actually deleted the keys see if it's equal to taskCount + vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val() + if len(vals) != taskCount { + t.Errorf("len of keys = %v, want %v", len(vals), taskCount) + return + } +} + func TestForwardIfReadyWithGroup(t *testing.T) { r := setup(t) defer r.Close()