diff --git a/inspector.go b/inspector.go index 8c9a263..5e81940 100644 --- a/inspector.go +++ b/inspector.go @@ -449,6 +449,16 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { return int(n), err } +// DeleteAllCompletedTasks deletes all completed tasks from the specified queue, +// and reports the number tasks deleted. +func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { + if err := base.ValidateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.DeleteAllCompletedTasks(qname) + return int(n), err +} + // DeleteTask deletes a task with the given id from the given queue. // The task needs to be in pending, scheduled, retry, or archived state, // otherwise DeleteTask will return an error. diff --git a/inspector_test.go b/inspector_test.go index a054ac8..d933898 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -1394,6 +1394,72 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { } } +func TestInspectorDeleteAllCompletedTasks(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + m1 := newCompletedTaskMessage("task1", "default", 30*time.Minute, now.Add(-2*time.Minute)) + m2 := newCompletedTaskMessage("task2", "default", 30*time.Minute, now.Add(-5*time.Minute)) + m3 := newCompletedTaskMessage("task3", "default", 30*time.Minute, now.Add(-10*time.Minute)) + m4 := newCompletedTaskMessage("task4", "custom", 30*time.Minute, now.Add(-3*time.Minute)) + z1 := base.Z{Message: m1, Score: m1.CompletedAt + m1.ResultTTL} + z2 := base.Z{Message: m2, Score: m2.CompletedAt + m2.ResultTTL} + z3 := base.Z{Message: m3, Score: m3.CompletedAt + m3.ResultTTL} + z4 := base.Z{Message: m4, Score: m4.CompletedAt + m4.ResultTTL} + + inspector := NewInspector(getRedisConnOpt(t)) + + tests := []struct { + completed map[string][]base.Z + qname string + want int + wantCompleted map[string][]base.Z + }{ + { + completed: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", + want: 3, + wantCompleted: map[string][]base.Z{ + "default": {}, + "custom": {z4}, + }, + }, + { + completed: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantCompleted: map[string][]base.Z{ + "default": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllCompletedQueues(t, r, tc.completed) + + got, err := inspector.DeleteAllCompletedTasks(tc.qname) + if err != nil { + t.Errorf("DeleteAllCompletedTasks(%q) returned error: %v", tc.qname, err) + continue + } + if got != tc.want { + t.Errorf("DeleteAllCompletedTasks(%q) = %d, want %d", tc.qname, got, tc.want) + } + for qname, want := range tc.wantCompleted { + gotCompleted := h.GetCompletedEntries(t, r, qname) + if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected completed tasks in queue %q: (-want, +got)\n%s", qname, diff) + } + } + } +} + func TestInspectorArchiveAllPendingTasks(t *testing.T) { r := setup(t) defer r.Close() diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 5c620db..3beffc0 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1174,6 +1174,20 @@ func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { return n, nil } +// DeleteAllCompletedTasks deletes all completed tasks from the given queue +// and returns the number of tasks deleted. +func (r *RDB) DeleteAllCompletedTasks(qname string) (int64, error) { + var op errors.Op = "rdb.DeleteAllCompletedTasks" + n, err := r.deleteAll(base.CompletedKey(qname), qname) + if errors.IsQueueNotFound(err) { + return 0, errors.E(op, errors.NotFound, err) + } + if err != nil { + return 0, errors.E(op, errors.Unknown, err) + } + return n, nil +} + // deleteAllCmd deletes tasks from the given zset. // // Input: diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index df864e2..4281a89 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2058,13 +2058,13 @@ func TestRunAllArchivedTasks(t *testing.T) { got, err := r.RunAllArchivedTasks(tc.qname) if err != nil { - t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllArchivedTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) continue } if got != tc.want { - t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllArchivedTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) } @@ -3463,10 +3463,10 @@ func TestDeleteAllArchivedTasks(t *testing.T) { got, err := r.DeleteAllArchivedTasks(tc.qname) if err != nil { - t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("r.DeleteAllArchivedTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { - t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) + t.Errorf("r.DeleteAllArchivedTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } for qname, want := range tc.wantArchived { gotArchived := h.GetArchivedMessages(t, r.client, qname) @@ -3477,6 +3477,76 @@ func TestDeleteAllArchivedTasks(t *testing.T) { } } +func newCompletedTaskMessage(qname, typename string, resultTTL time.Duration, completedAt time.Time) *base.TaskMessage { + msg := h.NewTaskMessageWithQueue(typename, nil, qname) + msg.ResultTTL = int64(resultTTL.Seconds()) + msg.CompletedAt = completedAt.Unix() + return msg +} + +func TestDeleteAllCompletedTasks(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + m1 := newCompletedTaskMessage("default", "task1", 30*time.Minute, now.Add(-2*time.Minute)) + m2 := newCompletedTaskMessage("default", "task2", 30*time.Minute, now.Add(-5*time.Minute)) + m3 := newCompletedTaskMessage("custom", "task3", 30*time.Minute, now.Add(-5*time.Minute)) + + tests := []struct { + completed map[string][]base.Z + qname string + want int64 + wantCompleted map[string][]*base.TaskMessage + }{ + { + completed: map[string][]base.Z{ + "default": { + {Message: m1, Score: m1.CompletedAt + m1.ResultTTL}, + {Message: m2, Score: m2.CompletedAt + m2.ResultTTL}, + }, + "custom": { + {Message: m3, Score: m2.CompletedAt + m3.ResultTTL}, + }, + }, + qname: "default", + want: 2, + wantCompleted: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m3}, + }, + }, + { + completed: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantCompleted: map[string][]*base.TaskMessage{ + "default": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllCompletedQueues(t, r.client, tc.completed) + + got, err := r.DeleteAllCompletedTasks(tc.qname) + if err != nil { + t.Errorf("r.DeleteAllCompletedTasks(%q) returned error: %v", tc.qname, err) + } + if got != tc.want { + t.Errorf("r.DeleteAllCompletedTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) + } + for qname, want := range tc.wantCompleted { + gotCompleted := h.GetCompletedMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotCompleted, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.CompletedKey(qname), diff) + } + } + } +} + func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) { r := setup(t) defer r.Close() @@ -3530,10 +3600,10 @@ func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) { got, err := r.DeleteAllArchivedTasks(tc.qname) if err != nil { - t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("r.DeleteAllArchivedTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { - t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) + t.Errorf("r.DeleteAllArchivedTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } for qname, want := range tc.wantArchived { gotArchived := h.GetArchivedMessages(t, r.client, qname)