diff --git a/inspector.go b/inspector.go index 1f9f7bc..4737683 100644 --- a/inspector.go +++ b/inspector.go @@ -464,6 +464,16 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch return tasks, nil } +// DeleteAllPendingTasks deletes all pending tasks from the specified queue, +// and reports the number tasks deleted. +func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.DeleteAllPendingTasks(qname) + return int(n), err +} + // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { @@ -517,7 +527,7 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { } } -// RunAllScheduledTasks transition all scheduled tasks to pending state within the given queue, +// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { @@ -527,7 +537,7 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { return int(n), err } -// RunAllRetryTasks transition all retry tasks to pending state within the given queue, +// RunAllRetryTasks transition all retry tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { @@ -537,7 +547,7 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { return int(n), err } -// RunAllArchivedTasks transition all archived tasks to pending state within the given queue, +// RunAllArchivedTasks transition all archived tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { @@ -568,7 +578,17 @@ func (i *Inspector) RunTaskByKey(qname, key string) error { } } -// ArchiveAllScheduledTasks archives all scheduled tasks within the given queue, +// ArchiveAllPendingTasks archives all pending tasks from the given queue, +// and reports the number of tasks archived. +func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.ArchiveAllPendingTasks(qname) + return int(n), err +} + +// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, // and reports the number of tasks archiveed. func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { @@ -578,7 +598,7 @@ func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { return int(n), err } -// ArchiveAllRetryTasks archives all retry tasks within the given queue, +// ArchiveAllRetryTasks archives all retry tasks from the given queue, // and reports the number of tasks archiveed. func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { diff --git a/inspector_test.go b/inspector_test.go index 910c7ee..a3fa2b9 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -839,6 +839,70 @@ func TestInspectorListPagination(t *testing.T) { } } +func TestInspectorDeleteAllPendingTasks(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := asynqtest.NewTaskMessage("task1", nil) + m2 := asynqtest.NewTaskMessage("task2", nil) + m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + + inspector := NewInspector(getRedisConnOpt(t)) + + tests := []struct { + pending map[string][]*base.TaskMessage + qname string + want int + wantPending map[string][]*base.TaskMessage + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2, m3}, + "custom": {m4}, + }, + qname: "default", + want: 3, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m4}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2, m3}, + "custom": {m4}, + }, + qname: "custom", + want: 1, + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2, m3}, + "custom": {}, + }, + }, + } + + for _, tc := range tests { + asynqtest.FlushDB(t, r) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) + + got, err := inspector.DeleteAllPendingTasks(tc.qname) + if err != nil { + t.Errorf("DeleteAllPendingTasks(%q) returned error: %v", tc.qname, err) + continue + } + if got != tc.want { + t.Errorf("DeleteAllPendingTasks(%q) = %d, want %d", tc.qname, got, tc.want) + } + + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) + } + } + } +} + func TestInspectorDeleteAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() @@ -1037,6 +1101,122 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { } } +func TestInspectorArchiveAllPendingTasks(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := asynqtest.NewTaskMessage("task1", nil) + m2 := asynqtest.NewTaskMessage("task2", nil) + m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + now := time.Now() + z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} + z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} + inspector := NewInspector(getRedisConnOpt(t)) + + tests := []struct { + pending map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + want int + wantPending map[string][]*base.TaskMessage + wantArchived map[string][]base.Z + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2, m3}, + "custom": {m4}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "default", + want: 3, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m4}, + }, + wantArchived: map[string][]base.Z{ + "default": { + base.Z{Message: m1, Score: now.Unix()}, + base.Z{Message: m2, Score: now.Unix()}, + base.Z{Message: m3, Score: now.Unix()}, + }, + "custom": {}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {}, + }, + archived: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m3, m4}, + }, + archived: map[string][]base.Z{ + "default": {z1, z2}, + }, + qname: "default", + want: 2, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantArchived: map[string][]base.Z{ + "default": { + + z1, + z2, + base.Z{Message: m3, Score: now.Unix()}, + base.Z{Message: m4, Score: now.Unix()}, + }, + }, + }, + } + + for _, tc := range tests { + asynqtest.FlushDB(t, r) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + + got, err := inspector.ArchiveAllPendingTasks(tc.qname) + if err != nil { + t.Errorf("ArchiveAllPendingTasks(%q) returned error: %v", tc.qname, err) + continue + } + if got != tc.want { + t.Errorf("ArchiveAllPendingTasks(%q) = %d, want %d", tc.qname, got, tc.want) + } + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) + } + } + for qname, want := range tc.wantArchived { + // Allow Z.Score to differ by up to 2. + approxOpt := cmp.Comparer(func(a, b int64) bool { + return math.Abs(float64(a-b)) < 2 + }) + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) + } + } + } +} + func TestInspectorArchiveAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index d9c3733..e0ff1b0 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -560,6 +560,39 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) } +// KEYS[1] -> asynq:{} +// KEYS[2] -> asynq:{}:archived +// ARGV[1] -> current timestamp +// ARGV[2] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[3] -> max number of tasks in archive (e.g., 100) +var archiveAllPendingCmd = redis.NewScript(` +local msgs = redis.call("LRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + redis.call("ZADD", KEYS[2], ARGV[1], msg) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) +end +redis.call("DEL", KEYS[1]) +return table.getn(msgs)`) + +// ArchiveAllPendingTasks archives all pending tasks from the given queue and +// returns the number of tasks that were moved. +func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { + keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)} + now := time.Now() + limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago + args := []interface{}{now.Unix(), limit, maxArchiveSize} + res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil +} + // KEYS[1] -> ZSET to move task from (e.g., retry queue) // KEYS[2] -> asynq:{}:archived // ARGV[1] -> score of the task to archive @@ -730,6 +763,26 @@ func (r *RDB) deleteAll(key string) (int64, error) { return n, nil } +// KEYS[1] -> asynq:{} +var deleteAllPendingCmd = redis.NewScript(` +local n = redis.call("LLEN", KEYS[1]) +redis.call("DEL", KEYS[1]) +return n`) + +// DeleteAllPendingTasks deletes all pending tasks from the given queue +// and returns the number of tasks deleted. +func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { + res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil +} + // ErrQueueNotFound indicates specified queue does not exist. type ErrQueueNotFound struct { qname string