diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index f18e800..2a2bd67 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -537,13 +537,21 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { // ArchiveAllRetryTasks archives all retry tasks from the given queue and // returns the number of tasks that were moved. func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) { - return r.removeAndArchiveAll(base.RetryKey(qname), base.ArchivedKey(qname)) + n, err := r.archiveAll(base.RetryKey(qname), base.ArchivedKey(qname), qname) + if err != nil { + return 0, errors.E(errors.Op("rdb.ArchiveAllRetryTasks"), errors.Internal, err) + } + return n, nil } // ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and // returns the number of tasks that were moved. func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { - return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) + n, err := r.archiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname), qname) + if err != nil { + return 0, errors.E(errors.Op("rdb.ArchiveAllScheduledTasks"), errors.Internal, err) + } + return n, nil } // archiveAllPendingCmd is a Lua script that moves all pending tasks from @@ -692,36 +700,45 @@ func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { } } +// archiveAllCmd is a Lua script that archives all tasks in either scheduled +// or retry state from the given queue. +// +// Input: // KEYS[1] -> ZSET to move task from (e.g., asynq:{}:retry) // KEYS[2] -> asynq:{}:archived // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) -var removeAndArchiveAllCmd = redis.NewScript(` +// ARGV[4] -> task key prefix (asynq:{}:t:) +// +// Output: +// integer: number of tasks archived +var archiveAllCmd = redis.NewScript(` local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) + redis.call("HSET", ARGV[4] .. id, "state", "archived") end +redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) +redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) redis.call("DEL", KEYS[1]) return table.getn(ids)`) -func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) { +func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { now := time.Now() argv := []interface{}{ now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, + base.TaskKeyPrefix(qname), } - res, err := removeAndArchiveAllCmd.Run(r.client, - []string{src, dst}, argv...).Result() + res, err := archiveAllCmd.Run(r.client, []string{src, dst}, argv...).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("command error: unexpected return value %v", res) + return 0, fmt.Errorf("unexpected return value from script: %v", res) } return n, nil }