diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index fc8b63f..5f9ede1 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -140,31 +140,31 @@ func queueInspect(cmd *cobra.Command, args []string) { } } -func printQueueInfo(s *asynq.QueueInfo) { +func printQueueInfo(info *asynq.QueueInfo) { bold := color.New(color.Bold) bold.Println("Queue Info") - fmt.Printf("Name: %s\n", s.Queue) - fmt.Printf("Size: %d\n", s.Size) - fmt.Printf("Paused: %t\n\n", s.Paused) + fmt.Printf("Name: %s\n", info.Queue) + fmt.Printf("Size: %d\n", info.Size) + fmt.Printf("Paused: %t\n\n", info.Paused) bold.Println("Task Count by State") printTable( []string{"active", "pending", "scheduled", "retry", "archived"}, func(w io.Writer, tmpl string) { - fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Archived) + fmt.Fprintf(w, tmpl, info.Active, info.Pending, info.Scheduled, info.Retry, info.Archived) }, ) fmt.Println() - bold.Printf("Daily Stats %s UTC\n", s.Timestamp.UTC().Format("2006-01-02")) + bold.Printf("Daily Stats %s UTC\n", info.Timestamp.UTC().Format("2006-01-02")) printTable( []string{"processed", "failed", "error rate"}, func(w io.Writer, tmpl string) { var errRate string - if s.Processed == 0 { + if info.Processed == 0 { errRate = "N/A" } else { - errRate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) + errRate = fmt.Sprintf("%.2f%%", float64(info.Failed)/float64(info.Processed)*100) } - fmt.Fprintf(w, tmpl, s.Processed, s.Failed, errRate) + fmt.Fprintf(w, tmpl, info.Processed, info.Failed, errRate) }, ) } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 5ad056c..ffb88bc 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -35,19 +35,19 @@ func init() { taskCmd.AddCommand(taskArchiveCmd) taskArchiveCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskArchiveCmd.Flags().StringP("id", "t", "", "id of the task") + taskArchiveCmd.Flags().StringP("id", "i", "", "id of the task") taskArchiveCmd.MarkFlagRequired("queue") taskArchiveCmd.MarkFlagRequired("id") taskCmd.AddCommand(taskDeleteCmd) taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskDeleteCmd.Flags().StringP("id", "t", "", "id of the task") + taskDeleteCmd.Flags().StringP("id", "i", "", "id of the task") taskDeleteCmd.MarkFlagRequired("queue") taskDeleteCmd.MarkFlagRequired("id") taskCmd.AddCommand(taskRunCmd) taskRunCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskRunCmd.Flags().StringP("id", "t", "", "id of the task") + taskRunCmd.Flags().StringP("id", "i", "", "id of the task") taskRunCmd.MarkFlagRequired("queue") taskRunCmd.MarkFlagRequired("id") @@ -136,21 +136,21 @@ var taskRunCmd = &cobra.Command{ } var taskArchiveAllCmd = &cobra.Command{ - Use: "archive-all --queue=QUEUE --state=STATE", + Use: "archiveall --queue=QUEUE --state=STATE", Short: "Archive all tasks in the given state", Args: cobra.NoArgs, Run: taskArchiveAll, } var taskDeleteAllCmd = &cobra.Command{ - Use: "delete-all --queue=QUEUE --state=STATE", + Use: "deleteall --queue=QUEUE --state=STATE", Short: "Delete all tasks in the given state", Args: cobra.NoArgs, Run: taskDeleteAll, } var taskRunAllCmd = &cobra.Command{ - Use: "run-all --queue=QUEUE --state=STATE", + Use: "runall --queue=QUEUE --state=STATE", Short: "Run all tasks in the given state", Args: cobra.NoArgs, Run: taskRunAll, @@ -252,14 +252,23 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Process In"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - processIn := fmt.Sprintf("%.0f seconds", - t.NextProcessAt().Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), processIn) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt())) } }, ) } +// formatProcessAt formats next process at time to human friendly string. +// If processAt time is in the past, returns "right now". +// If processAt time is in the future, returns "in xxx" where xxx is the duration from now. +func formatProcessAt(processAt time.Time) string { + d := processAt.Sub(time.Now()) + if d < 0 { + return "right now" + } + return fmt.Sprintf("in %v", d.Round(time.Second)) +} + func listRetryTasks(qname string, pageNum, pageSize int) { i := createInspector() tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) @@ -275,13 +284,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - var nextRetry string - if d := t.NextProcessAt().Sub(time.Now()); d > 0 { - nextRetry = fmt.Sprintf("in %v", d.Round(time.Second)) - } else { - nextRetry = "right now" - } - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), nextRetry, t.LastErr(), t.Retried(), t.MaxRetry()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt()), t.LastErr(), t.Retried(), t.MaxRetry()) } }, )