2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-07-06 13:23:39 +08:00

fixed trimmed archive tasks not being deleted.

This commit is contained in:
Harrison 2023-08-31 22:18:43 -05:00
parent 6b98c0bbae
commit a2296fd5cf
2 changed files with 102 additions and 2 deletions

View File

@ -829,6 +829,7 @@ const (
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd> // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed // KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed // KEYS[8] -> asynq:{<qname>}:failed
// KEYS[9] -> asynq:{<qname>}:t:
// ------- // -------
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value // ARGV[2] -> updated base.TaskMessage value
@ -845,8 +846,26 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) local old = redis.call("ZRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) if #old > 0 then
redis.log(redis.LOG_NOTICE, "archive: deleting old tasks", unpack(old))
for _, id in ipairs(old) do
redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
redis.call("DEL", KEYS[9] .. id)
end
redis.call("ZREM", KEYS[4], unpack(old))
end
local extra = redis.call("ZRANGE", KEYS[4], 0, -ARGV[5])
if #extra > 0 then
redis.log(redis.LOG_NOTICE, "archive: deleting extra tasks")
for _, id in ipairs(extra) do
redis.log(redis.LOG_NOTICE, "archive: deleting task", KEYS[9] .. id)
redis.call("DEL", KEYS[9] .. id)
end
redis.call("ZREM", KEYS[4], unpack(extra))
end
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived") redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
local n = redis.call("INCR", KEYS[5]) local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
@ -889,6 +908,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
base.FailedKey(msg.Queue, now), base.FailedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue), base.ProcessedTotalKey(msg.Queue),
base.FailedTotalKey(msg.Queue), base.FailedTotalKey(msg.Queue),
base.TaskKeyPrefix(msg.Queue),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID, msg.ID,

View File

@ -2171,6 +2171,86 @@ func TestArchive(t *testing.T) {
} }
} }
func TestArchiveTrim(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
errMsg := "SMTP server not responding"
// create 10k archived tasks
taskCount := maxArchiveSize - 1
archivedTasks := make([]base.Z, 0)
for i := 0; i < taskCount; i++ {
id := uuid.NewString()
task := base.TaskMessage{
ID: id,
Type: "send_email",
Payload: nil,
Queue: "default",
}
archivedTasks = append(archivedTasks, base.Z{
Message: h.TaskMessageWithError(task, errMsg, now),
Score: now.Add(-1 * time.Hour).Unix(),
})
}
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, map[string][]base.Z{
"default": archivedTasks,
})
archivedEntriesBefore := h.GetArchivedEntries(t, r.client, "default")
if len(archivedEntriesBefore) != taskCount {
t.Errorf("len of archived entries before = %v, want %v", len(archivedEntriesBefore), maxArchiveSize-1)
return
}
// set up task that will cause archive queue to be trimmed
id := uuid.NewString()
target := &base.TaskMessage{
ID: id,
Type: "send_email",
Payload: nil,
Queue: "default",
}
h.SeedAllActiveQueues(t, r.client, map[string][]*base.TaskMessage{
"default": {target},
})
h.SeedAllLease(t, r.client, map[string][]base.Z{
"default": {{Message: target, Score: now.Add(10 * time.Second).Unix()}},
})
err := r.Archive(context.Background(), target, errMsg)
if err != nil {
t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", target, errMsg, err)
return
}
archivedEntriesInSet := h.GetArchivedEntries(t, r.client, "default")
if len(archivedEntriesInSet) != taskCount {
t.Errorf("len of archived entries = %v, want %v", len(archivedEntriesInSet), taskCount)
return
}
// check that the target task is where we expect it
newestTask := archivedEntriesInSet[len(archivedEntriesInSet)-1].Message
if newestTask.ID != target.ID {
t.Errorf("newest task in archive set = %v, want %v", newestTask.ID, target.ID)
return
}
// now check if trim actually deleted the keys see if it's equal to taskCount
vals := r.client.Keys(context.Background(), base.TaskKeyPrefix("default")+"*").Val()
if len(vals) != taskCount {
t.Errorf("len of keys = %v, want %v", len(vals), taskCount)
return
}
}
func TestForwardIfReadyWithGroup(t *testing.T) { func TestForwardIfReadyWithGroup(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()