From 62f4e46b730395e671d1031ca72ab7fbc28e0a58 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 3 May 2021 16:38:13 -0700 Subject: [PATCH] Update RDB.ArchiveAllPendingTasks with task state --- internal/rdb/inspect.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c213ab2..f18e800 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -546,38 +546,49 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) } +// archiveAllPendingCmd is a Lua script that moves all pending tasks from +// the given queue to archived state. +// +// Input: // KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:archived // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in archive (e.g., 100) +// ARGV[4] -> task key prefix (asynq:{}:t:) +// +// Output: +// integer: Number of tasks archiveda var archiveAllPendingCmd = redis.NewScript(` local ids = redis.call("LRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do redis.call("ZADD", KEYS[2], ARGV[1], id) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) + redis.call("HSET", ARGV[4] .. id, "state", "archived") end +redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) +redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) redis.call("DEL", KEYS[1]) return table.getn(ids)`) // ArchiveAllPendingTasks archives all pending tasks from the given queue and // returns the number of tasks moved. func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { + var op errors.Op = "rdb.ArchiveAllPendingTasks" keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} now := time.Now() argv := []interface{}{ now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, + base.TaskKeyPrefix(qname), } res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result() if err != nil { - return 0, err + return 0, errors.E(op, errors.Internal, err) } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("command error: unexpected return value %v", res) + return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from script %v", res)) } return n, nil }