diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b66e86a..fc5d655 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -5,7 +5,6 @@ package rdb import ( - "errors" "fmt" "testing" "time" @@ -15,6 +14,7 @@ import ( "github.com/google/uuid" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" ) func TestAllQueues(t *testing.T) { @@ -1090,8 +1090,8 @@ func TestRunArchivedTask(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + gotArchived := h.GetArchivedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } @@ -1610,8 +1610,8 @@ func TestRunAllArchivedTasks(t *testing.T) { } } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + gotArchived := h.GetArchivedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" { t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff) } } @@ -1635,7 +1635,6 @@ func TestArchiveRetryTask(t *testing.T) { archived map[string][]base.Z qname string id uuid.UUID - want error wantRetry map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -1651,7 +1650,6 @@ func TestArchiveRetryTask(t *testing.T) { }, qname: "default", id: m1.ID, - want: nil, wantRetry: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, @@ -1659,23 +1657,6 @@ func TestArchiveRetryTask(t *testing.T) { "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, - { - retry: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - archived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - qname: "default", - id: uuid.New(), - want: ErrTaskNotFound, - wantRetry: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - }, { retry: map[string][]base.Z{ "default": { @@ -1693,7 +1674,6 @@ func TestArchiveRetryTask(t *testing.T) { }, qname: "custom", id: m3.ID, - want: nil, wantRetry: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, @@ -1715,10 +1695,9 @@ func TestArchiveRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.ArchiveTask(tc.qname, tc.id) - if !errors.Is(got, tc.want) { - t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", - tc.qname, tc.id, got, tc.want) + if got := r.ArchiveTask(tc.qname, tc.id); got != nil { + t.Errorf("(*RDB).ArchiveTask(%q, %v) returned error: %v", + tc.qname, tc.id, got) continue } @@ -1731,8 +1710,8 @@ func TestArchiveRetryTask(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -1757,7 +1736,6 @@ func TestArchiveScheduledTask(t *testing.T) { archived map[string][]base.Z qname string id uuid.UUID - want error wantScheduled map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -1773,7 +1751,6 @@ func TestArchiveScheduledTask(t *testing.T) { }, qname: "default", id: m1.ID, - want: nil, wantScheduled: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, @@ -1781,40 +1758,6 @@ func TestArchiveScheduledTask(t *testing.T) { "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, - { - scheduled: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - archived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - qname: "nonexistent", - id: m2.ID, - want: ErrQueueNotFound, - wantScheduled: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - }, - { - scheduled: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - archived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - qname: "default", - id: m2.ID, - want: ErrTaskAlreadyArchived, - wantScheduled: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, - }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, - }, - }, { scheduled: map[string][]base.Z{ "default": { @@ -1832,7 +1775,6 @@ func TestArchiveScheduledTask(t *testing.T) { }, qname: "custom", id: m3.ID, - want: nil, wantScheduled: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, @@ -1854,10 +1796,9 @@ func TestArchiveScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.ArchiveTask(tc.qname, tc.id) - if !errors.Is(got, tc.want) { - t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", - tc.qname, tc.id, got, tc.want) + if got := r.ArchiveTask(tc.qname, tc.id); got != nil { + t.Errorf("(*RDB).ArchiveTask(%q, %v) returned error: %v", + tc.qname, tc.id, got) continue } @@ -1870,8 +1811,8 @@ func TestArchiveScheduledTask(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -1887,14 +1828,11 @@ func TestArchivePendingTask(t *testing.T) { m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") - oneHourAgo := time.Now().Add(-1 * time.Hour) - tests := []struct { pending map[string][]*base.TaskMessage archived map[string][]base.Z qname string id uuid.UUID - want error wantPending map[string][]*base.TaskMessage wantArchived map[string][]base.Z }{ @@ -1907,7 +1845,6 @@ func TestArchivePendingTask(t *testing.T) { }, qname: "default", id: m1.ID, - want: nil, wantPending: map[string][]*base.TaskMessage{ "default": {m2}, }, @@ -1915,23 +1852,6 @@ func TestArchivePendingTask(t *testing.T) { "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, - { - pending: map[string][]*base.TaskMessage{ - "default": {m1}, - }, - archived: map[string][]base.Z{ - "default": {{Message: m2, Score: oneHourAgo.Unix()}}, - }, - qname: "default", - id: uuid.New(), - want: ErrTaskNotFound, - wantPending: map[string][]*base.TaskMessage{ - "default": {m1}, - }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m2, Score: oneHourAgo.Unix()}}, - }, - }, { pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, @@ -1943,7 +1863,6 @@ func TestArchivePendingTask(t *testing.T) { }, qname: "custom", id: m3.ID, - want: nil, wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m4}, @@ -1960,10 +1879,9 @@ func TestArchivePendingTask(t *testing.T) { h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.ArchiveTask(tc.qname, tc.id) - if !errors.Is(got, tc.want) { - t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", - tc.qname, tc.id, got, tc.want) + if got := r.ArchiveTask(tc.qname, tc.id); got != nil { + t.Errorf("(*RDB).ArchiveTask(%q, %v) returned error: %v", + tc.qname, tc.id, got) continue } @@ -1976,8 +1894,111 @@ func TestArchivePendingTask(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ArchivedKey(qname), diff) + } + } + } +} + +func TestArchiveTaskError(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + t1 := time.Now().Add(1 * time.Minute) + t2 := time.Now().Add(1 * time.Hour) + + tests := []struct { + desc string + scheduled map[string][]base.Z + archived map[string][]base.Z + qname string + id uuid.UUID + match func(err error) bool + wantScheduled map[string][]base.Z + wantArchived map[string][]base.Z + }{ + { + desc: "It should return QueueNotFoundError if provided queue name doesn't exist", + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + qname: "nonexistent", + id: m2.ID, + match: errors.IsQueueNotFound, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, + { + desc: "It should return TaskNotFoundError if provided task ID doesn't exist in the queue", + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + qname: "default", + id: uuid.New(), + match: errors.IsTaskNotFound, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, + { + desc: "It should return TaskAlreadyArchivedError if task is already in archived state", + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + qname: "default", + id: m2.ID, + match: errors.IsTaskAlreadyArchived, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllArchivedQueues(t, r.client, tc.archived) + + got := r.ArchiveTask(tc.qname, tc.id) + if !tc.match(got) { + t.Errorf("%s: returned error didn't match: got=%v", tc.desc, got) + continue + } + + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ScheduledKey(qname), diff) + } + } + + for qname, want := range tc.wantArchived { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2108,8 +2129,8 @@ func TestArchiveAllPendingTasks(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2254,8 +2275,8 @@ func TestArchiveAllRetryTasks(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2401,8 +2422,8 @@ func TestArchiveAllScheduledTasks(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2497,8 +2518,8 @@ func TestDeleteArchivedTask(t *testing.T) { } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + gotArchived := h.GetArchivedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } @@ -2791,8 +2812,8 @@ func TestDeleteAllArchivedTasks(t *testing.T) { t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } for qname, want := range tc.wantArchived { - gotDead := h.GetArchivedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + gotArchived := h.GetArchivedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } @@ -3221,8 +3242,8 @@ func TestRemoveQueueError(t *testing.T) { } } for qname, want := range tc.archived { - gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ArchivedKey(qname), diff) } } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b122284..4b8f101 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -23,9 +23,6 @@ var ( // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = fmt.Errorf("%w: could not find a task in the queue", base.ErrNotFound) - // ErrTaskAlreadyArchived indicates that the task in question is already in archive state. - ErrTaskAlreadyArchived = fmt.Errorf("%w: task is already archived", base.ErrFailedPrecondition) - // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. ErrDuplicateTask = errors.New("task already exists")