From 39b01d3563bee8b235fab697f31145556a9a3855 Mon Sep 17 00:00:00 2001
From: Harrison
Date: Tue, 19 Sep 2023 14:56:41 -0500
Subject: [PATCH] improved test case.

---
 internal/rdb/rdb.go      |   4 -
 internal/rdb/rdb_test.go | 200 +++++++++++++++++++++++++++------------
 2 files changed, 138 insertions(+), 66 deletions(-)

diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 2d293eb..8b75f72 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -848,9 +848,7 @@ end
 redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
 local old = redis.call("ZRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
 if #old > 0 then
-	redis.log(redis.LOG_NOTICE, "archive: deleting old tasks", unpack(old))
 	for _, id in ipairs(old) do
-		redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
 		redis.call("DEL", KEYS[9] .. id)
 	end
 	redis.call("ZREM", KEYS[4], unpack(old))
@@ -858,9 +856,7 @@ end
 local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
 if #extra > 0 then
-	redis.log(redis.LOG_NOTICE, "archive: deleting extra tasks")
 	for _, id in ipairs(extra) do
-		redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
 		redis.call("DEL", KEYS[9] .. id)
 	end
 	redis.call("ZREM", KEYS[4], unpack(extra))
 end
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 5e5dea6..5ed612e 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -2002,7 +2002,6 @@ func TestArchive(t *testing.T) {
 	}
 	errMsg := "SMTP server not responding"
 
-	// TODO(hibiken): add test cases for trimming
 	tests := []struct {
 		active   map[string][]*base.TaskMessage
 		lease    map[string][]base.Z
@@ -2177,77 +2176,154 @@ func TestArchiveTrim(t *testing.T) {
 	now := time.Now()
 	r.SetClock(timeutil.NewSimulatedClock(now))
 
-	errMsg := "SMTP server not responding"
-
-	// create 10k archived tasks
-	taskCount := maxArchiveSize - 1
-	archivedTasks := make([]base.Z, 0)
-	for i := 0; i < taskCount; i++ {
-
-		id := uuid.NewString()
-		task := base.TaskMessage{
-			ID:      id,
-			Type:    "send_email",
-			Payload: nil,
-			Queue:   "default",
-		}
-		archivedTasks = append(archivedTasks, base.Z{
-			Message: h.TaskMessageWithError(task, errMsg, now),
-			Score:   now.Add(-1 * time.Hour).Unix(),
-		})
-	}
-
-	h.FlushDB(t, r.client) // clean up db before each test case
-	h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{
-		"default": archivedTasks,
-	})
-
-	archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default")
-	if len(archivedEntriesBefore) != taskCount {
-		t.Errorf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), maxArchiveSize-1)
-		return
-	}
-
-	// set up task that will cause archive queue to be trimmed
-	id := uuid.NewString()
-	target := &base.TaskMessage{
-		ID:      id,
+	t1 := &base.TaskMessage{
+		ID:      uuid.NewString(),
 		Type:    "send_email",
 		Payload: nil,
 		Queue:   "default",
+		Retry:   25,
+		Retried: 25,
+		Timeout: 1800,
+	}
+	t2 := &base.TaskMessage{
+		ID:      uuid.NewString(),
+		Type:    "reindex",
+		Payload: nil,
+		Queue:   "default",
+		Retry:   25,
+		Retried: 0,
+		Timeout: 3000,
+	}
+	errMsg := "SMTP server not responding"
+
+	maxArchiveSet := make([]base.Z, 0)
+	for i := 0; i < maxArchiveSize-1; i++ {
+		maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{
+			ID:      uuid.NewString(),
+			Type:    "generate_csv",
+			Payload: nil,
+			Queue:   "default",
+			Retry:   25,
+			Retried: 0,
+			Timeout: 60,
+		}, Score: now.Add(-time.Hour + -time.Second*time.Duration(i)).Unix()})
 	}
 
-	h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{
-		"default": {target},
-	})
-	h.SeedAllLease(t, r.client, map[string][]base.Z{
-		"default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}},
-	})
+	wantMaxArchiveSet := make([]base.Z, 0)
+	// newly archived task should be at the front
+	wantMaxArchiveSet = append(wantMaxArchiveSet, base.Z{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()})
+	// oldest task should be dropped from the set
+	wantMaxArchiveSet = append(wantMaxArchiveSet, maxArchiveSet[:len(maxArchiveSet)-1]...)
 
-	err := r.Archive(context.Background(), target, errMsg)
-	if err != nil {
-		t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
-		return
+	tests := []struct {
+		toArchive    map[string][]*base.TaskMessage
+		lease        map[string][]base.Z
+		archived     map[string][]base.Z
+		wantArchived map[string][]base.Z
+	}{
+		{ // simple, 1 to be archived, 1 already archived, both are in the archive set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, 1 already archived but past expiry, only the newly archived task should be left
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, maxArchiveSize in archive set, archive set should be trimmed back to maxArchiveSize and newly archived task should be in the set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": maxArchiveSet,
+			},
+			wantArchived: map[string][]base.Z{
+				"default": wantMaxArchiveSet,
+			},
+		},
 	}
 
-	archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default")
-	if len(archivedEntriesInSet) != taskCount {
-		t.Errorf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount)
-		return
-	}
+	for _, tc := range tests {
+		h.FlushDB(t, r.client) // clean up db before each test case
+		h.SeedAllActiveQueues(t, r.client, tc.toArchive)
+		h.SeedAllLease(t, r.client, tc.lease)
+		h.SeedAllArchivedQueues(t, r.client, tc.archived)
 
-	// check that the target task is where we expect it
-	newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message
-	if newestTask.ID != target.ID {
-		t.Errorf("newest task in archive set = %v, want %v", newestTask.ID, target.ID)
-		return
-	}
+		for _, tasks := range tc.toArchive {
+			for _, target := range tasks {
+				err := r.Archive(context.Background(), target, errMsg)
+				if err != nil {
+					t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
+					continue
+				}
+			}
+		}
 
-	// now check if trim actually deleted the keys see if it's equal to taskCount
-	vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val()
-	if len(vals) != taskCount {
-		t.Errorf("len of keys = %v, want %v", len(vals), taskCount)
-		return
+		for queue, want := range tc.wantArchived {
+			gotArchived := h.GetArchivedEntries(t, r.client, queue)
+
+			if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
+				t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
+			}
+
+			// check that only keys present in the archived set are in rdb
+			vals := r.client.Keys(context.Background(), base.TaskKeyPrefix(queue)+"*").Val()
+			if len(vals) != len(gotArchived) {
+				t.Errorf("len of keys = %v, want %v", len(vals), len(gotArchived))
+				return
+			}
+
+			for _, val := range vals {
+				found := false
+				for _, entry := range gotArchived {
+					if strings.Contains(val, entry.Message.ID) {
+						found = true
+						break
+					}
+				}
+
+				if !found {
+					t.Errorf("key %v not found in archived set (it was orphaned by the archive trim)", val)
+				}
+			}
+		}
 	}
 }
 