diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f3b900e..7427e58 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -432,16 +432,14 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e return err } now := time.Now() - processedKey := base.ProcessedKey(msg.Queue, now) - failedKey := base.FailedKey(msg.Queue, now) expireAt := now.Add(statsTTL) keys := []string{ base.TaskKey(msg.Queue, msg.ID.String()), base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), - processedKey, - failedKey, + base.ProcessedKey(msg.Queue, now), + base.FailedKey(msg.Queue, now), } argv := []interface{}{ msg.ID.String(), @@ -457,58 +455,68 @@ const ( archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently ) -// KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{}:archived -// KEYS[4] -> asynq:{}:processed: -// KEYS[5] -> asynq:{}:failed: -// ARGV[1] -> base.TaskMessage value to remove -// ARGV[2] -> base.TaskMessage value to add +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:active +// KEYS[3] -> asynq:{}:deadlines +// KEYS[4] -> asynq:{}:archived +// KEYS[5] -> asynq:{}:processed: +// KEYS[6] -> asynq:{}:failed: +// ARGV[1] -> task ID +// ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in archive (e.g., 100) // ARGV[6] -> stats expiration timestamp var archiveCmd = redis.NewScript(` -if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then +if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then +if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2]) -redis.call("ZREMRANGEBYSCORE", KEYS[3], "-inf", ARGV[4]) -redis.call("ZREMRANGEBYRANK", KEYS[3], 0, -ARGV[5]) -local n = redis.call("INCR", KEYS[4]) +redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) +redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) +redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) +redis.call("SET", KEYS[1], ARGV[2]) +local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[4], ARGV[6]) -end -local m = redis.call("INCR", KEYS[5]) -if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[6]) end +local m = redis.call("INCR", KEYS[6]) +if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[6], ARGV[6]) +end return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { - msgToRemove, err := base.EncodeMessage(msg) - if err != nil { - return err - } modified := *msg modified.ErrorMsg = errMsg - msgToAdd, err := base.EncodeMessage(&modified) + encoded, err := base.EncodeMessage(&modified) if err != nil { return err } now := time.Now() - limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago - processedKey := base.ProcessedKey(msg.Queue, now) - failedKey := base.FailedKey(msg.Queue, now) + cutoff := now.AddDate(0, 0, -archivedExpirationInDays) expireAt := now.Add(statsTTL) - return archiveCmd.Run(r.client, - []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey}, - msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err() + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + base.ActiveKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.ArchivedKey(msg.Queue), + base.ProcessedKey(msg.Queue, now), + base.FailedKey(msg.Queue, now), + } + argv := []interface{}{ + msg.ID.String(), + encoded, + now.Unix(), + cutoff.Unix(), + maxArchiveSize, + expireAt.Unix(), + } + return archiveCmd.Run(r.client, keys, argv...).Err() } // ForwardIfReady checks scheduled and retry sets of the given queues diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 58410ac..a5e0e8f 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1066,7 +1066,7 @@ func TestArchive(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - inProgress map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage deadlines map[string][]base.Z archived map[string][]base.Z target *base.TaskMessage // task to archive @@ -1075,7 +1075,7 @@ func TestArchive(t *testing.T) { wantArchived map[string][]base.Z }{ { - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, deadlines: map[string][]base.Z{ @@ -1104,7 +1104,7 @@ func TestArchive(t *testing.T) { }, }, { - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, deadlines: map[string][]base.Z{ @@ -1134,7 +1134,7 @@ func TestArchive(t *testing.T) { }, }, { - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1}, "custom": {t4}, }, @@ -1170,7 +1170,7 @@ func TestArchive(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllActiveQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.active) h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllArchivedQueues(t, r.client, tc.archived)