
Archived tasks that are trimmed from the set are deleted (#743)

* fixed trimmed archive tasks not being deleted.

* improved test case.

* changed ZRANGEBYSCORE to ZRANGE with the BYSCORE option (see the equivalence sketch after the file summary below).

---------

Co-authored-by: Harrison <harrison@Harrisons-MacBook-Pro.local>
Co-authored-by: Harrison Miller <harrison.miller@MBP-Harrison-Miller-M2.local>
Harrison Miller 2024-10-19 01:18:09 -05:00 committed by GitHub
parent 461d922616
commit 0dc670d7d8
2 changed files with 175 additions and 3 deletions
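
For context on the last commit-message bullet: since Redis 6.2, ZRANGEBYSCORE is deprecated in favor of ZRANGE with the BYSCORE option, and the two forms return the same members. A minimal sketch of the equivalence as a server-side script, with placeholder key and score arguments (run e.g. redis-cli EVAL "<script>" 1 myzset 100):

-- Both calls return the members of KEYS[1] with scores up to and
-- including ARGV[1], lowest score first; the first spelling is the
-- deprecated one.
local before = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
local after = redis.call("ZRANGE", KEYS[1], "-inf", ARGV[1], "BYSCORE")
return {before, after}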

internal/rdb/rdb.go

@@ -829,6 +829,7 @@ const (
 // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
 // KEYS[7] -> asynq:{<qname>}:processed
 // KEYS[8] -> asynq:{<qname>}:failed
+// KEYS[9] -> asynq:{<qname>}:t:
 // -------
 // ARGV[1] -> task ID
 // ARGV[2] -> updated base.TaskMessage value
@@ -845,8 +846,22 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
 	return redis.error_reply("NOT FOUND")
 end
 redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
-redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
-redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
+local old = redis.call("ZRANGE", KEYS[4], "-inf", ARGV[4], "BYSCORE")
+if #old > 0 then
+	for _, id in ipairs(old) do
+		redis.call("DEL", KEYS[9] .. id)
+	end
+	redis.call("ZREM", KEYS[4], unpack(old))
+end
+
+local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
+if #extra > 0 then
+	for _, id in ipairs(extra) do
+		redis.call("DEL", KEYS[9] .. id)
+	end
+	redis.call("ZREM", KEYS[4], unpack(extra))
+end
+
 redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
 local n = redis.call("INCR", KEYS[5])
 if tonumber(n) == 1 then
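
The new script keeps the old two-phase trim (first everything older than the score cutoff ARGV[4], then everything beyond the size cap ARGV[5]) but switches from ZREMRANGE* to ZRANGE-then-ZREM so the trimmed IDs are available for deleting the matching task hashes via KEYS[9]. Note that a negative stop index in ZRANGE counts back from the highest-scored member, so 0, -ARGV[5] selects the oldest members that exceed the cap. A standalone sketch of just the size-cap step against a plain sorted set (run e.g. redis-cli EVAL "<script>" 1 myzset 100):

-- Trim KEYS[1] to at most ARGV[1]-1 members, dropping the oldest
-- (lowest score) first. With N members, stop index -ARGV[1] normalizes
-- to N-ARGV[1]; when the set is already within the cap the range is
-- empty and nothing is removed.
local doomed = redis.call("ZRANGE", KEYS[1], 0, -ARGV[1])
if #doomed > 0 then
	redis.call("ZREM", KEYS[1], unpack(doomed))
end
return #doomed

One caveat that applies to the patch as well: unpack here is Lua 5.1's table unpack, which errors out ("too many results to unpack") past the interpreter's stack limit, so an extremely large one-shot trim would need chunking.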
@@ -889,6 +904,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
 		base.FailedKey(msg.Queue, now),
 		base.ProcessedTotalKey(msg.Queue),
 		base.FailedTotalKey(msg.Queue),
+		base.TaskKeyPrefix(msg.Queue),
 	}
 	argv := []interface{}{
 		msg.ID,
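
On the Go side the only change is appending the queue's task-key prefix to KEYS, giving the script what it needs to build each trimmed task's hash key: per the comment in the first hunk, KEYS[9] is asynq:{<qname>}:t: and the full key is that prefix plus the task ID. A minimal sketch of the same delete-by-prefix pattern in isolation, using the documented prefix shape (run e.g. redis-cli EVAL "<script>" 1 "asynq:{default}:t:" id1 id2):

-- KEYS[1] is a key prefix (what base.TaskKeyPrefix produces on the Go
-- side); every ARGV entry is a task ID whose hash lives at KEYS[1] .. id.
local deleted = 0
for _, id in ipairs(ARGV) do
	deleted = deleted + redis.call("DEL", KEYS[1] .. id)
end
return deleted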

internal/rdb/rdb_test.go

@@ -2002,7 +2002,6 @@ func TestArchive(t *testing.T) {
 	}
 	errMsg := "SMTP server not responding"
 
-	// TODO(hibiken): add test cases for trimming
 	tests := []struct {
 		active map[string][]*base.TaskMessage
 		lease  map[string][]base.Z
@@ -2171,6 +2170,163 @@
 	}
 }
 
+func TestArchiveTrim(t *testing.T) {
+	r := setup(t)
+	defer r.Close()
+	now := time.Now()
+	r.SetClock(timeutil.NewSimulatedClock(now))
+
+	t1 := &base.TaskMessage{
+		ID:      uuid.NewString(),
+		Type:    "send_email",
+		Payload: nil,
+		Queue:   "default",
+		Retry:   25,
+		Retried: 25,
+		Timeout: 1800,
+	}
+	t2 := &base.TaskMessage{
+		ID:      uuid.NewString(),
+		Type:    "reindex",
+		Payload: nil,
+		Queue:   "default",
+		Retry:   25,
+		Retried: 0,
+		Timeout: 3000,
+	}
+	errMsg := "SMTP server not responding"
+
+	maxArchiveSet := make([]base.Z, 0)
+	for i := 0; i < maxArchiveSize-1; i++ {
+		maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{
+			ID:      uuid.NewString(),
+			Type:    "generate_csv",
+			Payload: nil,
+			Queue:   "default",
+			Retry:   25,
+			Retried: 0,
+			Timeout: 60,
+		}, Score: now.Add(-time.Hour + -time.Second*time.Duration(i)).Unix()})
+	}
+
+	wantMaxArchiveSet := make([]base.Z, 0)
+	// newly archived task should be at the front
+	wantMaxArchiveSet = append(wantMaxArchiveSet, base.Z{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()})
+	// oldest task should be dropped from the set
+	wantMaxArchiveSet = append(wantMaxArchiveSet, maxArchiveSet[:len(maxArchiveSet)-1]...)
+
+	tests := []struct {
+		toArchive    map[string][]*base.TaskMessage
+		lease        map[string][]base.Z
+		archived     map[string][]base.Z
+		wantArchived map[string][]base.Z
+	}{
+		{ // simple, 1 to be archived, 1 already archived, both are in the archive set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+					{Message: t2, Score: now.Add(-time.Hour).Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, 1 already archived but past expiry, only the newly archived task should be left
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": {
+					{Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()},
+				},
+			},
+			wantArchived: map[string][]base.Z{
+				"default": {
+					{Message: h.TaskMessageWithError(*t1, errMsg, now), Score: now.Unix()},
+				},
+			},
+		},
+		{ // 1 to be archived, maxArchiveSize in archive set, archive set should be trimmed back to maxArchiveSize and newly archived task should be in the set
+			toArchive: map[string][]*base.TaskMessage{
+				"default": {t1},
+			},
+			lease: map[string][]base.Z{
+				"default": {
+					{Message: t1, Score: now.Add(10 * time.Second).Unix()},
+				},
+			},
+			archived: map[string][]base.Z{
+				"default": maxArchiveSet,
+			},
+			wantArchived: map[string][]base.Z{
+				"default": wantMaxArchiveSet,
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		h.FlushDB(t, r.client) // clean up db before each test case
+		h.SeedAllActiveQueues(t, r.client, tc.toArchive)
+		h.SeedAllLease(t, r.client, tc.lease)
+		h.SeedAllArchivedQueues(t, r.client, tc.archived)
+
+		for _, tasks := range tc.toArchive {
+			for _, target := range tasks {
+				err := r.Archive(context.Background(), target, errMsg)
+				if err != nil {
+					t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
+					continue
+				}
+			}
+		}
+
+		for queue, want := range tc.wantArchived {
+			gotArchived := h.GetArchivedEntries(t, r.client, queue)
+			if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
+				t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
+			}
+
+			// check that only keys present in the archived set are in rdb
+			vals := r.client.Keys(context.Background(), base.TaskKeyPrefix(queue)+"*").Val()
+			if len(vals) != len(gotArchived) {
+				t.Errorf("len of keys = %v, want %v", len(vals), len(gotArchived))
+				return
+			}
+			for _, val := range vals {
+				found := false
+				for _, entry := range gotArchived {
+					if strings.Contains(val, entry.Message.ID) {
+						found = true
+						break
+					}
+				}
+				if !found {
+					t.Errorf("key %v not found in archived set (it was orphaned by the archive trim)", val)
+				}
+			}
+		}
+	}
+}
+
 func TestForwardIfReadyWithGroup(t *testing.T) {
 	r := setup(t)
 	defer r.Close()
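
The closing assertion in TestArchiveTrim lists every key under the queue's task-key prefix and checks that each one still matches an entry in the archived set, i.e. that trimming left no orphaned task hashes. A rough server-side equivalent of that count, under the same prefix assumption (KEYS is O(N), which is fine against a test database; production code would use SCAN instead):

-- Count the task hashes under a prefix, e.g.
--   redis-cli EVAL "<script>" 0 "asynq:{default}:t:"
return #redis.call("KEYS", ARGV[1] .. "*")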