diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 6c7d0a6..94dec9b 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -22,6 +22,7 @@ func init() { taskListCmd.Flags().StringP("state", "s", "", "state of the tasks to inspect") taskListCmd.Flags().Int("page", 1, "page number") taskListCmd.Flags().Int("size", 30, "page size") + taskListCmd.Flags().StringP("group", "g", "", "group to inspect (required for listing aggregating tasks)") taskListCmd.MarkFlagRequired("queue") taskListCmd.MarkFlagRequired("state") @@ -83,6 +84,7 @@ var taskListCmd = &cobra.Command{ The value for the state flag should be one of: - active - pending +- aggregating - scheduled - retry - archived @@ -92,12 +94,19 @@ List opeartion paginates the result set. By default, the command fetches the first 30 tasks. Use --page and --size flags to specify the page number and size. + Example: To list pending tasks from "default" queue, run asynq task ls --queue=default --state=pending To list the tasks from the second page, run - asynq task ls --queue=default --state=pending --page=1`, + asynq task ls --queue=default --state=pending --page=1 + +For aggregating tasks, additional --group flag is required. + +Example: + asynq task ls --queue=default --state=aggregating --group=mygroup +`, Run: taskList, } @@ -192,6 +201,17 @@ func taskList(cmd *cobra.Command, args []string) { listArchivedTasks(qname, pageNum, pageSize) case "completed": listCompletedTasks(qname, pageNum, pageSize) + case "aggregating": + group, err := cmd.Flags().GetString("group") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if group == "" { + fmt.Println("Flag --group is required for listing aggregating tasks") + os.Exit(1) + } + listAggregatingTasks(qname, group, pageNum, pageSize) default: fmt.Printf("error: state=%q is not supported\n", state) os.Exit(1) @@ -334,6 +354,27 @@ func listCompletedTasks(qname string, pageNum, pageSize int) { }) } +func listAggregatingTasks(qname, group string, pageNum, pageSize int) { + i := createInspector() + tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No aggregating tasks in group %q \n", group) + return + } + printTable( + []string{"ID", "Type", "Payload", "Group"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), t.Group) + } + }, + ) +} + func taskCancel(cmd *cobra.Command, args []string) { i := createInspector() for _, id := range args {