2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

(cli): Extend task ls command to list aggregating tasks

This commit is contained in:
Ken Hibino 2022-04-09 15:47:33 -07:00
parent 2c783566f3
commit 578321f226

View File

@ -22,6 +22,7 @@ func init() {
taskListCmd.Flags().StringP("state", "s", "", "state of the tasks to inspect") taskListCmd.Flags().StringP("state", "s", "", "state of the tasks to inspect")
taskListCmd.Flags().Int("page", 1, "page number") taskListCmd.Flags().Int("page", 1, "page number")
taskListCmd.Flags().Int("size", 30, "page size") taskListCmd.Flags().Int("size", 30, "page size")
taskListCmd.Flags().StringP("group", "g", "", "group to inspect (required for listing aggregating tasks)")
taskListCmd.MarkFlagRequired("queue") taskListCmd.MarkFlagRequired("queue")
taskListCmd.MarkFlagRequired("state") taskListCmd.MarkFlagRequired("state")
@ -83,6 +84,7 @@ var taskListCmd = &cobra.Command{
The value for the state flag should be one of: The value for the state flag should be one of:
- active - active
- pending - pending
- aggregating
- scheduled - scheduled
- retry - retry
- archived - archived
@ -92,12 +94,19 @@ List opeartion paginates the result set.
By default, the command fetches the first 30 tasks. By default, the command fetches the first 30 tasks.
Use --page and --size flags to specify the page number and size. Use --page and --size flags to specify the page number and size.
Example: Example:
To list pending tasks from "default" queue, run To list pending tasks from "default" queue, run
asynq task ls --queue=default --state=pending asynq task ls --queue=default --state=pending
To list the tasks from the second page, run To list the tasks from the second page, run
asynq task ls --queue=default --state=pending --page=1`, asynq task ls --queue=default --state=pending --page=1
For aggregating tasks, additional --group flag is required.
Example:
asynq task ls --queue=default --state=aggregating --group=mygroup
`,
Run: taskList, Run: taskList,
} }
@ -192,6 +201,17 @@ func taskList(cmd *cobra.Command, args []string) {
listArchivedTasks(qname, pageNum, pageSize) listArchivedTasks(qname, pageNum, pageSize)
case "completed": case "completed":
listCompletedTasks(qname, pageNum, pageSize) listCompletedTasks(qname, pageNum, pageSize)
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if group == "" {
fmt.Println("Flag --group is required for listing aggregating tasks")
os.Exit(1)
}
listAggregatingTasks(qname, group, pageNum, pageSize)
default: default:
fmt.Printf("error: state=%q is not supported\n", state) fmt.Printf("error: state=%q is not supported\n", state)
os.Exit(1) os.Exit(1)
@ -334,6 +354,27 @@ func listCompletedTasks(qname string, pageNum, pageSize int) {
}) })
} }
func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No aggregating tasks in group %q \n", group)
return
}
printTable(
[]string{"ID", "Type", "Payload", "Group"},
func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), t.Group)
}
},
)
}
func taskCancel(cmd *cobra.Command, args []string) { func taskCancel(cmd *cobra.Command, args []string) {
i := createInspector() i := createInspector()
for _, id := range args { for _, id := range args {