2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Update RDB.Requeue

This commit is contained in:
Ken Hibino 2021-02-23 06:45:06 -08:00
parent 740cb9bad0
commit 5105f35697
2 changed files with 8 additions and 12 deletions

View File

@ -284,8 +284,8 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}
// ARGV[1] -> base.TaskMessage value
// KEYS[3] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
// Note: Use RPUSH to push to the head of the queue.
var requeueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
@ -299,13 +299,9 @@ return redis.status_reply("OK")`)
// Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
return requeueCmd.Run(r.client,
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)},
encoded).Err()
msg.ID.String()).Err()
}
// KEYS[1] -> asynq:{<qname>}:t:<task_id>

View File

@ -644,7 +644,7 @@ func TestRequeue(t *testing.T) {
tests := []struct {
pending map[string][]*base.TaskMessage // initial state of queues
inProgress map[string][]*base.TaskMessage // initial state of the active list
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
target *base.TaskMessage // task to requeue
wantPending map[string][]*base.TaskMessage // final state of queues
@ -655,7 +655,7 @@ func TestRequeue(t *testing.T) {
pending: map[string][]*base.TaskMessage{
"default": {},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
@ -681,7 +681,7 @@ func TestRequeue(t *testing.T) {
pending: map[string][]*base.TaskMessage{
"default": {t1},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t2},
},
deadlines: map[string][]base.Z{
@ -705,7 +705,7 @@ func TestRequeue(t *testing.T) {
"default": {t1},
"critical": {},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t2},
"critical": {t3},
},
@ -732,7 +732,7 @@ func TestRequeue(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
err := r.Requeue(tc.target)