diff --git a/inspeq/inspector.go b/inspeq/inspector.go index 5deb054..f5deacb 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -543,27 +543,25 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { } // DeleteTaskByKey deletes a task with the given key from the given queue. -// TODO: We don't need score any more. Update this to delete task by ID -func (i *Inspector) DeleteTaskByKey(qname, key string) error { +func (i *Inspector) DeleteTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { - return err + return fmt.Errorf("asynq: %v", err) } - prefix, id, _, err := parseTaskKey(key) + taskid, err := uuid.Parse(id) if err != nil { - return err + return fmt.Errorf("asynq: %s is not a valid task id", id) } - switch prefix { - case keyPrefixPending: - return i.rdb.DeleteTask(qname, id) - case keyPrefixScheduled: - return i.rdb.DeleteTask(qname, id) - case keyPrefixRetry: - return i.rdb.DeleteTask(qname, id) - case keyPrefixArchived: - return i.rdb.DeleteTask(qname, id) - default: - return fmt.Errorf("invalid key") + err = i.rdb.DeleteTask(qname, taskid) + switch { + case errors.IsQueueNotFound(err): + return fmt.Errorf("asynq: %w", ErrQueueNotFound) + case errors.IsTaskNotFound(err): + return fmt.Errorf("asynq: %w", ErrTaskNotFound) + case err != nil: + return fmt.Errorf("asynq: %v", err) } + return nil + } // RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 351969a..1b3800a 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -1903,7 +1903,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1914,7 +1914,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { tests := []struct { pending map[string][]*base.TaskMessage qname string - key string + id string wantPending map[string][]*base.TaskMessage }{ { @@ -1923,7 +1923,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "default", - key: createPendingTask(m2).Key(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -1935,7 +1935,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "custom", - key: createPendingTask(m3).Key(), + id: createPendingTask(m3).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, @@ -1947,9 +1947,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", - tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } @@ -1964,7 +1963,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -1980,7 +1979,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - key string + id string wantScheduled map[string][]base.Z }{ { @@ -1989,7 +1988,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2001,8 +2000,8 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllScheduledQueues(t, r, tc.scheduled) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) } for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r, qname) @@ -2014,7 +2013,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2030,7 +2029,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - key string + id string wantRetry map[string][]base.Z }{ { @@ -2039,7 +2038,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2051,8 +2050,8 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllRetryQueues(t, r, tc.retry) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantRetry { @@ -2064,7 +2063,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { +func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2080,7 +2079,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - key string + id string wantArchived map[string][]base.Z }{ { @@ -2089,7 +2088,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createArchivedTask(z2).Key(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2101,8 +2100,73 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) + continue + } + for qname, want := range tc.wantArchived { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) + } + } + } +} + +func TestInspectorDeleteTaskError(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + now := time.Now() + z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} + z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} + z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} + + inspector := New(getRedisConnOpt(t)) + + tests := []struct { + archived map[string][]base.Z + qname string + id string + wantErr error + wantArchived map[string][]base.Z + }{ + { + archived: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + qname: "nonexistent", + id: createArchivedTask(z2).ID, + wantErr: ErrQueueNotFound, + wantArchived: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + }, + { + archived: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + qname: "default", + id: uuid.NewString(), + wantErr: ErrTaskNotFound, + wantArchived: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) + + if err := inspector.DeleteTask(tc.qname, tc.id); !errors.Is(err, tc.wantErr) { + t.Errorf("DeleteTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr) continue } for qname, want := range tc.wantArchived {