2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add batch actions to inspector for aggregating tasks

Added:
- Inspector.DeleteAllAggregatingTasks
- Inspector.ArchiveAllAggregatingTasks
- Inspector.RunAllAggregatingTasks
This commit is contained in:
Ken Hibino 2022-03-30 05:58:38 -07:00
parent de139cc18e
commit a369443955

View File

@ -558,10 +558,14 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) {
return int(n), err
}
// TODO: comment
// DeleteAllAggregatingTasks deletes all tasks from the specified group,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) {
// TODO: implement this
return 0, nil
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname)
return int(n), err
}
// DeleteTask deletes a task with the given id from the given queue.
@ -588,8 +592,8 @@ func (i *Inspector) DeleteTask(qname, id string) error {
}
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
@ -598,8 +602,8 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
return int(n), err
}
// RunAllRetryTasks transition all retry tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllRetryTasks schedules all retry tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
@ -608,8 +612,8 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
return int(n), err
}
// RunAllArchivedTasks transition all archived tasks to pending state from the given queue,
// and reports the number of tasks transitioned.
// RunAllArchivedTasks schedules all archived tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
@ -618,6 +622,16 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
return int(n), err
}
// RunAllAggregatingTasks schedules all tasks from the given grou to run.
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.RunAllAggregatingTasks(qname, gname)
return int(n), err
}
// RunTask updates the task to pending state given a queue name and task id.
// The task needs to be in scheduled, retry, or archived state, otherwise RunTask
// will return an error.
@ -671,10 +685,14 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
return int(n), err
}
// TODO: comment
// ArchiveAllAggregatingTasks archives all tasks from the given group,
// and reports the number of tasks archived.
func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) {
// TODO: implement this
return 0, nil
if err := base.ValidateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname)
return int(n), err
}
// ArchiveTask archives a task with the given id in the given queue.