From a369443955fb167e0b0e392d9fa4a8b1c67dff1f Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 30 Mar 2022 05:58:38 -0700 Subject: [PATCH] Add batch actions to inspector for aggregating tasks Added: - Inspector.DeleteAllAggregatingTasks - Inspector.ArchiveAllAggregatingTasks - Inspector.RunAllAggregatingTasks --- inspector.go | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/inspector.go b/inspector.go index c48ec83..56e2d63 100644 --- a/inspector.go +++ b/inspector.go @@ -558,10 +558,14 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { return int(n), err } -// TODO: comment +// DeleteAllAggregatingTasks deletes all tasks from the specified group, +// and reports the number of tasks deleted. func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) { - // TODO: implement this - return 0, nil + if err := base.ValidateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname) + return int(n), err } // DeleteTask deletes a task with the given id from the given queue. @@ -588,8 +592,8 @@ func (i *Inspector) DeleteTask(qname, id string) error { } -// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, -// and reports the number of tasks transitioned. +// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run, +// and reports the number of tasks scheduled to run. func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { if err := base.ValidateQueueName(qname); err != nil { return 0, err @@ -598,8 +602,8 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { return int(n), err } -// RunAllRetryTasks transition all retry tasks to pending state from the given queue, -// and reports the number of tasks transitioned. +// RunAllRetryTasks schedules all retry tasks from the given queue to run, +// and reports the number of tasks scheduled to run. func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { if err := base.ValidateQueueName(qname); err != nil { return 0, err @@ -608,8 +612,8 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { return int(n), err } -// RunAllArchivedTasks transition all archived tasks to pending state from the given queue, -// and reports the number of tasks transitioned. +// RunAllArchivedTasks schedules all archived tasks from the given queue to run, +// and reports the number of tasks scheduled to run. func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { if err := base.ValidateQueueName(qname); err != nil { return 0, err @@ -618,6 +622,16 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { return int(n), err } +// RunAllAggregatingTasks schedules all tasks from the given grou to run. +// and reports the number of tasks scheduled to run. +func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) { + if err := base.ValidateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.RunAllAggregatingTasks(qname, gname) + return int(n), err +} + // RunTask updates the task to pending state given a queue name and task id. // The task needs to be in scheduled, retry, or archived state, otherwise RunTask // will return an error. @@ -671,10 +685,14 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { return int(n), err } -// TODO: comment +// ArchiveAllAggregatingTasks archives all tasks from the given group, +// and reports the number of tasks archived. func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) { - // TODO: implement this - return 0, nil + if err := base.ValidateQueueName(qname); err != nil { + return 0, err + } + n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname) + return int(n), err } // ArchiveTask archives a task with the given id in the given queue.