diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 691c038..8d53ffd 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -316,13 +316,13 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err // ARGV[2] -> stop offset // ARGV[3] -> task key prefix var listMessagesCmd = redis.NewScript(` - local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) - local res = {} - for _, id in ipairs(ids) do - local key = ARGV[3] .. id - table.insert(res, redis.call("HGET", key, "msg")) - end - return res +local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) +local res = {} +for _, id in ipairs(ids) do + local key = ARGV[3] .. id + table.insert(res, redis.call("HGET", key, "msg")) +end +return res `) // listMessages returns a list of TaskMessage in Redis list with the given key. @@ -387,14 +387,14 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { // Returns an array populated with // [msg1, score1, msg2, score2, ..., msgN, scoreN] var listZSetEntriesCmd = redis.NewScript(` - local res = {} - local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES") - for i = 1, table.getn(id_score_pairs), 2 do - local key = ARGV[3] .. id_score_pairs[i] - table.insert(res, redis.call("HGET", key, "msg")) - table.insert(res, id_score_pairs[i+1]) - end - return res +local res = {} +local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES") +for i = 1, table.getn(id_score_pairs), 2 do + local key = ARGV[3] .. id_score_pairs[i] + table.insert(res, redis.call("HGET", key, "msg")) + table.insert(res, id_score_pairs[i+1]) +end +return res `) // listZSetEntries returns a list of message and score pairs in Redis sorted-set @@ -492,12 +492,12 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { // KEYS[2] -> asynq:{}:pending // ARGV[1] -> task ID var removeAndRunCmd = redis.NewScript(` - local n = redis.call("ZREM", KEYS[1], ARGV[1]) - if n == 0 then - return 0 - end - redis.call("LPUSH", KEYS[2], ARGV[1]) - return 1 +local n = redis.call("ZREM", KEYS[1], ARGV[1]) +if n == 0 then + return 0 +end +redis.call("LPUSH", KEYS[2], ARGV[1]) +return 1 `) func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) { @@ -532,10 +532,11 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { return n, nil } -// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue -// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score)) +// ArchiveRetryTask finds a retry task that matches the given id +// from the given queue and archives it. +// If there's no match, it returns ErrTaskNotFound. +func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID) error { + n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String()) if err != nil { return err } @@ -545,10 +546,11 @@ func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error { return nil } -// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue -// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score)) +// ArchiveScheduledTask finds a scheduled task that matches the given id +// from the given queue and archives it. +// If there's no match, it returns ErrTaskNotFound. +func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error { + n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String()) if err != nil { return err } @@ -665,31 +667,26 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { // KEYS[1] -> ZSET to move task from (e.g., retry queue) // KEYS[2] -> asynq:{}:archived -// ARGV[1] -> score of the task to archive -// ARGV[2] -> id of the task to archive -// ARGV[3] -> current timestamp -// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) -// ARGV[5] -> max number of tasks in archived state (e.g., 100) +// ARGV[1] -> id of the task to archive +// ARGV[2] -> current timestamp +// ARGV[3] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[4] -> max number of tasks in archived state (e.g., 100) var removeAndArchiveCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) -for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("ZREM", KEYS[1], msg) - redis.call("ZADD", KEYS[2], ARGV[3], msg) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) - return 1 - end +if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then + return 0 end -return 0`) +redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) +redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3]) +redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4]) +return 1 +`) -func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) { +func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) { now := time.Now() limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago res, err := removeAndArchiveCmd.Run(r.client, []string{src, dst}, - score, id, now.Unix(), limit, maxArchiveSize).Result() + id, now.Unix(), limit, maxArchiveSize).Result() if err != nil { return 0, err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index c55e4a3..7b7160d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1617,7 +1617,7 @@ func TestRunAllArchivedTasks(t *testing.T) { } } -func TestKillRetryTask(t *testing.T) { +func TestArchiveRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1634,7 +1634,6 @@ func TestKillRetryTask(t *testing.T) { archived map[string][]base.Z qname string id uuid.UUID - score int64 want error wantRetry map[string][]base.Z wantArchived map[string][]base.Z @@ -1651,7 +1650,6 @@ func TestKillRetryTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: nil, wantRetry: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, @@ -1668,8 +1666,7 @@ func TestKillRetryTask(t *testing.T) { "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", - id: m2.ID, - score: t2.Unix(), + id: uuid.New(), want: ErrTaskNotFound, wantRetry: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, @@ -1695,7 +1692,6 @@ func TestKillRetryTask(t *testing.T) { }, qname: "custom", id: m3.ID, - score: t3.Unix(), want: nil, wantRetry: map[string][]base.Z{ "default": { @@ -1718,10 +1714,10 @@ func TestKillRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.ArchiveRetryTask(tc.qname, tc.id, tc.score) + got := r.ArchiveRetryTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v", - tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("(*RDB).KillRetryTask(%q, %v) = %v, want %v", + tc.qname, tc.id, got, tc.want) continue } @@ -1743,7 +1739,7 @@ func TestKillRetryTask(t *testing.T) { } } -func TestKillScheduledTask(t *testing.T) { +func TestArchiveScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1760,7 +1756,6 @@ func TestKillScheduledTask(t *testing.T) { archived map[string][]base.Z qname string id uuid.UUID - score int64 want error wantScheduled map[string][]base.Z wantArchived map[string][]base.Z @@ -1777,7 +1772,6 @@ func TestKillScheduledTask(t *testing.T) { }, qname: "default", id: m1.ID, - score: t1.Unix(), want: nil, wantScheduled: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, @@ -1795,7 +1789,6 @@ func TestKillScheduledTask(t *testing.T) { }, qname: "default", id: m2.ID, - score: t2.Unix(), want: ErrTaskNotFound, wantScheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, @@ -1821,7 +1814,6 @@ func TestKillScheduledTask(t *testing.T) { }, qname: "custom", id: m3.ID, - score: t3.Unix(), want: nil, wantScheduled: map[string][]base.Z{ "default": { @@ -1844,10 +1836,10 @@ func TestKillScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.ArchiveScheduledTask(tc.qname, tc.id, tc.score) + got := r.ArchiveScheduledTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v", - tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("(*RDB).KillScheduledTask(%q, %v) = %v, want %v", + tc.qname, tc.id, got, tc.want) continue }