diff --git a/CHANGELOG.md b/CHANGELOG.md index 412bedd..e8ac415 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Requires redis v4.0+ for multiple field/value pair support - Renamed pending key (TODO: need migration script) - `Client.Enqueue` now returns `TaskInfo` -- Renamed pending key (TODO: need migration script +- `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask` +- `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask` +- `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask` ## [0.17.2] - 2021-06-06 diff --git a/inspeq/inspector.go b/inspeq/inspector.go index f5deacb..bb2c8ee 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -542,7 +542,13 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { return int(n), err } -// DeleteTaskByKey deletes a task with the given key from the given queue. +// DeleteTask deletes a task with the given id from the given queue. +// The task needs to be in pending, scheduled, retry, or archived state, +// otherwise DeleteTask will return an error. +// +// If a queue with the given name doesn't exist, it returns ErrQueueNotFound. +// If a task with the given id doesn't exist in the queue, it returns ErrTaskNotFound. +// If the task is in active state, it returns a non-nil error. func (i *Inspector) DeleteTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { return fmt.Errorf("asynq: %v", err) @@ -651,28 +657,31 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { return int(n), err } -// ArchiveTaskByKey archives a task with the given key in the given queue. -// TODO: Update this to Archive task by ID. -func (i *Inspector) ArchiveTaskByKey(qname, key string) error { +// ArchiveTask archives a task with the given id in the given queue. +// The task needs to be in pending, scheduled, or retry state, otherwise ArchiveTask +// will return an error. +// +// If a queue with the given name doesn't exist, it returns ErrQueueNotFound. +// If a task with the given id doesn't exist in the queue, it returns ErrTaskNotFound. +// If the task is in already archived, it returns a non-nil error. +func (i *Inspector) ArchiveTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { - return err + return fmt.Errorf("asynq: err") } - prefix, id, _, err := parseTaskKey(key) + taskid, err := uuid.Parse(id) if err != nil { - return err + return fmt.Errorf("asynq: %s is not a valid task id", id) } - switch prefix { - case keyPrefixPending: - return i.rdb.ArchiveTask(qname, id) - case keyPrefixScheduled: - return i.rdb.ArchiveTask(qname, id) - case keyPrefixRetry: - return i.rdb.ArchiveTask(qname, id) - case keyPrefixArchived: - return fmt.Errorf("task is already archived") - default: - return fmt.Errorf("invalid key") + err = i.rdb.ArchiveTask(qname, taskid) + switch { + case errors.IsQueueNotFound(err): + return fmt.Errorf("asynq: %w", ErrQueueNotFound) + case errors.IsTaskNotFound(err): + return fmt.Errorf("asynq: %w", ErrTaskNotFound) + case err != nil: + return fmt.Errorf("asynq: %v", err) } + return nil } // CancelActiveTask sends a signal to cancel processing of the task with diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 1b3800a..06a364b 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -2490,7 +2490,7 @@ func TestInspectorRunTaskError(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2503,7 +2503,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { pending map[string][]*base.TaskMessage archived map[string][]base.Z qname string - key string + id string wantPending map[string][]*base.TaskMessage wantArchived map[string][]base.Z }{ @@ -2517,7 +2517,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "default", - key: createPendingTask(m1).Key(), + id: createPendingTask(m1).ID, wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2, m3}, @@ -2539,7 +2539,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createPendingTask(m2).Key(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2558,9 +2558,8 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { h.SeedAllPendingQueues(t, r, tc.pending) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", - tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantPending { @@ -2581,7 +2580,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2598,7 +2597,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { scheduled map[string][]base.Z archived map[string][]base.Z qname string - key string + id string want string wantScheduled map[string][]base.Z wantArchived map[string][]base.Z @@ -2613,7 +2612,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2635,8 +2634,8 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantScheduled { @@ -2657,7 +2656,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2674,7 +2673,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { retry map[string][]base.Z archived map[string][]base.Z qname string - key string + id string wantRetry map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -2688,7 +2687,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2710,8 +2709,100 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err) + continue + } + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", + qname, diff) + } + } + for qname, want := range tc.wantArchived { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", + qname, diff) + } + } + } +} + +func TestInspectorArchiveTaskError(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + now := time.Now() + z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} + z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} + z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + + inspector := New(getRedisConnOpt(t)) + + tests := []struct { + retry map[string][]base.Z + archived map[string][]base.Z + qname string + id string + wantErr error + wantRetry map[string][]base.Z + wantArchived map[string][]base.Z + }{ + { + retry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "nonexistent", + id: createRetryTask(z2).ID, + wantErr: ErrQueueNotFound, + wantRetry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + }, + { + retry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + id: uuid.NewString(), + wantErr: ErrTaskNotFound, + wantRetry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllArchivedQueues(t, r, tc.archived) + + if err := inspector.ArchiveTask(tc.qname, tc.id); !errors.Is(err, tc.wantErr) { + t.Errorf("ArchiveTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr) continue } for qname, want := range tc.wantRetry {