diff --git a/tools/asynq/cmd/dash.go b/tools/asynq/cmd/dash.go index 9e2fcfb..92ecd77 100644 --- a/tools/asynq/cmd/dash.go +++ b/tools/asynq/cmd/dash.go @@ -19,7 +19,7 @@ func init() { rootCmd.AddCommand(dashCmd) // TODO: Remove this debug once we're done dashCmd.Flags().BoolVar(&flagDebug, "debug", false, "Print debug info") - dashCmd.Flags().BoolVar(&flagUseRealData, "realdata", false, "Use real data in redis") + dashCmd.Flags().BoolVar(&flagUseRealData, "realdata", true, "Use real data in redis") } var dashCmd = &cobra.Command{ diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 13d5d29..264846e 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -28,11 +28,13 @@ const ( // State holds dashboard state. type State struct { queues []*asynq.QueueInfo + tasks []*asynq.TaskInfo redisInfo redisInfo err error - rowIdx int // highlighted row - taskState asynq.TaskState // highlighted task state in queue details view + queueTableRowIdx int // highlighted row in queue table + taskTableRowIdx int // highlighted row in task table + taskState asynq.TaskState // highlighted task state in queue details view selectedQueue *asynq.QueueInfo // queue shown on queue details view @@ -73,6 +75,7 @@ func Run(opts Options) { var ( errorCh = make(chan error) queuesCh = make(chan []*asynq.QueueInfo) + tasksCh = make(chan []*asynq.TaskInfo) redisInfoCh = make(chan *redisInfo) ) @@ -123,25 +126,41 @@ func Run(opts Options) { quit() } else if ev.Key() == tcell.KeyCtrlL { s.Sync() - } else if ev.Key() == tcell.KeyDown || ev.Rune() == 'j' { - if state.rowIdx < len(state.queues) { - state.rowIdx++ + } else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueues { + if state.queueTableRowIdx < len(state.queues) { + state.queueTableRowIdx++ } else { - state.rowIdx = 0 // loop back + state.queueTableRowIdx = 0 // loop back } drawDash(s, baseStyle, &state, opts) - } else if ev.Key() == tcell.KeyUp || ev.Rune() == 'k' { - if state.rowIdx == 0 { - state.rowIdx = len(state.queues) + } else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueues { + if state.queueTableRowIdx == 0 { + state.queueTableRowIdx = len(state.queues) } else { - state.rowIdx-- + state.queueTableRowIdx-- + } + drawDash(s, baseStyle, &state, opts) + } else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueueDetails { + if state.taskTableRowIdx < len(state.tasks) { + state.taskTableRowIdx++ + } else { + state.taskTableRowIdx = 0 // loop back + } + drawDash(s, baseStyle, &state, opts) + } else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueueDetails { + if state.taskTableRowIdx == 0 { + state.taskTableRowIdx = len(state.tasks) + } else { + state.taskTableRowIdx-- } drawDash(s, baseStyle, &state, opts) } else if ev.Key() == tcell.KeyEnter { - if state.view == viewTypeQueues && state.rowIdx != 0 { - state.selectedQueue = state.queues[state.rowIdx-1] + if state.view == viewTypeQueues && state.queueTableRowIdx != 0 { + state.selectedQueue = state.queues[state.queueTableRowIdx-1] state.view = viewTypeQueueDetails state.taskState = asynq.TaskStateActive + state.tasks = nil + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) drawDash(s, baseStyle, &state, opts) } } else if ev.Rune() == '?' { @@ -168,9 +187,13 @@ func Run(opts Options) { drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyRight || ev.Rune() == 'l') && state.view == viewTypeQueueDetails { state.taskState = nextTaskState(state.taskState) + state.tasks = nil + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails { state.taskState = prevTaskState(state.taskState) + state.tasks = nil + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) drawDash(s, baseStyle, &state, opts) } } @@ -188,6 +211,11 @@ func Run(opts Options) { state.err = nil drawDash(s, baseStyle, &state, opts) + case tasks := <-tasksCh: + state.tasks = tasks + state.err = nil + drawDash(s, baseStyle, &state, opts) + case redisInfo := <-redisInfoCh: state.redisInfo = *redisInfo state.err = nil diff --git a/tools/asynq/cmd/dash/draw.go b/tools/asynq/cmd/dash/draw.go index edee874..e507d10 100644 --- a/tools/asynq/cmd/dash/draw.go +++ b/tools/asynq/cmd/dash/draw.go @@ -9,6 +9,7 @@ import ( "math" "strconv" "strings" + "time" "github.com/gdamore/tcell/v2" "github.com/hibiken/asynq" @@ -27,12 +28,16 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) { d.NL() drawQueueTable(d, style, state) case viewTypeQueueDetails: - d.Println(fmt.Sprintf("=== Queues > %s ===", state.selectedQueue.Queue), style.Bold(true)) + d.Println("=== Queue Summary ===", style.Bold(true)) d.NL() - drawQueueInfoBanner(d, style, state) + drawQueueSummary(d, style, state) + d.NL() + d.NL() + d.Println("=== Tasks ===", style.Bold(true)) d.NL() - d.Println("+++ Tasks +++", style.Bold(true)) drawTaskStateBreakdown(d, style, state) + d.NL() + drawTaskTable(d, style, state) case viewTypeServers: d.Println("=== Servers ===", style.Bold(true)) d.NL() @@ -54,7 +59,7 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) { // TODO: Draw HELP body } if opts.DebugMode { - d.Println(fmt.Sprintf("DEBUG: rowIdx = %d", state.rowIdx), style) + d.Println(fmt.Sprintf("DEBUG: rowIdx = %d", state.queueTableRowIdx), style) d.Println(fmt.Sprintf("DEBUG: selectedQueue = %s", state.selectedQueue.Queue), style) d.Println(fmt.Sprintf("DEBUG: view = %v", state.view), style) } @@ -215,19 +220,53 @@ var queueColumnConfigs = []*columnConfig[*asynq.QueueInfo]{ } }}, {"Size", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Size) }}, - {"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.String() }}, + {"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.Round(time.Second).String() }}, {"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return ByteCount(q.MemoryUsage) }}, {"Processed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Processed) }}, {"Failed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Failed) }}, - {"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return "0.23%" /* TODO: implement this */ }}, + {"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return formatErrorRate(q.Processed, q.Failed) }}, +} + +func formatErrorRate(processed, failed int) string { + if processed == 0 { + return "-" + } + return fmt.Sprintf("%.2f", float64(failed)/float64(processed)) } func drawQueueTable(d *ScreenDrawer, style tcell.Style, state *State) { - drawTable(d, style, queueColumnConfigs, state.queues, state.rowIdx-1) + drawTable(d, style, queueColumnConfigs, state.queues, state.queueTableRowIdx-1) } -func drawQueueInfoBanner(d *ScreenDrawer, style tcell.Style, state *State) { - drawTable(d, style, queueColumnConfigs, []*asynq.QueueInfo{state.selectedQueue}, -1 /* no highlited row */) +func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) { + q := state.selectedQueue + labelStyle := style.Foreground(tcell.ColorLightGray) + d.Print("Name: ", labelStyle) + d.Println(q.Queue, style) + d.Print("Size: ", labelStyle) + d.Println(strconv.Itoa(q.Size), style) + d.Print("Latency ", labelStyle) + d.Println(q.Latency.Round(time.Second).String(), style) + d.Print("MemUsage ", labelStyle) + d.Println(ByteCount(q.MemoryUsage), style) +} + +func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) { + if state.taskState == asynq.TaskStateAggregating { + d.Println("TODO: aggregating tasks need group name", style) + return + } + if len(state.tasks) == 0 { + return // print nothing + } + colConfigs := []*columnConfig[*asynq.TaskInfo]{ + {"ID", alignLeft, func(t *asynq.TaskInfo) string { return t.ID }}, + {"Type", alignLeft, func(t *asynq.TaskInfo) string { return t.Type }}, + {"Payload", alignLeft, func(t *asynq.TaskInfo) string { return string(t.Payload) }}, + {"MaxRetry", alignRight, func(t *asynq.TaskInfo) string { return strconv.Itoa(t.MaxRetry) }}, + {"LastError", alignLeft, func(t *asynq.TaskInfo) string { return t.LastErr }}, + } + drawTable(d, style, colConfigs, state.tasks, state.taskTableRowIdx-1) } // Define the order of states to show @@ -292,9 +331,10 @@ func drawTaskStateBreakdown(d *ScreenDrawer, style tcell.Style, state *State) { for _, ts := range taskStates { s := style if state.taskState == ts { - s = s.Background(tcell.ColorDarkOliveGreen) + s = s.Bold(true).Underline(true) } - d.Print(fmt.Sprintf("%s:%d", ts.String(), getTaskCount(state.selectedQueue, ts)), s) + d.Print(fmt.Sprintf("%s:%d", strings.Title(ts.String()), getTaskCount(state.selectedQueue, ts)), s) d.Print(pad, style) } + d.NL() } diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index e158d9d..aae7df3 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -47,3 +47,29 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { peakMemoryUsage: n + 123, } } + +func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { + var ( + tasks []*asynq.TaskInfo + err error + ) + switch taskState { + case asynq.TaskStateActive: + tasks, err = i.ListActiveTasks(qname) + case asynq.TaskStatePending: + tasks, err = i.ListPendingTasks(qname) + case asynq.TaskStateScheduled: + tasks, err = i.ListScheduledTasks(qname) + case asynq.TaskStateRetry: + tasks, err = i.ListRetryTasks(qname) + case asynq.TaskStateArchived: + tasks, err = i.ListArchivedTasks(qname) + case asynq.TaskStateCompleted: + tasks, err = i.ListCompletedTasks(qname) + } + if err != nil { + errorCh <- err + return + } + tasksCh <- tasks +} diff --git a/tools/asynq/cmd/dash/table.go b/tools/asynq/cmd/dash/table.go index 539607e..d3a9778 100644 --- a/tools/asynq/cmd/dash/table.go +++ b/tools/asynq/cmd/dash/table.go @@ -27,9 +27,10 @@ type column[V any] struct { width int } + // Helper to draw a table. func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfig[V], data []V, highlightRowIdx int) { - const colBuffer = 4 // extra buffer between columns + const colBuffer = " " // extra buffer between columns cols := make([]*column[V], len(configs)) for i, cfg := range configs { cols[i] = &column[V]{cfg, runewidth.StringWidth(cfg.name)} @@ -46,9 +47,9 @@ func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfi headerStyle := style.Background(tcell.ColorDimGray).Foreground(tcell.ColorWhite) for _, col := range cols { if col.alignment == alignLeft { - d.Print(rpad(col.name, col.width+colBuffer), headerStyle) + d.Print(rpad(col.name, col.width) + colBuffer, headerStyle) } else { - d.Print(lpad(col.name, col.width+colBuffer), headerStyle) + d.Print(lpad(col.name, col.width) + colBuffer, headerStyle) } } d.FillLine(' ', headerStyle) @@ -60,9 +61,9 @@ func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfi } for _, col := range cols { if col.alignment == alignLeft { - d.Print(rpad(col.displayFn(v), col.width+colBuffer), rowStyle) + d.Print(rpad(col.displayFn(v), col.width) + colBuffer, rowStyle) } else { - d.Print(lpad(col.displayFn(v), col.width+colBuffer), rowStyle) + d.Print(lpad(col.displayFn(v), col.width) + colBuffer, rowStyle) } } d.FillLine(' ', rowStyle)