diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 4a96327..0abc2fb 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -682,32 +682,36 @@ func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) { return n, nil } -// KEYS[1] -> ZSET to move task from (e.g., retry queue) +// KEYS[1] -> ZSET to move task from (e.g., asynq:{}:retry) // KEYS[2] -> asynq:{}:archived // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) var removeAndArchiveAllCmd = redis.NewScript(` -local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) -for _, msg in ipairs(msgs) do - redis.call("ZADD", KEYS[2], ARGV[1], msg) - redis.call("ZREM", KEYS[1], msg) +local ids = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, id in ipairs(ids) do + redis.call("ZADD", KEYS[2], ARGV[1], id) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) end -return table.getn(msgs)`) +redis.call("DEL", KEYS[1]) +return table.getn(ids)`) func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) { now := time.Now() - limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago - res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst}, - now.Unix(), limit, maxArchiveSize).Result() + argv := []interface{}{ + now.Unix(), + now.AddDate(0, 0, -archivedExpirationInDays).Unix(), + maxArchiveSize, + } + res, err := removeAndArchiveAllCmd.Run(r.client, + []string{src, dst}, argv...).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return 0, fmt.Errorf("command error: unexpected return value %v", res) } return n, nil }