2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 09:10:19 +08:00

(cli): Update task ls command to support completed tasks

This commit is contained in:
Ken Hibino 2021-09-29 06:14:27 -07:00
parent 8505ad4c87
commit 5d4f367a9a
3 changed files with 38 additions and 14 deletions

View File

@ -63,7 +63,7 @@ func cronList(cmd *cobra.Command, args []string) {
cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"} cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"}
printRows := func(w io.Writer, tmpl string) { printRows := func(w io.Writer, tmpl string) {
for _, e := range entries { for _, e := range entries {
fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), formatPayload(e.Task.Payload()), e.Opts, fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Task.Type(), sprintBytes(e.Task.Payload()), e.Opts,
nextEnqueue(e.Next), prevEnqueue(e.Prev)) nextEnqueue(e.Next), prevEnqueue(e.Prev))
} }
} }

View File

@ -199,9 +199,9 @@ func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
tw.Flush() tw.Flush()
} }
// formatPayload returns string representation of payload if data is printable. // sprintBytes returns a string representation of the given byte slice if data is printable.
// If data is not printable, it returns a string describing payload is not printable. // If data is not printable, it returns a string describing it is not printable.
func formatPayload(payload []byte) string { func sprintBytes(payload []byte) string {
if !isPrintable(payload) { if !isPrintable(payload) {
return "non-printable bytes" return "non-printable bytes"
} }

View File

@ -86,6 +86,7 @@ The value for the state flag should be one of:
- scheduled - scheduled
- retry - retry
- archived - archived
- completed
List opeartion paginates the result set. List opeartion paginates the result set.
By default, the command fetches the first 30 tasks. By default, the command fetches the first 30 tasks.
@ -189,6 +190,8 @@ func taskList(cmd *cobra.Command, args []string) {
listRetryTasks(qname, pageNum, pageSize) listRetryTasks(qname, pageNum, pageSize)
case "archived": case "archived":
listArchivedTasks(qname, pageNum, pageSize) listArchivedTasks(qname, pageNum, pageSize)
case "completed":
listCompletedTasks(qname, pageNum, pageSize)
default: default:
fmt.Printf("error: state=%q is not supported\n", state) fmt.Printf("error: state=%q is not supported\n", state)
os.Exit(1) os.Exit(1)
@ -210,7 +213,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload)) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload))
} }
}, },
) )
@ -231,7 +234,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload"}, []string{"ID", "Type", "Payload"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload)) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload))
} }
}, },
) )
@ -252,7 +255,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload", "Process In"}, []string{"ID", "Type", "Payload", "Process In"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatProcessAt(t.NextProcessAt)) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatProcessAt(t.NextProcessAt))
} }
}, },
) )
@ -284,8 +287,8 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"}, []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Last Failed", "Retried", "Max Retry"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatProcessAt(t.NextProcessAt), fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatProcessAt(t.NextProcessAt),
t.LastErr, formatLastFailedAt(t.LastFailedAt), t.Retried, t.MaxRetry) t.LastErr, formatPastTime(t.LastFailedAt), t.Retried, t.MaxRetry)
} }
}, },
) )
@ -306,7 +309,27 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
[]string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, []string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
func(w io.Writer, tmpl string) { func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, formatPayload(t.Payload), formatLastFailedAt(t.LastFailedAt), t.LastErr) fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.LastFailedAt), t.LastErr)
}
})
}
func listCompletedTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No completed tasks in %q queue\n", qname)
return
}
printTable(
[]string{"ID", "Type", "Payload", "CompletedAt", "Result"},
func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, sprintBytes(t.Payload), formatPastTime(t.CompletedAt), sprintBytes(t.Result))
} }
}) })
} }
@ -356,7 +379,7 @@ func printTaskInfo(info *asynq.TaskInfo) {
if len(info.LastErr) != 0 { if len(info.LastErr) != 0 {
fmt.Println() fmt.Println()
bold.Println("Last Failure") bold.Println("Last Failure")
fmt.Printf("Failed at: %s\n", formatLastFailedAt(info.LastFailedAt)) fmt.Printf("Failed at: %s\n", formatPastTime(info.LastFailedAt))
fmt.Printf("Error message: %s\n", info.LastErr) fmt.Printf("Error message: %s\n", info.LastErr)
} }
} }
@ -371,11 +394,12 @@ func formatNextProcessAt(processAt time.Time) string {
return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second)) return fmt.Sprintf("%s (in %v)", processAt.Format(time.UnixDate), processAt.Sub(time.Now()).Round(time.Second))
} }
func formatLastFailedAt(lastFailedAt time.Time) string { // formatPastTime takes t which is time in the past and returns a user-friendly string.
if lastFailedAt.IsZero() || lastFailedAt.Unix() == 0 { func formatPastTime(t time.Time) string {
if t.IsZero() || t.Unix() == 0 {
return "" return ""
} }
return lastFailedAt.Format(time.UnixDate) return t.Format(time.UnixDate)
} }
func taskArchive(cmd *cobra.Command, args []string) { func taskArchive(cmd *cobra.Command, args []string) {