From 553891e837e0c3271b55b74012d422accf18f717 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 20 May 2022 17:18:20 -0700 Subject: [PATCH] (cli): Add group selection table --- tools/asynq/cmd/dash/dash.go | 144 ++++++++++++++++++++++++++-------- tools/asynq/cmd/dash/draw.go | 60 +++++++++++++- tools/asynq/cmd/dash/fetch.go | 19 +++++ 3 files changed, 187 insertions(+), 36 deletions(-) diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 05ffb5d..9e1c688 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -29,14 +29,17 @@ const ( type State struct { queues []*asynq.QueueInfo tasks []*asynq.TaskInfo + groups []*asynq.GroupInfo redisInfo redisInfo err error queueTableRowIdx int // highlighted row in queue table taskTableRowIdx int // highlighted row in task table + groupTableRowIdx int // highlighted row in group table taskState asynq.TaskState // highlighted task state in queue details view selectedQueue *asynq.QueueInfo // queue shown on queue details view + selectedGroup *asynq.GroupInfo pageNum int // pagination page number @@ -78,6 +81,7 @@ func Run(opts Options) { errorCh = make(chan error) queueCh = make(chan *asynq.QueueInfo) queuesCh = make(chan []*asynq.QueueInfo) + groupsCh = make(chan []*asynq.GroupInfo) tasksCh = make(chan []*asynq.TaskInfo) redisInfoCh = make(chan *redisInfo) ) @@ -144,30 +148,60 @@ func Run(opts Options) { } drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueueDetails { - if state.taskTableRowIdx < len(state.tasks) { - state.taskTableRowIdx++ + if shouldShowGroupTable(&state) { + if state.groupTableRowIdx < groupPageSize(s) { + state.groupTableRowIdx++ + } else { + state.groupTableRowIdx = 0 // loop back + } } else { - state.taskTableRowIdx = 0 // loop back + if state.taskTableRowIdx < len(state.tasks) { + state.taskTableRowIdx++ + } else { + state.taskTableRowIdx = 0 // loop back + } } drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueueDetails { - if state.taskTableRowIdx == 0 { - state.taskTableRowIdx = len(state.tasks) + if shouldShowGroupTable(&state) { + if state.groupTableRowIdx == 0 { + state.groupTableRowIdx = groupPageSize(s) + } else { + state.groupTableRowIdx-- + } } else { - state.taskTableRowIdx-- + if state.taskTableRowIdx == 0 { + state.taskTableRowIdx = len(state.tasks) + } else { + state.taskTableRowIdx-- + } } drawDash(s, baseStyle, &state, opts) } else if ev.Key() == tcell.KeyEnter { - if state.view == viewTypeQueues && state.queueTableRowIdx != 0 { - state.selectedQueue = state.queues[state.queueTableRowIdx-1] - state.view = viewTypeQueueDetails - state.taskState = asynq.TaskStateActive - state.tasks = nil - state.pageNum = 1 - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) - ticker.Reset(interval) - drawDash(s, baseStyle, &state, opts) + switch state.view { + case viewTypeQueues: + if state.queueTableRowIdx != 0 { + state.selectedQueue = state.queues[state.queueTableRowIdx-1] + state.view = viewTypeQueueDetails + state.taskState = asynq.TaskStateActive + state.tasks = nil + state.pageNum = 1 + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + drawDash(s, baseStyle, &state, opts) + } + case viewTypeQueueDetails: + if shouldShowGroupTable(&state) && state.groupTableRowIdx != 0 { + state.selectedGroup = state.groups[state.groupTableRowIdx-1] + state.tasks = nil + state.pageNum = 1 + go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + drawDash(s, baseStyle, &state, opts) + } + } } else if ev.Rune() == '?' { state.prevView = state.view @@ -196,8 +230,13 @@ func Run(opts Options) { state.pageNum = 1 state.taskTableRowIdx = 0 state.tasks = nil - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) + state.selectedGroup = nil + if shouldShowGroupTable(&state) { + go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh) + } else { + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + } ticker.Reset(interval) drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails { @@ -205,25 +244,50 @@ func Run(opts Options) { state.pageNum = 1 state.taskTableRowIdx = 0 state.tasks = nil - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) + state.selectedGroup = nil + if shouldShowGroupTable(&state) { + go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh) + } else { + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + } ticker.Reset(interval) drawDash(s, baseStyle, &state, opts) } else if ev.Rune() == 'n' && state.view == viewTypeQueueDetails { - pageSize := taskPageSize(s) - totalCount := getTaskCount(state.selectedQueue, state.taskState) - if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { - state.pageNum++ - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - pageSize, state.pageNum, tasksCh, errorCh) - ticker.Reset(interval) + if shouldShowGroupTable(&state) { + pageSize := groupPageSize(s) + total := len(state.groups) + start := (state.pageNum - 1) * pageSize + end := start + pageSize + if end <= total { + state.pageNum++ + drawDash(s, baseStyle, &state, opts) + } + } else { + pageSize := taskPageSize(s) + totalCount := getTaskCount(state.selectedQueue, state.taskState) + if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { + state.pageNum++ + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + pageSize, state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + } } } else if ev.Rune() == 'p' && state.view == viewTypeQueueDetails { - if state.pageNum > 1 { - state.pageNum-- - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) - ticker.Reset(interval) + if shouldShowGroupTable(&state) { + pageSize := groupPageSize(s) + start := (state.pageNum - 1) * pageSize + if start > 0 { + state.pageNum-- + drawDash(s, baseStyle, &state, opts) + } + } else { + if state.pageNum > 1 { + state.pageNum-- + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + } } } } @@ -234,8 +298,15 @@ func Run(opts Options) { go fetchQueues(inspector, queuesCh, errorCh, opts) case viewTypeQueueDetails: go fetchQueueInfo(inspector, state.selectedQueue.Queue, queueCh, errorCh) - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) + if shouldShowGroupTable(&state) { + go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh) + } else if state.taskState == asynq.TaskStateAggregating { + go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + } else { + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + } case viewTypeRedis: go fetchRedisInfo(redisInfoCh, errorCh) } @@ -250,6 +321,11 @@ func Run(opts Options) { state.err = nil drawDash(s, baseStyle, &state, opts) + case groups := <-groupsCh: + state.groups = groups + state.err = nil + drawDash(s, baseStyle, &state, opts) + case tasks := <-tasksCh: state.tasks = tasks state.err = nil diff --git a/tools/asynq/cmd/dash/draw.go b/tools/asynq/cmd/dash/draw.go index 007a10f..48acafe 100644 --- a/tools/asynq/cmd/dash/draw.go +++ b/tools/asynq/cmd/dash/draw.go @@ -256,20 +256,31 @@ func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) { d.Println(byteCount(q.MemoryUsage), style) } +// Returns the max number of groups that can be displayed. +func groupPageSize(s tcell.Screen) int { + _, h := s.Size() + return h - 16 // height - (# of rows used) +} + // Returns the number of tasks to fetch. func taskPageSize(s tcell.Screen) int { _, h := s.Size() return h - 15 // height - (# of rows used) } +func shouldShowGroupTable(state *State) bool { + return state.taskState == asynq.TaskStateAggregating && state.selectedGroup == nil +} + func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) { - if state.taskState == asynq.TaskStateAggregating { - d.Println("TODO: aggregating tasks need group name", style) + if shouldShowGroupTable(state) { + drawGroupTable(d, style, state) return } if len(state.tasks) == 0 { return // print nothing } + // TODO: colConfigs should be different for each state colConfigs := []*columnConfig[*asynq.TaskInfo]{ {"ID", alignLeft, func(t *asynq.TaskInfo) string { return t.ID }}, {"Type", alignLeft, func(t *asynq.TaskInfo) string { return t.Type }}, @@ -282,6 +293,10 @@ func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) { // Pagination pageSize := taskPageSize(d.Screen()) totalCount := getTaskCount(state.selectedQueue, state.taskState) + if state.taskState == asynq.TaskStateAggregating { + // aggregating tasks are scoped to each group when shown in the table. + totalCount = state.selectedGroup.Size + } if pageSize < totalCount { start := (state.pageNum-1)*pageSize + 1 end := start + len(state.tasks) - 1 @@ -303,6 +318,47 @@ func isNextTaskPageAvailable(s tcell.Screen, state *State) bool { return end < totalCount } +func drawGroupTable(d *ScreenDrawer, style tcell.Style, state *State) { + if len(state.groups) == 0 { + return // print nothing + } + d.Println("<<< Select group >>>", style) + colConfigs := []*columnConfig[*asynq.GroupInfo]{ + {"Name", alignLeft, func(g *asynq.GroupInfo) string { return g.Group }}, + {"Size", alignRight, func(g *asynq.GroupInfo) string { return strconv.Itoa(g.Size) }}, + } + // pagination + pageSize := groupPageSize(d.Screen()) + total := len(state.groups) + start := (state.pageNum - 1) * pageSize + end := min(start+pageSize, total) + drawTable(d, style, colConfigs, state.groups[start:end], state.groupTableRowIdx-1) + + footerStyle := style.Foreground(tcell.ColorLightGray) + if pageSize < total { + d.Print(fmt.Sprintf("Showing %d-%d out of %d", start+1, end, total), footerStyle) + if end < total { + d.Print(" n=NextPage", footerStyle) + } + if start > 0 { + d.Print(" p=PrevPage", footerStyle) + } + } + d.FillLine(' ', footerStyle) +} + +type number interface { + int | int64 | float64 +} + +// min returns the smaller of x and y. if x==y, returns x +func min[V number](x, y V) V { + if x > y { + return y + } + return x +} + // Define the order of states to show var taskStates = []asynq.TaskState{ asynq.TaskStateActive, diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index 115a71b..8be12c6 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -56,6 +56,25 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { } } +func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) { + groups, err := i.Groups(qname) + if err != nil { + errorCh <- err + return + } + groupsCh <- groups +} + +func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int, + tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { + tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + errorCh <- err + return + } + tasksCh <- tasks +} + func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { var (