diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 7427e58..f8caefe 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -284,8 +284,8 @@ func (r *RDB) Done(msg *base.TaskMessage) error { // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{} -// ARGV[1] -> base.TaskMessage value +// KEYS[3] -> asynq:{}:pending +// ARGV[1] -> task ID // Note: Use RPUSH to push to the head of the queue. var requeueCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then @@ -299,13 +299,9 @@ return redis.status_reply("OK")`) // Requeue moves the task from active queue to the specified queue. func (r *RDB) Requeue(msg *base.TaskMessage) error { - encoded, err := base.EncodeMessage(msg) - if err != nil { - return err - } return requeueCmd.Run(r.client, []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)}, - encoded).Err() + msg.ID.String()).Err() } // KEYS[1] -> asynq:{}:t: diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a5e0e8f..5f7e682 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -644,7 +644,7 @@ func TestRequeue(t *testing.T) { tests := []struct { pending map[string][]*base.TaskMessage // initial state of queues - inProgress map[string][]*base.TaskMessage // initial state of the active list + active map[string][]*base.TaskMessage // initial state of the active list deadlines map[string][]base.Z // initial state of the deadlines set target *base.TaskMessage // task to requeue wantPending map[string][]*base.TaskMessage // final state of queues @@ -655,7 +655,7 @@ func TestRequeue(t *testing.T) { pending: map[string][]*base.TaskMessage{ "default": {}, }, - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, deadlines: map[string][]base.Z{ @@ -681,7 +681,7 @@ func TestRequeue(t *testing.T) { pending: map[string][]*base.TaskMessage{ "default": {t1}, }, - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t2}, }, deadlines: map[string][]base.Z{ @@ -705,7 +705,7 @@ func TestRequeue(t *testing.T) { "default": {t1}, "critical": {}, }, - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t2}, "critical": {t3}, }, @@ -732,7 +732,7 @@ func TestRequeue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllActiveQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.active) h.SeedAllDeadlines(t, r.client, tc.deadlines) err := r.Requeue(tc.target)