From aa40a2165419ecd84cd99cfb7c0fd244f22b7eef Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 23 Feb 2021 06:19:54 -0800 Subject: [PATCH] Update RDB.Retry --- internal/rdb/rdb.go | 56 +++++++++++++++++++++++----------------- internal/rdb/rdb_test.go | 8 +++--- 2 files changed, 37 insertions(+), 27 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fc16681..f3b900e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -392,44 +392,42 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim return nil } -// KEYS[1] -> asynq:{}:active -// KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{}:retry -// KEYS[4] -> asynq:{}:processed: -// KEYS[5] -> asynq:{}:failed: -// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue -// ARGV[2] -> base.TaskMessage value to add to Retry queue +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:active +// KEYS[3] -> asynq:{}:deadlines +// KEYS[4] -> asynq:{}:retry +// KEYS[5] -> asynq:{}:processed: +// KEYS[6] -> asynq:{}:failed: +// ARGV[1] -> task ID +// ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration timestamp var retryCmd = redis.NewScript(` -if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then +if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then +if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2]) -local n = redis.call("INCR", KEYS[4]) +redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) +redis.call("SET", KEYS[1], ARGV[2]) +local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[4], ARGV[4]) -end -local m = redis.call("INCR", KEYS[5]) -if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[4]) end +local m = redis.call("INCR", KEYS[6]) +if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[6], ARGV[4]) +end return redis.status_reply("OK")`) // Retry moves the task from active to retry queue, incrementing retry count // and assigning error message to the task message. func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { - msgToRemove, err := base.EncodeMessage(msg) - if err != nil { - return err - } modified := *msg modified.Retried++ modified.ErrorMsg = errMsg - msgToAdd, err := base.EncodeMessage(&modified) + encoded, err := base.EncodeMessage(&modified) if err != nil { return err } @@ -437,9 +435,21 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e processedKey := base.ProcessedKey(msg.Queue, now) failedKey := base.FailedKey(msg.Queue, now) expireAt := now.Add(statsTTL) - return retryCmd.Run(r.client, - []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey}, - msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err() + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + base.ActiveKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.RetryKey(msg.Queue), + processedKey, + failedKey, + } + argv := []interface{}{ + msg.ID.String(), + encoded, + processAt.Unix(), + expireAt.Unix(), + } + return retryCmd.Run(r.client, keys, argv...).Err() } const ( diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 8ff0db1..58410ac 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -896,7 +896,7 @@ func TestRetry(t *testing.T) { errMsg := "SMTP server is not responding" tests := []struct { - inProgress map[string][]*base.TaskMessage + active map[string][]*base.TaskMessage deadlines map[string][]base.Z retry map[string][]base.Z msg *base.TaskMessage @@ -907,7 +907,7 @@ func TestRetry(t *testing.T) { wantRetry map[string][]base.Z }{ { - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, deadlines: map[string][]base.Z{ @@ -933,7 +933,7 @@ func TestRetry(t *testing.T) { }, }, { - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, "custom": {t4}, }, @@ -967,7 +967,7 @@ func TestRetry(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllActiveQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.active) h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllRetryQueues(t, r.client, tc.retry)