2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-20 15:50:20 +08:00

improved test case.

This commit is contained in:
Harrison 2023-09-19 14:56:41 -05:00
parent a2296fd5cf
commit 39b01d3563
2 changed files with 138 additions and 66 deletions

View File

@ -848,9 +848,7 @@ end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
local old = redis.call("ZRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) local old = redis.call("ZRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
if #old > 0 then if #old > 0 then
redis.log(redis.LOG_NOTICE, "archive: deleting old tasks", unpack(old))
for _, id in ipairs(old) do for _, id in ipairs(old) do
redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
redis.call("DEL", KEYS[9] .. id) redis.call("DEL", KEYS[9] .. id)
end end
redis.call("ZREM", KEYS[4], unpack(old)) redis.call("ZREM", KEYS[4], unpack(old))
@ -858,9 +856,7 @@ end
local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5]) local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
if #extra > 0 then if #extra > 0 then
redis.log(redis.LOG_NOTICE, "archive: deleting extra tasks")
for _, id in ipairs(extra) do for _, id in ipairs(extra) do
redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
redis.call("DEL", KEYS[9] .. id) redis.call("DEL", KEYS[9] .. id)
end end
redis.call("ZREM", KEYS[4], unpack(extra)) redis.call("ZREM", KEYS[4], unpack(extra))

View File

@ -2002,7 +2002,6 @@ func TestArchive(t *testing.T) {
} }
errMsg := "SMTP server not responding" errMsg := "SMTP server not responding"
// TODO(hibiken): add test cases for trimming
tests := []struct { tests := []struct {
active map[string][]*base.TaskMessage active map[string][]*base.TaskMessage
lease map[string][]base.Z lease map[string][]base.Z
@ -2177,77 +2176,154 @@ func TestArchiveTrim(t *testing.T) {
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now)) r.SetClock(timeutil.NewSimulatedClock(now))
errMsg := "SMTP server not responding" t1 := &base.TaskMessage{
ID: uuid.NewString(),
// create 10k archived tasks
taskCount := maxArchiveSize - 1
archivedTasks := make([]base.Z, 0)
for i := 0; i < taskCount; i++ {
id := uuid.NewString()
task := base.TaskMessage{
ID: id,
Type: "send_email",
Payload: nil,
Queue: "default",
}
archivedTasks = append(archivedTasks, base.Z{
Message: h.TaskMessageWithError(task, errMsg, now),
Score: now.Add(-1 * time.Hour).Unix(),
})
}
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{
"default": archivedTasks,
})
archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default")
if len(archivedEntriesBefore) != taskCount {
t.Errorf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), maxArchiveSize-1)
return
}
// set up task that will cause archive queue to be trimmed
id := uuid.NewString()
target := &base.TaskMessage{
ID: id,
Type: "send_email", Type: "send_email",
Payload: nil, Payload: nil,
Queue: "default", Queue: "default",
Retry: 25,
Retried: 25,
Timeout: 1800,
}
t2 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "reindex",
Payload: nil,
Queue: "default",
Retry: 25,
Retried: 0,
Timeout: 3000,
}
errMsg := "SMTP server not responding"
maxArchiveSet := make([]base.Z, 0)
for i := 0; i < maxArchiveSize-1; i++ {
maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{
ID: uuid.NewString(),
Type: "generate_csv",
Payload: nil,
Queue: "default",
Retry: 25,
Retried: 0,
Timeout: 60,
}, Score: now.Add(-time.Hour + -time.Second*time.Duration(i)).Unix()})
} }
h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{ wantMaxArchiveSet := make([]base.Z, 0)
"default": {target}, // newly archived task should be at the front
}) wantMaxArchiveSet = append(wantMaxArchiveSet, base.Z{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()})
h.SeedAllLease(t, r.client, map[string][]base.Z{ // oldest task should be dropped from the set
"default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}}, wantMaxArchiveSet = append(wantMaxArchiveSet, maxArchiveSet[:len(maxArchiveSet)-1]...)
})
err := r.Archive(context.Background(), target, errMsg) tests := []struct {
if err != nil { toArchive map[string][]*base.TaskMessage
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err) lease map[string][]base.Z
return archived map[string][]base.Z
wantArchived map[string][]base.Z
}{
{ // simple, 1 to be archived, 1 already archived, both are in the archive set
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": {
{Message: t2, Score: now.Add(-time.Hour).Unix()},
},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
{Message: t2, Score: now.Add(-time.Hour).Unix()},
},
},
},
{ // 1 to be archived, 1 already archived but past expiry, only the newly archived task should be left
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": {
{Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()},
},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
},
},
},
{ // 1 to be archived, maxArchiveSize in archive set, archive set should be trimmed back to maxArchiveSize and newly archived task should be in the set
toArchive: map[string][]*base.TaskMessage{
"default": {t1},
},
lease: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
},
},
archived: map[string][]base.Z{
"default": maxArchiveSet,
},
wantArchived: map[string][]base.Z{
"default": wantMaxArchiveSet,
},
},
} }
archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default") for _, tc := range tests {
if len(archivedEntriesInSet) != taskCount { h.FlushDB(t, r.client) // clean up db before each test case
t.Errorf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount) h.SeedAllActiveQueues(t, r.client, tc.toArchive)
return h.SeedAllLease(t, r.client, tc.lease)
} h.SeedAllArchivedQueues(t, r.client, tc.archived)
// check that the target task is where we expect it for _, tasks := range tc.toArchive {
newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message for _, target := range tasks {
if newestTask.ID != target.ID { err := r.Archive(context.Background(), target, errMsg)
t.Errorf("newest task in archive set = %v, want %v", newestTask.ID, target.ID) if err != nil {
return t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
} continue
}
}
}
// now check if trim actually deleted the keys see if it's equal to taskCount for queue, want := range tc.wantArchived {
vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val() gotArchived := h.GetArchivedEntries(t, r.client, queue)
if len(vals) != taskCount {
t.Errorf("len of keys = %v, want %v", len(vals), taskCount) if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
return t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
}
// check that only keys present in the archived set are in rdb
vals := r.client.Keys(context.Background(), base.TaskKeyPrefix(queue)+"*").Val()
if len(vals) != len(gotArchived) {
t.Errorf("len of keys = %v, want %v", len(vals), len(gotArchived))
return
}
for _, val := range vals {
found := false
for _, entry := range gotArchived {
if strings.Contains(val, entry.Message.ID) {
found = true
break
}
}
if !found {
t.Errorf("key %v not found in archived set (it was orphaned by the archive trim)", val)
}
}
}
} }
} }