From 5d4f367a9ae53b0d93d81c08b159b7b0b8a53bf4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 29 Sep 2021 06:14:27 -0700 Subject: [PATCH] (cli): Update `task ls` command to support completed tasks --- tools/asynq/cmd/cron.go | 2 +- tools/asynq/cmd/root.go | 6 +++--- tools/asynq/cmd/task.go | 44 +++++++++++++++++++++++++++++++---------- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/tools/asynq/cmd/cron.go b/tools/asynq/cmd/cron.go index cb133d3..87bd749 100644 --- a/tools/asynq/cmd/cron.go +++ b/tools/asynq/cmd/cron.go @@ -63,7 +63,7 @@ func cronList(cmd *cobra.Command, args []string) { cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"} printRows := func(w io.Writer, tmpl string) { for _, e := range entries { - fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), formatPayload(e.Task.Payload()), e.Opts, + fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), sprintBytes(e.Task.Payload()), e.Opts, nextEnqueue(e.Next), prevEnqueue(e.Prev)) } } diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index f6873a2..322a19b 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -199,9 +199,9 @@ func printTable(cols []string, printRows func(w io.Writer, tmpl string)) { tw.Flush() } -// formatPayload returns string representation of payload if data is printable. -// If data is not printable, it returns a string describing payload is not printable. -func formatPayload(payload []byte) string { +// sprintBytes returns a string representation of the given byte slice if data is printable. +// If data is not printable, it returns a string describing it is not printable. +func sprintBytes(payload []byte) string { if !isPrintable(payload) { return "non-printable bytes" } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 5944b4a..45ed6e0 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -86,6 +86,7 @@ The value for the state flag should be one of: - scheduled - retry - archived +- completed List opeartion paginates the result set. By default, the command fetches the first 30 tasks. @@ -189,6 +190,8 @@ func taskList(cmd *cobra.Command, args []string) { listRetryTasks(qname, pageNum, pageSize) case "archived": listArchivedTasks(qname, pageNum, pageSize) + case "completed": + listCompletedTasks(qname, pageNum, pageSize) default: fmt.Printf("error: state=%q is not supported\n", state) os.Exit(1) @@ -210,7 +213,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload)) + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload)) } }, ) @@ -231,7 +234,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload)) + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload)) } }, ) @@ -252,7 +255,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Process In"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatProcessAt(t.NextProcessAt)) + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatProcessAt(t.NextProcessAt)) } }, ) @@ -284,8 +287,8 @@ func listRetryTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatProcessAt(t.NextProcessAt), - t.LastErr, formatLastFailedAt(t.LastFailedAt), t.Retried, t.MaxRetry) + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatProcessAt(t.NextProcessAt), + t.LastErr, formatPastTime(t.LastFailedAt), t.Retried, t.MaxRetry) } }, ) @@ -306,7 +309,27 @@ func listArchivedTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatLastFailedAt(t.LastFailedAt), t.LastErr) + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr) + } + }) +} + +func listCompletedTasks(qname string, pageNum, pageSize int) { + i := createInspector() + tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No completed tasks in %q queue\n", qname) + return + } + printTable( + []string{"ID", "Type", "Payload", "CompletedAt", "Result"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result)) } }) } @@ -356,7 +379,7 @@ func printTaskInfo(info *asynq.TaskInfo) { if len(info.LastErr) != 0 { fmt.Println() bold.Println("Last Failure") - fmt.Printf("Failed at: %s\n", formatLastFailedAt(info.LastFailedAt)) + fmt.Printf("Failed at: %s\n", formatPastTime(info.LastFailedAt)) fmt.Printf("Error message: %s\n", info.LastErr) } } @@ -371,11 +394,12 @@ func formatNextProcessAt(processAt time.Time) string { return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second)) } -func formatLastFailedAt(lastFailedAt time.Time) string { - if lastFailedAt.IsZero() || lastFailedAt.Unix() == 0 { +// formatPastTime takes t which is time in the past and returns a user-friendly string. +func formatPastTime(t time.Time) string { + if t.IsZero() || t.Unix() == 0 { return "" } - return lastFailedAt.Format(time.UnixDate) + return t.Format(time.UnixDate) } func taskArchive(cmd *cobra.Command, args []string) {