2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 10:56:12 +08:00

(cli): Extend task ls command to list aggregating tasks

This commit is contained in:
Ken Hibino
2022-04-09 15:47:33 -07:00
parent af40b3fef7
commit 2dcfeeb0a5

View File

@@ -22,6 +22,7 @@ func init() {
taskListCmd.Flags().StringP("state", "s", "", "state of the tasks to inspect")
taskListCmd.Flags().Int("page", 1, "page number")
taskListCmd.Flags().Int("size", 30, "page size")
taskListCmd.Flags().StringP("group", "g", "", "group to inspect (required for listing aggregating tasks)")
taskListCmd.MarkFlagRequired("queue")
taskListCmd.MarkFlagRequired("state")
@@ -83,6 +84,7 @@ var taskListCmd = &cobra.Command{
The value for the state flag should be one of:
- active
- pending
- aggregating
- scheduled
- retry
- archived
@@ -92,12 +94,19 @@ List opeartion paginates the result set.
By default, the command fetches the first 30 tasks.
Use --page and --size flags to specify the page number and size.
Example:
To list pending tasks from "default" queue, run
asynq task ls --queue=default --state=pending
To list the tasks from the second page, run
asynq task ls --queue=default --state=pending --page=1`,
asynq task ls --queue=default --state=pending --page=1
For aggregating tasks, additional --group flag is required.
Example:
asynq task ls --queue=default --state=aggregating --group=mygroup
`,
Run: taskList,
}
@@ -192,6 +201,17 @@ func taskList(cmd *cobra.Command, args []string) {
listArchivedTasks(qname, pageNum, pageSize)
case "completed":
listCompletedTasks(qname, pageNum, pageSize)
case "aggregating":
group, err := cmd.Flags().GetString("group")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if group == "" {
fmt.Println("Flag --group is required for listing aggregating tasks")
os.Exit(1)
}
listAggregatingTasks(qname, group, pageNum, pageSize)
default:
fmt.Printf("error: state=%q is not supported\n", state)
os.Exit(1)
@@ -334,6 +354,27 @@ func listCompletedTasks(qname string, pageNum, pageSize int) {
})
}
func listAggregatingTasks(qname, group string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No aggregating tasks in group %q \n", group)
return
}
printTable(
[]string{"ID", "Type", "Payload", "Group"},
func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), t.Group)
}
},
)
}
func taskCancel(cmd *cobra.Command, args []string) {
i := createInspector()
for _, id := range args {