mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Update RDB.Retry
This commit is contained in:
parent
a745b2378a
commit
aa40a21654
@ -392,44 +392,42 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:active
|
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||||
// KEYS[2] -> asynq:{<qname>}:deadlines
|
// KEYS[2] -> asynq:{<qname>}:active
|
||||||
// KEYS[3] -> asynq:{<qname>}:retry
|
// KEYS[3] -> asynq:{<qname>}:deadlines
|
||||||
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
// KEYS[4] -> asynq:{<qname>}:retry
|
||||||
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||||
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
|
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
||||||
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
// ARGV[1] -> task ID
|
||||||
|
// ARGV[2] -> updated base.TaskMessage value
|
||||||
// ARGV[3] -> retry_at UNIX timestamp
|
// ARGV[3] -> retry_at UNIX timestamp
|
||||||
// ARGV[4] -> stats expiration timestamp
|
// ARGV[4] -> stats expiration timestamp
|
||||||
var retryCmd = redis.NewScript(`
|
var retryCmd = redis.NewScript(`
|
||||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
|
||||||
return redis.error_reply("NOT FOUND")
|
return redis.error_reply("NOT FOUND")
|
||||||
end
|
end
|
||||||
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
|
||||||
return redis.error_reply("NOT FOUND")
|
return redis.error_reply("NOT FOUND")
|
||||||
end
|
end
|
||||||
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
|
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
|
||||||
local n = redis.call("INCR", KEYS[4])
|
redis.call("SET", KEYS[1], ARGV[2])
|
||||||
|
local n = redis.call("INCR", KEYS[5])
|
||||||
if tonumber(n) == 1 then
|
if tonumber(n) == 1 then
|
||||||
redis.call("EXPIREAT", KEYS[4], ARGV[4])
|
|
||||||
end
|
|
||||||
local m = redis.call("INCR", KEYS[5])
|
|
||||||
if tonumber(m) == 1 then
|
|
||||||
redis.call("EXPIREAT", KEYS[5], ARGV[4])
|
redis.call("EXPIREAT", KEYS[5], ARGV[4])
|
||||||
end
|
end
|
||||||
|
local m = redis.call("INCR", KEYS[6])
|
||||||
|
if tonumber(m) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[6], ARGV[4])
|
||||||
|
end
|
||||||
return redis.status_reply("OK")`)
|
return redis.status_reply("OK")`)
|
||||||
|
|
||||||
// Retry moves the task from active to retry queue, incrementing retry count
|
// Retry moves the task from active to retry queue, incrementing retry count
|
||||||
// and assigning error message to the task message.
|
// and assigning error message to the task message.
|
||||||
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
||||||
msgToRemove, err := base.EncodeMessage(msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.Retried++
|
modified.Retried++
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
msgToAdd, err := base.EncodeMessage(&modified)
|
encoded, err := base.EncodeMessage(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -437,9 +435,21 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
processedKey := base.ProcessedKey(msg.Queue, now)
|
processedKey := base.ProcessedKey(msg.Queue, now)
|
||||||
failedKey := base.FailedKey(msg.Queue, now)
|
failedKey := base.FailedKey(msg.Queue, now)
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
return retryCmd.Run(r.client,
|
keys := []string{
|
||||||
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
|
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||||
msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err()
|
base.ActiveKey(msg.Queue),
|
||||||
|
base.DeadlinesKey(msg.Queue),
|
||||||
|
base.RetryKey(msg.Queue),
|
||||||
|
processedKey,
|
||||||
|
failedKey,
|
||||||
|
}
|
||||||
|
argv := []interface{}{
|
||||||
|
msg.ID.String(),
|
||||||
|
encoded,
|
||||||
|
processAt.Unix(),
|
||||||
|
expireAt.Unix(),
|
||||||
|
}
|
||||||
|
return retryCmd.Run(r.client, keys, argv...).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -896,7 +896,7 @@ func TestRetry(t *testing.T) {
|
|||||||
errMsg := "SMTP server is not responding"
|
errMsg := "SMTP server is not responding"
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
inProgress map[string][]*base.TaskMessage
|
active map[string][]*base.TaskMessage
|
||||||
deadlines map[string][]base.Z
|
deadlines map[string][]base.Z
|
||||||
retry map[string][]base.Z
|
retry map[string][]base.Z
|
||||||
msg *base.TaskMessage
|
msg *base.TaskMessage
|
||||||
@ -907,7 +907,7 @@ func TestRetry(t *testing.T) {
|
|||||||
wantRetry map[string][]base.Z
|
wantRetry map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
inProgress: map[string][]*base.TaskMessage{
|
active: map[string][]*base.TaskMessage{
|
||||||
"default": {t1, t2},
|
"default": {t1, t2},
|
||||||
},
|
},
|
||||||
deadlines: map[string][]base.Z{
|
deadlines: map[string][]base.Z{
|
||||||
@ -933,7 +933,7 @@ func TestRetry(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
inProgress: map[string][]*base.TaskMessage{
|
active: map[string][]*base.TaskMessage{
|
||||||
"default": {t1, t2},
|
"default": {t1, t2},
|
||||||
"custom": {t4},
|
"custom": {t4},
|
||||||
},
|
},
|
||||||
@ -967,7 +967,7 @@ func TestRetry(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
|
h.SeedAllActiveQueues(t, r.client, tc.active)
|
||||||
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
||||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user