mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 10:56:12 +08:00 
			
		
		
		
	Add batch actions to inspector for aggregating tasks
Added: - Inspector.DeleteAllAggregatingTasks - Inspector.ArchiveAllAggregatingTasks - Inspector.RunAllAggregatingTasks
This commit is contained in:
		
							
								
								
									
										42
									
								
								inspector.go
									
									
									
									
									
								
							
							
						
						
									
										42
									
								
								inspector.go
									
									
									
									
									
								
							| @@ -558,10 +558,14 @@ func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // TODO: comment | ||||
| // DeleteAllAggregatingTasks deletes all tasks from the specified group, | ||||
| // and reports the number of tasks deleted. | ||||
| func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) { | ||||
| 	// TODO: implement this | ||||
| 	return 0, nil | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname) | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // DeleteTask deletes a task with the given id from the given queue. | ||||
| @@ -588,8 +592,8 @@ func (i *Inspector) DeleteTask(qname, id string) error { | ||||
|  | ||||
| } | ||||
|  | ||||
| // RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, | ||||
| // and reports the number of tasks transitioned. | ||||
| // RunAllScheduledTasks schedules all scheduled tasks from the given queue to run, | ||||
| // and reports the number of tasks scheduled to run. | ||||
| func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| @@ -598,8 +602,8 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // RunAllRetryTasks transition all retry tasks to pending state from the given queue, | ||||
| // and reports the number of tasks transitioned. | ||||
| // RunAllRetryTasks schedules all retry tasks from the given queue to run, | ||||
| // and reports the number of tasks scheduled to run. | ||||
| func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| @@ -608,8 +612,8 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // RunAllArchivedTasks transition all archived tasks to pending state from the given queue, | ||||
| // and reports the number of tasks transitioned. | ||||
| // RunAllArchivedTasks schedules all archived tasks from the given queue to run, | ||||
| // and reports the number of tasks scheduled to run. | ||||
| func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| @@ -618,6 +622,16 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // RunAllAggregatingTasks schedules all tasks from the given grou to run. | ||||
| // and reports the number of tasks scheduled to run. | ||||
| func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) { | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	n, err := i.rdb.RunAllAggregatingTasks(qname, gname) | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // RunTask updates the task to pending state given a queue name and task id. | ||||
| // The task needs to be in scheduled, retry, or archived state, otherwise RunTask | ||||
| // will return an error. | ||||
| @@ -671,10 +685,14 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // TODO: comment | ||||
| // ArchiveAllAggregatingTasks archives all tasks from the given group, | ||||
| // and reports the number of tasks archived. | ||||
| func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) { | ||||
| 	// TODO: implement this | ||||
| 	return 0, nil | ||||
| 	if err := base.ValidateQueueName(qname); err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname) | ||||
| 	return int(n), err | ||||
| } | ||||
|  | ||||
| // ArchiveTask archives a task with the given id in the given queue. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user