diff --git a/CHANGELOG.md b/CHANGELOG.md index 606db78..f787b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Fixed restoring unfinished tasks back to correct queues. + ### Changed - `asynqmon ls` command is now paginated (default 30 tasks from first page) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 31e3bda..a41a957 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -273,13 +273,16 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { // and reports the number of tasks restored. func (r *RDB) RestoreUnfinished() (int64, error) { script := redis.NewScript(` - local len = redis.call("LLEN", KEYS[1]) - for i = len, 1, -1 do - redis.call("RPOPLPUSH", KEYS[1], KEYS[2]) + local msgs = redis.call("LRANGE", KEYS[1], 0, -1) + for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + local qkey = ARGV[1] .. decoded["Queue"] + redis.call("LREM", KEYS[1], 0, msg) + redis.call("RPUSH", qkey, msg) end - return len + return table.getn(msgs) `) - res, err := script.Run(r.client, []string{base.InProgressQueue, base.DefaultQueue}).Result() + res, err := script.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result() if err != nil { return 0, err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 19c49b6..9aee6e0 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -242,7 +242,7 @@ func TestRequeue(t *testing.T) { enqueued map[string][]*base.TaskMessage // initial state of queues inProgress []*base.TaskMessage // initial state of the in-progress list target *base.TaskMessage // task to requeue - wantEnqueued map[string][]*base.TaskMessage // final state of queues + wantEnqueued map[string][]*base.TaskMessage // final state of queues wantInProgress []*base.TaskMessage // final state of the in-progress list }{ { @@ -544,41 +544,72 @@ func TestRestoreUnfinished(t *testing.T) { t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("export_csv", nil) t3 := h.NewTaskMessage("sync_stuff", nil) + t4 := h.NewTaskMessageWithQueue("important", nil, "critical") + t5 := h.NewTaskMessageWithQueue("minor", nil, "low") tests := []struct { inProgress []*base.TaskMessage - enqueued []*base.TaskMessage + enqueued map[string][]*base.TaskMessage want int64 wantInProgress []*base.TaskMessage - wantEnqueued []*base.TaskMessage + wantEnqueued map[string][]*base.TaskMessage }{ { - inProgress: []*base.TaskMessage{t1, t2, t3}, - enqueued: []*base.TaskMessage{}, + inProgress: []*base.TaskMessage{t1, t2, t3}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {}, + }, want: 3, wantInProgress: []*base.TaskMessage{}, - wantEnqueued: []*base.TaskMessage{t1, t2, t3}, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2, t3}, + }, }, { - inProgress: []*base.TaskMessage{}, - enqueued: []*base.TaskMessage{t1, t2, t3}, + inProgress: []*base.TaskMessage{}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2, t3}, + }, want: 0, wantInProgress: []*base.TaskMessage{}, - wantEnqueued: []*base.TaskMessage{t1, t2, t3}, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2, t3}, + }, }, { - inProgress: []*base.TaskMessage{t2, t3}, - enqueued: []*base.TaskMessage{t1}, + inProgress: []*base.TaskMessage{t2, t3}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + }, want: 2, wantInProgress: []*base.TaskMessage{}, - wantEnqueued: []*base.TaskMessage{t1, t2, t3}, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2, t3}, + }, + }, + { + inProgress: []*base.TaskMessage{t2, t3, t4, t5}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1}, + "critical": {}, + "low": {}, + }, + want: 4, + wantInProgress: []*base.TaskMessage{}, + wantEnqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {t1, t2, t3}, + "critical": {t4}, + "low": {t5}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedEnqueuedQueue(t, r.client, tc.enqueued) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } got, err := r.RestoreUnfinished() if got != tc.want || err != nil { @@ -591,9 +622,11 @@ func TestRestoreUnfinished(t *testing.T) { t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) } - gotEnqueued := h.GetEnqueuedMessages(t, r.client) - if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff) + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.QueueKey(qname), diff) + } } } }