mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-25 23:06:12 +08:00
Update Retry method in RDB
This commit is contained in:
@@ -309,11 +309,11 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
|
||||
return nil
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:deadlines
|
||||
// KEYS[3] -> asynq:retry
|
||||
// KEYS[4] -> asynq:processed:<yyyy-mm-dd>
|
||||
// KEYS[5] -> asynq:failure:<yyyy-mm-dd>
|
||||
// KEYS[1] -> asynq:{<qname>}:in_progress
|
||||
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||
// KEYS[3] -> asynq:{<qname>}:retry
|
||||
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||
// KEYS[5] -> asynq:{<qname>}:failure:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
||||
// ARGV[3] -> retry_at UNIX timestamp
|
||||
@@ -351,11 +351,11 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
||||
return err
|
||||
}
|
||||
now := time.Now()
|
||||
processedKey := base.ProcessedKey(now)
|
||||
failureKey := base.FailureKey(now)
|
||||
processedKey := base.ProcessedKey(msg.Queue, now)
|
||||
failedKey := base.FailedKey(msg.Queue, now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
return retryCmd.Run(r.client,
|
||||
[]string{base.InProgressQueue, base.KeyDeadlines, base.RetryQueue, processedKey, failureKey},
|
||||
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
|
||||
msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user