2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Update RDB.ArchiveRetryTask and RDB.ArchiveScheduledTask

This commit is contained in:
Ken Hibino 2021-02-26 21:15:26 -08:00
parent ef0c390642
commit aecdfeaeee
2 changed files with 53 additions and 64 deletions

View File

@ -316,13 +316,13 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {}
for _, id in ipairs(ids) do
local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {}
for _, id in ipairs(ids) do
local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)
// listMessages returns a list of TaskMessage in Redis list with the given key.
@ -387,14 +387,14 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
// Returns an array populated with
// [msg1, score1, msg2, score2, ..., msgN, scoreN]
var listZSetEntriesCmd = redis.NewScript(`
local res = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
end
return res
local res = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
end
return res
`)
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
@ -492,12 +492,12 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
var removeAndRunCmd = redis.NewScript(`
local n = redis.call("ZREM", KEYS[1], ARGV[1])
if n == 0 then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
local n = redis.call("ZREM", KEYS[1], ARGV[1])
if n == 0 then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
@ -532,10 +532,11 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
return n, nil
}
// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveRetryTask finds a retry task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@ -545,10 +546,11 @@ func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
return nil
}
// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveScheduledTask finds a scheduled task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@ -665,31 +667,26 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> score of the task to archive
// ARGV[2] -> id of the task to archive
// ARGV[3] -> current timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archived state (e.g., 100)
// ARGV[1] -> id of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
var removeAndArchiveCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("ZADD", KEYS[2], ARGV[3], msg)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
return 1
end
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
return 0
end
return 0`)
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) {
func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveCmd.Run(r.client,
[]string{src, dst},
score, id, now.Unix(), limit, maxArchiveSize).Result()
id, now.Unix(), limit, maxArchiveSize).Result()
if err != nil {
return 0, err
}

View File

@ -1617,7 +1617,7 @@ func TestRunAllArchivedTasks(t *testing.T) {
}
}
func TestKillRetryTask(t *testing.T) {
func TestArchiveRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -1634,7 +1634,6 @@ func TestKillRetryTask(t *testing.T) {
archived map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantRetry map[string][]base.Z
wantArchived map[string][]base.Z
@ -1651,7 +1650,6 @@ func TestKillRetryTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
@ -1668,8 +1666,7 @@ func TestKillRetryTask(t *testing.T) {
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
id: uuid.New(),
want: ErrTaskNotFound,
wantRetry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
@ -1695,7 +1692,6 @@ func TestKillRetryTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {
@ -1718,10 +1714,10 @@ func TestKillRetryTask(t *testing.T) {
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchiveRetryTask(tc.qname, tc.id, tc.score)
got := r.ArchiveRetryTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("(*RDB).KillRetryTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}
@ -1743,7 +1739,7 @@ func TestKillRetryTask(t *testing.T) {
}
}
func TestKillScheduledTask(t *testing.T) {
func TestArchiveScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -1760,7 +1756,6 @@ func TestKillScheduledTask(t *testing.T) {
archived map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantScheduled map[string][]base.Z
wantArchived map[string][]base.Z
@ -1777,7 +1772,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
@ -1795,7 +1789,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantScheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
@ -1821,7 +1814,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {
@ -1844,10 +1836,10 @@ func TestKillScheduledTask(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchiveScheduledTask(tc.qname, tc.id, tc.score)
got := r.ArchiveScheduledTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("(*RDB).KillScheduledTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}