2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.Archive

This commit is contained in:
Ken Hibino 2021-02-23 06:35:59 -08:00
parent aa40a21654
commit 740cb9bad0
2 changed files with 45 additions and 37 deletions

View File

@ -432,16 +432,14 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
return err return err
} }
now := time.Now() now := time.Now()
processedKey := base.ProcessedKey(msg.Queue, now)
failedKey := base.FailedKey(msg.Queue, now)
expireAt := now.Add(statsTTL) expireAt := now.Add(statsTTL)
keys := []string{ keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()), base.TaskKey(msg.Queue, msg.ID.String()),
base.ActiveKey(msg.Queue), base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue), base.DeadlinesKey(msg.Queue),
base.RetryKey(msg.Queue), base.RetryKey(msg.Queue),
processedKey, base.ProcessedKey(msg.Queue, now),
failedKey, base.FailedKey(msg.Queue, now),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID.String(), msg.ID.String(),
@ -457,58 +455,68 @@ const (
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
) )
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:archived // KEYS[3] -> asynq:{<qname>}:deadlines
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[4] -> asynq:{<qname>}:archived
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd> // KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[2] -> base.TaskMessage value to add // ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> died_at UNIX timestamp // ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archive (e.g., 100) // ARGV[5] -> max number of tasks in archive (e.g., 100)
// ARGV[6] -> stats expiration timestamp // ARGV[6] -> stats expiration timestamp
var archiveCmd = redis.NewScript(` var archiveCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2]) redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[3], "-inf", ARGV[4]) redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[3], 0, -ARGV[5]) redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
local n = redis.call("INCR", KEYS[4]) redis.call("SET", KEYS[1], ARGV[2])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[6])
end
local m = redis.call("INCR", KEYS[5])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[6]) redis.call("EXPIREAT", KEYS[5], ARGV[6])
end end
local m = redis.call("INCR", KEYS[6])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[6], ARGV[6])
end
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Archive sends the given task to archive, attaching the error message to the task. // Archive sends the given task to archive, attaching the error message to the task.
// It also trims the archive by timestamp and set size. // It also trims the archive by timestamp and set size.
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
msgToRemove, err := base.EncodeMessage(msg)
if err != nil {
return err
}
modified := *msg modified := *msg
modified.ErrorMsg = errMsg modified.ErrorMsg = errMsg
msgToAdd, err := base.EncodeMessage(&modified) encoded, err := base.EncodeMessage(&modified)
if err != nil { if err != nil {
return err return err
} }
now := time.Now() now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
processedKey := base.ProcessedKey(msg.Queue, now)
failedKey := base.FailedKey(msg.Queue, now)
expireAt := now.Add(statsTTL) expireAt := now.Add(statsTTL)
return archiveCmd.Run(r.client, keys := []string{
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey}, base.TaskKey(msg.Queue, msg.ID.String()),
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err() base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.ArchivedKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now),
}
argv := []interface{}{
msg.ID.String(),
encoded,
now.Unix(),
cutoff.Unix(),
maxArchiveSize,
expireAt.Unix(),
}
return archiveCmd.Run(r.client, keys, argv...).Err()
} }
// ForwardIfReady checks scheduled and retry sets of the given queues // ForwardIfReady checks scheduled and retry sets of the given queues

View File

@ -1066,7 +1066,7 @@ func TestArchive(t *testing.T) {
// TODO(hibiken): add test cases for trimming // TODO(hibiken): add test cases for trimming
tests := []struct { tests := []struct {
inProgress map[string][]*base.TaskMessage active map[string][]*base.TaskMessage
deadlines map[string][]base.Z deadlines map[string][]base.Z
archived map[string][]base.Z archived map[string][]base.Z
target *base.TaskMessage // task to archive target *base.TaskMessage // task to archive
@ -1075,7 +1075,7 @@ func TestArchive(t *testing.T) {
wantArchived map[string][]base.Z wantArchived map[string][]base.Z
}{ }{
{ {
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1, t2}, "default": {t1, t2},
}, },
deadlines: map[string][]base.Z{ deadlines: map[string][]base.Z{
@ -1104,7 +1104,7 @@ func TestArchive(t *testing.T) {
}, },
}, },
{ {
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
deadlines: map[string][]base.Z{ deadlines: map[string][]base.Z{
@ -1134,7 +1134,7 @@ func TestArchive(t *testing.T) {
}, },
}, },
{ {
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"custom": {t4}, "custom": {t4},
}, },
@ -1170,7 +1170,7 @@ func TestArchive(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllArchivedQueues(t, r.client, tc.archived) h.SeedAllArchivedQueues(t, r.client, tc.archived)