mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 13:21:58 +08:00
(cli): update queue details view
This commit is contained in:
@@ -19,7 +19,7 @@ func init() {
|
||||
rootCmd.AddCommand(dashCmd)
|
||||
// TODO: Remove this debug once we're done
|
||||
dashCmd.Flags().BoolVar(&flagDebug, "debug", false, "Print debug info")
|
||||
dashCmd.Flags().BoolVar(&flagUseRealData, "realdata", false, "Use real data in redis")
|
||||
dashCmd.Flags().BoolVar(&flagUseRealData, "realdata", true, "Use real data in redis")
|
||||
}
|
||||
|
||||
var dashCmd = &cobra.Command{
|
||||
|
@@ -28,11 +28,13 @@ const (
|
||||
// State holds dashboard state.
|
||||
type State struct {
|
||||
queues []*asynq.QueueInfo
|
||||
tasks []*asynq.TaskInfo
|
||||
redisInfo redisInfo
|
||||
err error
|
||||
|
||||
rowIdx int // highlighted row
|
||||
taskState asynq.TaskState // highlighted task state in queue details view
|
||||
queueTableRowIdx int // highlighted row in queue table
|
||||
taskTableRowIdx int // highlighted row in task table
|
||||
taskState asynq.TaskState // highlighted task state in queue details view
|
||||
|
||||
selectedQueue *asynq.QueueInfo // queue shown on queue details view
|
||||
|
||||
@@ -73,6 +75,7 @@ func Run(opts Options) {
|
||||
var (
|
||||
errorCh = make(chan error)
|
||||
queuesCh = make(chan []*asynq.QueueInfo)
|
||||
tasksCh = make(chan []*asynq.TaskInfo)
|
||||
redisInfoCh = make(chan *redisInfo)
|
||||
)
|
||||
|
||||
@@ -123,25 +126,41 @@ func Run(opts Options) {
|
||||
quit()
|
||||
} else if ev.Key() == tcell.KeyCtrlL {
|
||||
s.Sync()
|
||||
} else if ev.Key() == tcell.KeyDown || ev.Rune() == 'j' {
|
||||
if state.rowIdx < len(state.queues) {
|
||||
state.rowIdx++
|
||||
} else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueues {
|
||||
if state.queueTableRowIdx < len(state.queues) {
|
||||
state.queueTableRowIdx++
|
||||
} else {
|
||||
state.rowIdx = 0 // loop back
|
||||
state.queueTableRowIdx = 0 // loop back
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if ev.Key() == tcell.KeyUp || ev.Rune() == 'k' {
|
||||
if state.rowIdx == 0 {
|
||||
state.rowIdx = len(state.queues)
|
||||
} else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueues {
|
||||
if state.queueTableRowIdx == 0 {
|
||||
state.queueTableRowIdx = len(state.queues)
|
||||
} else {
|
||||
state.rowIdx--
|
||||
state.queueTableRowIdx--
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueueDetails {
|
||||
if state.taskTableRowIdx < len(state.tasks) {
|
||||
state.taskTableRowIdx++
|
||||
} else {
|
||||
state.taskTableRowIdx = 0 // loop back
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueueDetails {
|
||||
if state.taskTableRowIdx == 0 {
|
||||
state.taskTableRowIdx = len(state.tasks)
|
||||
} else {
|
||||
state.taskTableRowIdx--
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if ev.Key() == tcell.KeyEnter {
|
||||
if state.view == viewTypeQueues && state.rowIdx != 0 {
|
||||
state.selectedQueue = state.queues[state.rowIdx-1]
|
||||
if state.view == viewTypeQueues && state.queueTableRowIdx != 0 {
|
||||
state.selectedQueue = state.queues[state.queueTableRowIdx-1]
|
||||
state.view = viewTypeQueueDetails
|
||||
state.taskState = asynq.TaskStateActive
|
||||
state.tasks = nil
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
} else if ev.Rune() == '?' {
|
||||
@@ -168,9 +187,13 @@ func Run(opts Options) {
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyRight || ev.Rune() == 'l') && state.view == viewTypeQueueDetails {
|
||||
state.taskState = nextTaskState(state.taskState)
|
||||
state.tasks = nil
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails {
|
||||
state.taskState = prevTaskState(state.taskState)
|
||||
state.tasks = nil
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
}
|
||||
@@ -188,6 +211,11 @@ func Run(opts Options) {
|
||||
state.err = nil
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
|
||||
case tasks := <-tasksCh:
|
||||
state.tasks = tasks
|
||||
state.err = nil
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
|
||||
case redisInfo := <-redisInfoCh:
|
||||
state.redisInfo = *redisInfo
|
||||
state.err = nil
|
||||
|
@@ -9,6 +9,7 @@ import (
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gdamore/tcell/v2"
|
||||
"github.com/hibiken/asynq"
|
||||
@@ -27,12 +28,16 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) {
|
||||
d.NL()
|
||||
drawQueueTable(d, style, state)
|
||||
case viewTypeQueueDetails:
|
||||
d.Println(fmt.Sprintf("=== Queues > %s ===", state.selectedQueue.Queue), style.Bold(true))
|
||||
d.Println("=== Queue Summary ===", style.Bold(true))
|
||||
d.NL()
|
||||
drawQueueInfoBanner(d, style, state)
|
||||
drawQueueSummary(d, style, state)
|
||||
d.NL()
|
||||
d.NL()
|
||||
d.Println("=== Tasks ===", style.Bold(true))
|
||||
d.NL()
|
||||
d.Println("+++ Tasks +++", style.Bold(true))
|
||||
drawTaskStateBreakdown(d, style, state)
|
||||
d.NL()
|
||||
drawTaskTable(d, style, state)
|
||||
case viewTypeServers:
|
||||
d.Println("=== Servers ===", style.Bold(true))
|
||||
d.NL()
|
||||
@@ -54,7 +59,7 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) {
|
||||
// TODO: Draw HELP body
|
||||
}
|
||||
if opts.DebugMode {
|
||||
d.Println(fmt.Sprintf("DEBUG: rowIdx = %d", state.rowIdx), style)
|
||||
d.Println(fmt.Sprintf("DEBUG: rowIdx = %d", state.queueTableRowIdx), style)
|
||||
d.Println(fmt.Sprintf("DEBUG: selectedQueue = %s", state.selectedQueue.Queue), style)
|
||||
d.Println(fmt.Sprintf("DEBUG: view = %v", state.view), style)
|
||||
}
|
||||
@@ -215,19 +220,53 @@ var queueColumnConfigs = []*columnConfig[*asynq.QueueInfo]{
|
||||
}
|
||||
}},
|
||||
{"Size", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Size) }},
|
||||
{"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.String() }},
|
||||
{"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.Round(time.Second).String() }},
|
||||
{"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return ByteCount(q.MemoryUsage) }},
|
||||
{"Processed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Processed) }},
|
||||
{"Failed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Failed) }},
|
||||
{"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return "0.23%" /* TODO: implement this */ }},
|
||||
{"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return formatErrorRate(q.Processed, q.Failed) }},
|
||||
}
|
||||
|
||||
func formatErrorRate(processed, failed int) string {
|
||||
if processed == 0 {
|
||||
return "-"
|
||||
}
|
||||
return fmt.Sprintf("%.2f", float64(failed)/float64(processed))
|
||||
}
|
||||
|
||||
func drawQueueTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
drawTable(d, style, queueColumnConfigs, state.queues, state.rowIdx-1)
|
||||
drawTable(d, style, queueColumnConfigs, state.queues, state.queueTableRowIdx-1)
|
||||
}
|
||||
|
||||
func drawQueueInfoBanner(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
drawTable(d, style, queueColumnConfigs, []*asynq.QueueInfo{state.selectedQueue}, -1 /* no highlited row */)
|
||||
func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
q := state.selectedQueue
|
||||
labelStyle := style.Foreground(tcell.ColorLightGray)
|
||||
d.Print("Name: ", labelStyle)
|
||||
d.Println(q.Queue, style)
|
||||
d.Print("Size: ", labelStyle)
|
||||
d.Println(strconv.Itoa(q.Size), style)
|
||||
d.Print("Latency ", labelStyle)
|
||||
d.Println(q.Latency.Round(time.Second).String(), style)
|
||||
d.Print("MemUsage ", labelStyle)
|
||||
d.Println(ByteCount(q.MemoryUsage), style)
|
||||
}
|
||||
|
||||
func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
if state.taskState == asynq.TaskStateAggregating {
|
||||
d.Println("TODO: aggregating tasks need group name", style)
|
||||
return
|
||||
}
|
||||
if len(state.tasks) == 0 {
|
||||
return // print nothing
|
||||
}
|
||||
colConfigs := []*columnConfig[*asynq.TaskInfo]{
|
||||
{"ID", alignLeft, func(t *asynq.TaskInfo) string { return t.ID }},
|
||||
{"Type", alignLeft, func(t *asynq.TaskInfo) string { return t.Type }},
|
||||
{"Payload", alignLeft, func(t *asynq.TaskInfo) string { return string(t.Payload) }},
|
||||
{"MaxRetry", alignRight, func(t *asynq.TaskInfo) string { return strconv.Itoa(t.MaxRetry) }},
|
||||
{"LastError", alignLeft, func(t *asynq.TaskInfo) string { return t.LastErr }},
|
||||
}
|
||||
drawTable(d, style, colConfigs, state.tasks, state.taskTableRowIdx-1)
|
||||
}
|
||||
|
||||
// Define the order of states to show
|
||||
@@ -292,9 +331,10 @@ func drawTaskStateBreakdown(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
for _, ts := range taskStates {
|
||||
s := style
|
||||
if state.taskState == ts {
|
||||
s = s.Background(tcell.ColorDarkOliveGreen)
|
||||
s = s.Bold(true).Underline(true)
|
||||
}
|
||||
d.Print(fmt.Sprintf("%s:%d", ts.String(), getTaskCount(state.selectedQueue, ts)), s)
|
||||
d.Print(fmt.Sprintf("%s:%d", strings.Title(ts.String()), getTaskCount(state.selectedQueue, ts)), s)
|
||||
d.Print(pad, style)
|
||||
}
|
||||
d.NL()
|
||||
}
|
||||
|
@@ -47,3 +47,29 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
||||
peakMemoryUsage: n + 123,
|
||||
}
|
||||
}
|
||||
|
||||
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
|
||||
var (
|
||||
tasks []*asynq.TaskInfo
|
||||
err error
|
||||
)
|
||||
switch taskState {
|
||||
case asynq.TaskStateActive:
|
||||
tasks, err = i.ListActiveTasks(qname)
|
||||
case asynq.TaskStatePending:
|
||||
tasks, err = i.ListPendingTasks(qname)
|
||||
case asynq.TaskStateScheduled:
|
||||
tasks, err = i.ListScheduledTasks(qname)
|
||||
case asynq.TaskStateRetry:
|
||||
tasks, err = i.ListRetryTasks(qname)
|
||||
case asynq.TaskStateArchived:
|
||||
tasks, err = i.ListArchivedTasks(qname)
|
||||
case asynq.TaskStateCompleted:
|
||||
tasks, err = i.ListCompletedTasks(qname)
|
||||
}
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
tasksCh <- tasks
|
||||
}
|
||||
|
@@ -27,9 +27,10 @@ type column[V any] struct {
|
||||
width int
|
||||
}
|
||||
|
||||
|
||||
// Helper to draw a table.
|
||||
func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfig[V], data []V, highlightRowIdx int) {
|
||||
const colBuffer = 4 // extra buffer between columns
|
||||
const colBuffer = " " // extra buffer between columns
|
||||
cols := make([]*column[V], len(configs))
|
||||
for i, cfg := range configs {
|
||||
cols[i] = &column[V]{cfg, runewidth.StringWidth(cfg.name)}
|
||||
@@ -46,9 +47,9 @@ func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfi
|
||||
headerStyle := style.Background(tcell.ColorDimGray).Foreground(tcell.ColorWhite)
|
||||
for _, col := range cols {
|
||||
if col.alignment == alignLeft {
|
||||
d.Print(rpad(col.name, col.width+colBuffer), headerStyle)
|
||||
d.Print(rpad(col.name, col.width) + colBuffer, headerStyle)
|
||||
} else {
|
||||
d.Print(lpad(col.name, col.width+colBuffer), headerStyle)
|
||||
d.Print(lpad(col.name, col.width) + colBuffer, headerStyle)
|
||||
}
|
||||
}
|
||||
d.FillLine(' ', headerStyle)
|
||||
@@ -60,9 +61,9 @@ func drawTable[V any](d *ScreenDrawer, style tcell.Style, configs []*columnConfi
|
||||
}
|
||||
for _, col := range cols {
|
||||
if col.alignment == alignLeft {
|
||||
d.Print(rpad(col.displayFn(v), col.width+colBuffer), rowStyle)
|
||||
d.Print(rpad(col.displayFn(v), col.width) + colBuffer, rowStyle)
|
||||
} else {
|
||||
d.Print(lpad(col.displayFn(v), col.width+colBuffer), rowStyle)
|
||||
d.Print(lpad(col.displayFn(v), col.width) + colBuffer, rowStyle)
|
||||
}
|
||||
}
|
||||
d.FillLine(' ', rowStyle)
|
||||
|
Reference in New Issue
Block a user