2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ArchiveAllRetryTasks, RDB.ArchiveAllScheduledTasks

This commit is contained in:
Ken Hibino 2021-02-27 14:58:42 -08:00
parent 420bd2c748
commit e5ba008619

View File

@ -682,32 +682,36 @@ func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) {
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[1] -> ZSET to move task from (e.g., asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var removeAndArchiveAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
return table.getn(msgs)`)
redis.call("DEL", KEYS[1])
return table.getn(ids)`)
func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst},
now.Unix(), limit, maxArchiveSize).Result()
argv := []interface{}{
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := removeAndArchiveAllCmd.Run(r.client,
[]string{src, dst}, argv...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}