2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-24 10:36:12 +08:00

Format payload bytes in CLI output

This commit is contained in:
Ken Hibino
2021-06-02 06:37:46 -07:00
parent bf5e6b2190
commit 5b58444821
3 changed files with 44 additions and 9 deletions

View File

@@ -63,7 +63,7 @@ func cronList(cmd *cobra.Command, args []string) {
cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"} cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"}
printRows := func(w io.Writer, tmpl string) { printRows := func(w io.Writer, tmpl string) {
for _, e := range entries { for _, e := range entries {
fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type, e.Task.Payload, e.Opts, fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), formatPayload(e.Task.Payload()), e.Opts,
nextEnqueue(e.Next), prevEnqueue(e.Prev)) nextEnqueue(e.Next), prevEnqueue(e.Prev))
} }
} }

View File

@@ -11,6 +11,8 @@ import (
"os" "os"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"unicode"
"unicode/utf8"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
@@ -195,3 +197,28 @@ func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
printRows(tw, format) printRows(tw, format)
tw.Flush() tw.Flush()
} }
// formatPayload returns string representation of payload if data is printable.
// If data is not printable, it returns a string describing payload is not printable.
func formatPayload(payload []byte) string {
if !isPrintable(payload) {
return "non-printable bytes"
}
return string(payload)
}
func isPrintable(data []byte) bool {
if !utf8.Valid(data) {
return false
}
isAllSpace := true
for _, r := range string(data) {
if !unicode.IsPrint(r) {
return false
}
if !unicode.IsSpace(r) {
isAllSpace = false
}
}
return !isAllSpace
}

View File

@@ -210,7 +210,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()))
} }
}, },
) )
@@ -231,7 +231,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()))
} }
}, },
) )
@@ -252,7 +252,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload", "Process In"}, []string{"ID", "Type", "Payload", "Process In"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt())) fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatProcessAt(t.NextProcessAt()))
} }
}, },
) )
@@ -281,10 +281,11 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
return return
} }
printTable( printTable(
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), formatProcessAt(t.NextProcessAt()), t.LastErr(), t.Retried(), t.MaxRetry()) fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatProcessAt(t.NextProcessAt()),
t.LastErr(), formatLastFailedAt(t.LastFailedAt()), t.Retried(), t.MaxRetry())
} }
}, },
) )
@@ -305,7 +306,7 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, []string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), t.LastFailedAt(), t.LastErr()) fmt.Fprintf(w, tmpl, t.ID(), t.Type(), formatPayload(t.Payload()), formatLastFailedAt(t.LastFailedAt()), t.LastErr())
} }
}) })
} }
@@ -355,13 +356,13 @@ func printTaskInfo(info *asynq.TaskInfo) {
if len(info.LastErr()) != 0 { if len(info.LastErr()) != 0 {
fmt.Println() fmt.Println()
bold.Println("Last Failure") bold.Println("Last Failure")
fmt.Printf("Failed at: %s\n", info.LastFailedAt().Format(time.UnixDate)) fmt.Printf("Failed at: %s\n", formatLastFailedAt(info.LastFailedAt()))
fmt.Printf("Error message: %s\n", info.LastErr()) fmt.Printf("Error message: %s\n", info.LastErr())
} }
} }
func formatNextProcessAt(processAt time.Time) string { func formatNextProcessAt(processAt time.Time) string {
if processAt.IsZero() { if processAt.IsZero() || processAt.Unix() == 0 {
return "n/a" return "n/a"
} }
if processAt.Before(time.Now()) { if processAt.Before(time.Now()) {
@@ -370,6 +371,13 @@ func formatNextProcessAt(processAt time.Time) string {
return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second)) return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second))
} }
func formatLastFailedAt(lastFailedAt time.Time) string {
if lastFailedAt.IsZero() || lastFailedAt.Unix() == 0 {
return ""
}
return lastFailedAt.Format(time.UnixDate)
}
func taskArchive(cmd *cobra.Command, args []string) { func taskArchive(cmd *cobra.Command, args []string) {
qname, err := cmd.Flags().GetString("queue") qname, err := cmd.Flags().GetString("queue")
if err != nil { if err != nil {