mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
(cli): Add group selection table
This commit is contained in:
@@ -29,14 +29,17 @@ const (
|
||||
type State struct {
|
||||
queues []*asynq.QueueInfo
|
||||
tasks []*asynq.TaskInfo
|
||||
groups []*asynq.GroupInfo
|
||||
redisInfo redisInfo
|
||||
err error
|
||||
|
||||
queueTableRowIdx int // highlighted row in queue table
|
||||
taskTableRowIdx int // highlighted row in task table
|
||||
groupTableRowIdx int // highlighted row in group table
|
||||
taskState asynq.TaskState // highlighted task state in queue details view
|
||||
|
||||
selectedQueue *asynq.QueueInfo // queue shown on queue details view
|
||||
selectedGroup *asynq.GroupInfo
|
||||
|
||||
pageNum int // pagination page number
|
||||
|
||||
@@ -78,6 +81,7 @@ func Run(opts Options) {
|
||||
errorCh = make(chan error)
|
||||
queueCh = make(chan *asynq.QueueInfo)
|
||||
queuesCh = make(chan []*asynq.QueueInfo)
|
||||
groupsCh = make(chan []*asynq.GroupInfo)
|
||||
tasksCh = make(chan []*asynq.TaskInfo)
|
||||
redisInfoCh = make(chan *redisInfo)
|
||||
)
|
||||
@@ -144,21 +148,39 @@ func Run(opts Options) {
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyDown || ev.Rune() == 'j') && state.view == viewTypeQueueDetails {
|
||||
if shouldShowGroupTable(&state) {
|
||||
if state.groupTableRowIdx < groupPageSize(s) {
|
||||
state.groupTableRowIdx++
|
||||
} else {
|
||||
state.groupTableRowIdx = 0 // loop back
|
||||
}
|
||||
} else {
|
||||
if state.taskTableRowIdx < len(state.tasks) {
|
||||
state.taskTableRowIdx++
|
||||
} else {
|
||||
state.taskTableRowIdx = 0 // loop back
|
||||
}
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyUp || ev.Rune() == 'k') && state.view == viewTypeQueueDetails {
|
||||
if shouldShowGroupTable(&state) {
|
||||
if state.groupTableRowIdx == 0 {
|
||||
state.groupTableRowIdx = groupPageSize(s)
|
||||
} else {
|
||||
state.groupTableRowIdx--
|
||||
}
|
||||
} else {
|
||||
if state.taskTableRowIdx == 0 {
|
||||
state.taskTableRowIdx = len(state.tasks)
|
||||
} else {
|
||||
state.taskTableRowIdx--
|
||||
}
|
||||
}
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if ev.Key() == tcell.KeyEnter {
|
||||
if state.view == viewTypeQueues && state.queueTableRowIdx != 0 {
|
||||
switch state.view {
|
||||
case viewTypeQueues:
|
||||
if state.queueTableRowIdx != 0 {
|
||||
state.selectedQueue = state.queues[state.queueTableRowIdx-1]
|
||||
state.view = viewTypeQueueDetails
|
||||
state.taskState = asynq.TaskStateActive
|
||||
@@ -169,6 +191,18 @@ func Run(opts Options) {
|
||||
ticker.Reset(interval)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
case viewTypeQueueDetails:
|
||||
if shouldShowGroupTable(&state) && state.groupTableRowIdx != 0 {
|
||||
state.selectedGroup = state.groups[state.groupTableRowIdx-1]
|
||||
state.tasks = nil
|
||||
state.pageNum = 1
|
||||
go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
ticker.Reset(interval)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
|
||||
}
|
||||
} else if ev.Rune() == '?' {
|
||||
state.prevView = state.view
|
||||
state.view = viewTypeHelp
|
||||
@@ -196,8 +230,13 @@ func Run(opts Options) {
|
||||
state.pageNum = 1
|
||||
state.taskTableRowIdx = 0
|
||||
state.tasks = nil
|
||||
state.selectedGroup = nil
|
||||
if shouldShowGroupTable(&state) {
|
||||
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
|
||||
} else {
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
}
|
||||
ticker.Reset(interval)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails {
|
||||
@@ -205,11 +244,26 @@ func Run(opts Options) {
|
||||
state.pageNum = 1
|
||||
state.taskTableRowIdx = 0
|
||||
state.tasks = nil
|
||||
state.selectedGroup = nil
|
||||
if shouldShowGroupTable(&state) {
|
||||
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
|
||||
} else {
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
}
|
||||
ticker.Reset(interval)
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
} else if ev.Rune() == 'n' && state.view == viewTypeQueueDetails {
|
||||
if shouldShowGroupTable(&state) {
|
||||
pageSize := groupPageSize(s)
|
||||
total := len(state.groups)
|
||||
start := (state.pageNum - 1) * pageSize
|
||||
end := start + pageSize
|
||||
if end <= total {
|
||||
state.pageNum++
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
} else {
|
||||
pageSize := taskPageSize(s)
|
||||
totalCount := getTaskCount(state.selectedQueue, state.taskState)
|
||||
if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount {
|
||||
@@ -218,7 +272,16 @@ func Run(opts Options) {
|
||||
pageSize, state.pageNum, tasksCh, errorCh)
|
||||
ticker.Reset(interval)
|
||||
}
|
||||
}
|
||||
} else if ev.Rune() == 'p' && state.view == viewTypeQueueDetails {
|
||||
if shouldShowGroupTable(&state) {
|
||||
pageSize := groupPageSize(s)
|
||||
start := (state.pageNum - 1) * pageSize
|
||||
if start > 0 {
|
||||
state.pageNum--
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
}
|
||||
} else {
|
||||
if state.pageNum > 1 {
|
||||
state.pageNum--
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||
@@ -227,6 +290,7 @@ func Run(opts Options) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
switch state.view {
|
||||
@@ -234,8 +298,15 @@ func Run(opts Options) {
|
||||
go fetchQueues(inspector, queuesCh, errorCh, opts)
|
||||
case viewTypeQueueDetails:
|
||||
go fetchQueueInfo(inspector, state.selectedQueue.Queue, queueCh, errorCh)
|
||||
if shouldShowGroupTable(&state) {
|
||||
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
|
||||
} else if state.taskState == asynq.TaskStateAggregating {
|
||||
go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
} else {
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
}
|
||||
case viewTypeRedis:
|
||||
go fetchRedisInfo(redisInfoCh, errorCh)
|
||||
}
|
||||
@@ -250,6 +321,11 @@ func Run(opts Options) {
|
||||
state.err = nil
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
|
||||
case groups := <-groupsCh:
|
||||
state.groups = groups
|
||||
state.err = nil
|
||||
drawDash(s, baseStyle, &state, opts)
|
||||
|
||||
case tasks := <-tasksCh:
|
||||
state.tasks = tasks
|
||||
state.err = nil
|
||||
|
@@ -256,20 +256,31 @@ func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
d.Println(byteCount(q.MemoryUsage), style)
|
||||
}
|
||||
|
||||
// Returns the max number of groups that can be displayed.
|
||||
func groupPageSize(s tcell.Screen) int {
|
||||
_, h := s.Size()
|
||||
return h - 16 // height - (# of rows used)
|
||||
}
|
||||
|
||||
// Returns the number of tasks to fetch.
|
||||
func taskPageSize(s tcell.Screen) int {
|
||||
_, h := s.Size()
|
||||
return h - 15 // height - (# of rows used)
|
||||
}
|
||||
|
||||
func shouldShowGroupTable(state *State) bool {
|
||||
return state.taskState == asynq.TaskStateAggregating && state.selectedGroup == nil
|
||||
}
|
||||
|
||||
func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
if state.taskState == asynq.TaskStateAggregating {
|
||||
d.Println("TODO: aggregating tasks need group name", style)
|
||||
if shouldShowGroupTable(state) {
|
||||
drawGroupTable(d, style, state)
|
||||
return
|
||||
}
|
||||
if len(state.tasks) == 0 {
|
||||
return // print nothing
|
||||
}
|
||||
// TODO: colConfigs should be different for each state
|
||||
colConfigs := []*columnConfig[*asynq.TaskInfo]{
|
||||
{"ID", alignLeft, func(t *asynq.TaskInfo) string { return t.ID }},
|
||||
{"Type", alignLeft, func(t *asynq.TaskInfo) string { return t.Type }},
|
||||
@@ -282,6 +293,10 @@ func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
// Pagination
|
||||
pageSize := taskPageSize(d.Screen())
|
||||
totalCount := getTaskCount(state.selectedQueue, state.taskState)
|
||||
if state.taskState == asynq.TaskStateAggregating {
|
||||
// aggregating tasks are scoped to each group when shown in the table.
|
||||
totalCount = state.selectedGroup.Size
|
||||
}
|
||||
if pageSize < totalCount {
|
||||
start := (state.pageNum-1)*pageSize + 1
|
||||
end := start + len(state.tasks) - 1
|
||||
@@ -303,6 +318,47 @@ func isNextTaskPageAvailable(s tcell.Screen, state *State) bool {
|
||||
return end < totalCount
|
||||
}
|
||||
|
||||
func drawGroupTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||
if len(state.groups) == 0 {
|
||||
return // print nothing
|
||||
}
|
||||
d.Println("<<< Select group >>>", style)
|
||||
colConfigs := []*columnConfig[*asynq.GroupInfo]{
|
||||
{"Name", alignLeft, func(g *asynq.GroupInfo) string { return g.Group }},
|
||||
{"Size", alignRight, func(g *asynq.GroupInfo) string { return strconv.Itoa(g.Size) }},
|
||||
}
|
||||
// pagination
|
||||
pageSize := groupPageSize(d.Screen())
|
||||
total := len(state.groups)
|
||||
start := (state.pageNum - 1) * pageSize
|
||||
end := min(start+pageSize, total)
|
||||
drawTable(d, style, colConfigs, state.groups[start:end], state.groupTableRowIdx-1)
|
||||
|
||||
footerStyle := style.Foreground(tcell.ColorLightGray)
|
||||
if pageSize < total {
|
||||
d.Print(fmt.Sprintf("Showing %d-%d out of %d", start+1, end, total), footerStyle)
|
||||
if end < total {
|
||||
d.Print(" n=NextPage", footerStyle)
|
||||
}
|
||||
if start > 0 {
|
||||
d.Print(" p=PrevPage", footerStyle)
|
||||
}
|
||||
}
|
||||
d.FillLine(' ', footerStyle)
|
||||
}
|
||||
|
||||
type number interface {
|
||||
int | int64 | float64
|
||||
}
|
||||
|
||||
// min returns the smaller of x and y. if x==y, returns x
|
||||
func min[V number](x, y V) V {
|
||||
if x > y {
|
||||
return y
|
||||
}
|
||||
return x
|
||||
}
|
||||
|
||||
// Define the order of states to show
|
||||
var taskStates = []asynq.TaskState{
|
||||
asynq.TaskStateActive,
|
||||
|
@@ -56,6 +56,25 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
||||
}
|
||||
}
|
||||
|
||||
func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) {
|
||||
groups, err := i.Groups(qname)
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
groupsCh <- groups
|
||||
}
|
||||
|
||||
func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int,
|
||||
tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
|
||||
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
return
|
||||
}
|
||||
tasksCh <- tasks
|
||||
}
|
||||
|
||||
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int,
|
||||
tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
|
||||
var (
|
||||
|
Reference in New Issue
Block a user