diff --git a/tools/asynq/cmd/cron.go b/tools/asynq/cmd/cron.go index a7cffd1..cb133d3 100644 --- a/tools/asynq/cmd/cron.go +++ b/tools/asynq/cmd/cron.go @@ -63,7 +63,7 @@ func cronList(cmd *cobra.Command, args []string) { cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"} printRows := func(w io.Writer, tmpl string) { for _, e := range entries { - fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type, e.Task.Payload, e.Opts, + fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), formatPayload(e.Task.Payload()), e.Opts, nextEnqueue(e.Next), prevEnqueue(e.Prev)) } } diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index ddd9262..109283f 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -11,6 +11,8 @@ import ( "os" "strings" "text/tabwriter" + "unicode" + "unicode/utf8" "github.com/go-redis/redis/v7" "github.com/hibiken/asynq" @@ -195,3 +197,28 @@ func printTable(cols []string, printRows func(w io.Writer, tmpl string)) { printRows(tw, format) tw.Flush() } + +// formatPayload returns string representation of payload if data is printable. +// If data is not printable, it returns a string describing payload is not printable. +func formatPayload(payload []byte) string { + if !isPrintable(payload) { + return "non-printable bytes" + } + return string(payload) +} + +func isPrintable(data []byte) bool { + if !utf8.Valid(data) { + return false + } + isAllSpace := true + for _, r := range string(data) { + if !unicode.IsPrint(r) { + return false + } + if !unicode.IsSpace(r) { + isAllSpace = false + } + } + return !isAllSpace +} diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index ffb88bc..494351e 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -210,7 +210,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload())) } }, ) @@ -231,7 +231,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload())) } }, ) @@ -252,7 +252,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Process In"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt())) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatProcessAt(t.NextProcessAt())) } }, ) @@ -281,10 +281,11 @@ func listRetryTasks(qname string, pageNum, pageSize int) { return } printTable( - []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, + []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt()), t.LastErr(), t.Retried(), t.MaxRetry()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatProcessAt(t.NextProcessAt()), + t.LastErr(), formatLastFailedAt(t.LastFailedAt()), t.Retried(), t.MaxRetry()) } }, ) @@ -305,7 +306,7 @@ func listArchivedTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), t.LastFailedAt(), t.LastErr()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatLastFailedAt(t.LastFailedAt()), t.LastErr()) } }) } @@ -355,13 +356,13 @@ func printTaskInfo(info *asynq.TaskInfo) { if len(info.LastErr()) != 0 { fmt.Println() bold.Println("Last Failure") - fmt.Printf("Failed at: %s\n", info.LastFailedAt().Format(time.UnixDate)) + fmt.Printf("Failed at: %s\n", formatLastFailedAt(info.LastFailedAt())) fmt.Printf("Error message: %s\n", info.LastErr()) } } func formatNextProcessAt(processAt time.Time) string { - if processAt.IsZero() { + if processAt.IsZero() || processAt.Unix() == 0 { return "n/a" } if processAt.Before(time.Now()) { @@ -370,6 +371,13 @@ func formatNextProcessAt(processAt time.Time) string { return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second)) } +func formatLastFailedAt(lastFailedAt time.Time) string { + if lastFailedAt.IsZero() || lastFailedAt.Unix() == 0 { + return "" + } + return lastFailedAt.Format(time.UnixDate) +} + func taskArchive(cmd *cobra.Command, args []string) { qname, err := cmd.Flags().GetString("queue") if err != nil {