mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 13:21:58 +08:00
(cli): Add pagination to task table
This commit is contained in:
@@ -38,6 +38,8 @@ type State struct {
|
|||||||
|
|
||||||
selectedQueue *asynq.QueueInfo // queue shown on queue details view
|
selectedQueue *asynq.QueueInfo // queue shown on queue details view
|
||||||
|
|
||||||
|
pageNum int // pagination page number
|
||||||
|
|
||||||
view viewType // current view type
|
view viewType // current view type
|
||||||
prevView viewType // to support "go back"
|
prevView viewType // to support "go back"
|
||||||
}
|
}
|
||||||
@@ -74,12 +76,13 @@ func Run(opts Options) {
|
|||||||
// channels to send/receive data fetched asynchronously
|
// channels to send/receive data fetched asynchronously
|
||||||
var (
|
var (
|
||||||
errorCh = make(chan error)
|
errorCh = make(chan error)
|
||||||
|
queueCh = make(chan *asynq.QueueInfo)
|
||||||
queuesCh = make(chan []*asynq.QueueInfo)
|
queuesCh = make(chan []*asynq.QueueInfo)
|
||||||
tasksCh = make(chan []*asynq.TaskInfo)
|
tasksCh = make(chan []*asynq.TaskInfo)
|
||||||
redisInfoCh = make(chan *redisInfo)
|
redisInfoCh = make(chan *redisInfo)
|
||||||
)
|
)
|
||||||
|
|
||||||
go fetchQueueInfo(inspector, queuesCh, errorCh, opts)
|
go fetchQueues(inspector, queuesCh, errorCh, opts)
|
||||||
|
|
||||||
var state State // contained in this goroutine only; do not share
|
var state State // contained in this goroutine only; do not share
|
||||||
|
|
||||||
@@ -160,7 +163,10 @@ func Run(opts Options) {
|
|||||||
state.view = viewTypeQueueDetails
|
state.view = viewTypeQueueDetails
|
||||||
state.taskState = asynq.TaskStateActive
|
state.taskState = asynq.TaskStateActive
|
||||||
state.tasks = nil
|
state.tasks = nil
|
||||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
state.pageNum = 1
|
||||||
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
}
|
}
|
||||||
} else if ev.Rune() == '?' {
|
} else if ev.Rune() == '?' {
|
||||||
@@ -168,7 +174,7 @@ func Run(opts Options) {
|
|||||||
state.view = viewTypeHelp
|
state.view = viewTypeHelp
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
} else if ev.Key() == tcell.KeyF1 && state.view != viewTypeQueues {
|
} else if ev.Key() == tcell.KeyF1 && state.view != viewTypeQueues {
|
||||||
go fetchQueueInfo(inspector, queuesCh, errorCh, opts)
|
go fetchQueues(inspector, queuesCh, errorCh, opts)
|
||||||
ticker.Reset(interval)
|
ticker.Reset(interval)
|
||||||
state.view = viewTypeQueues
|
state.view = viewTypeQueues
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
@@ -187,21 +193,49 @@ func Run(opts Options) {
|
|||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
} else if (ev.Key() == tcell.KeyRight || ev.Rune() == 'l') && state.view == viewTypeQueueDetails {
|
} else if (ev.Key() == tcell.KeyRight || ev.Rune() == 'l') && state.view == viewTypeQueueDetails {
|
||||||
state.taskState = nextTaskState(state.taskState)
|
state.taskState = nextTaskState(state.taskState)
|
||||||
|
state.pageNum = 1
|
||||||
|
state.taskTableRowIdx = 0
|
||||||
state.tasks = nil
|
state.tasks = nil
|
||||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
} else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails {
|
} else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails {
|
||||||
state.taskState = prevTaskState(state.taskState)
|
state.taskState = prevTaskState(state.taskState)
|
||||||
|
state.pageNum = 1
|
||||||
|
state.taskTableRowIdx = 0
|
||||||
state.tasks = nil
|
state.tasks = nil
|
||||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh)
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
|
} else if ev.Rune() == 'n' && state.view == viewTypeQueueDetails {
|
||||||
|
pageSize := taskPageSize(s)
|
||||||
|
totalCount := getTaskCount(state.selectedQueue, state.taskState)
|
||||||
|
if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount {
|
||||||
|
state.pageNum++
|
||||||
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
pageSize, state.pageNum, tasksCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
|
}
|
||||||
|
} else if ev.Rune() == 'p' && state.view == viewTypeQueueDetails {
|
||||||
|
if state.pageNum > 1 {
|
||||||
|
state.pageNum--
|
||||||
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
switch state.view {
|
switch state.view {
|
||||||
case viewTypeQueues:
|
case viewTypeQueues:
|
||||||
go fetchQueueInfo(inspector, queuesCh, errorCh, opts)
|
go fetchQueues(inspector, queuesCh, errorCh, opts)
|
||||||
|
case viewTypeQueueDetails:
|
||||||
|
go fetchQueueInfo(inspector, state.selectedQueue.Queue, queueCh, errorCh)
|
||||||
|
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||||
|
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||||
case viewTypeRedis:
|
case viewTypeRedis:
|
||||||
go fetchRedisInfo(redisInfoCh, errorCh)
|
go fetchRedisInfo(redisInfoCh, errorCh)
|
||||||
}
|
}
|
||||||
@@ -211,6 +245,11 @@ func Run(opts Options) {
|
|||||||
state.err = nil
|
state.err = nil
|
||||||
drawDash(s, baseStyle, &state, opts)
|
drawDash(s, baseStyle, &state, opts)
|
||||||
|
|
||||||
|
case q := <-queueCh:
|
||||||
|
state.selectedQueue = q
|
||||||
|
state.err = nil
|
||||||
|
drawDash(s, baseStyle, &state, opts)
|
||||||
|
|
||||||
case tasks := <-tasksCh:
|
case tasks := <-tasksCh:
|
||||||
state.tasks = tasks
|
state.tasks = tasks
|
||||||
state.err = nil
|
state.err = nil
|
||||||
|
@@ -51,8 +51,8 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) {
|
|||||||
d.NL()
|
d.NL()
|
||||||
d.Println(fmt.Sprintf("Version: %s", state.redisInfo.version), style)
|
d.Println(fmt.Sprintf("Version: %s", state.redisInfo.version), style)
|
||||||
d.Println(fmt.Sprintf("Uptime: %s", state.redisInfo.uptime), style)
|
d.Println(fmt.Sprintf("Uptime: %s", state.redisInfo.uptime), style)
|
||||||
d.Println(fmt.Sprintf("Memory Usage: %s", ByteCount(int64(state.redisInfo.memoryUsage))), style)
|
d.Println(fmt.Sprintf("Memory Usage: %s", byteCount(int64(state.redisInfo.memoryUsage))), style)
|
||||||
d.Println(fmt.Sprintf("Peak Memory Usage: %s", ByteCount(int64(state.redisInfo.peakMemoryUsage))), style)
|
d.Println(fmt.Sprintf("Peak Memory Usage: %s", byteCount(int64(state.redisInfo.peakMemoryUsage))), style)
|
||||||
case viewTypeHelp:
|
case viewTypeHelp:
|
||||||
d.Println("=== HELP ===", style.Bold(true))
|
d.Println("=== HELP ===", style.Bold(true))
|
||||||
d.NL()
|
d.NL()
|
||||||
@@ -194,8 +194,8 @@ func lpad(s string, padding int) string {
|
|||||||
return fmt.Sprintf(tmpl, s)
|
return fmt.Sprintf(tmpl, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ByteCount converts the given bytes into human readable string
|
// byteCount converts the given bytes into human readable string
|
||||||
func ByteCount(b int64) string {
|
func byteCount(b int64) string {
|
||||||
const unit = 1000
|
const unit = 1000
|
||||||
if b < unit {
|
if b < unit {
|
||||||
return fmt.Sprintf("%d B", b)
|
return fmt.Sprintf("%d B", b)
|
||||||
@@ -212,21 +212,22 @@ func ByteCount(b int64) string {
|
|||||||
|
|
||||||
var queueColumnConfigs = []*columnConfig[*asynq.QueueInfo]{
|
var queueColumnConfigs = []*columnConfig[*asynq.QueueInfo]{
|
||||||
{"Queue", alignLeft, func(q *asynq.QueueInfo) string { return q.Queue }},
|
{"Queue", alignLeft, func(q *asynq.QueueInfo) string { return q.Queue }},
|
||||||
{"State", alignLeft, func(q *asynq.QueueInfo) string {
|
{"State", alignLeft, func(q *asynq.QueueInfo) string { return formatQueueState(q) }},
|
||||||
if q.Paused {
|
|
||||||
return "PAUSED"
|
|
||||||
} else {
|
|
||||||
return "RUN"
|
|
||||||
}
|
|
||||||
}},
|
|
||||||
{"Size", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Size) }},
|
{"Size", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Size) }},
|
||||||
{"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.Round(time.Second).String() }},
|
{"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.Round(time.Second).String() }},
|
||||||
{"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return ByteCount(q.MemoryUsage) }},
|
{"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return byteCount(q.MemoryUsage) }},
|
||||||
{"Processed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Processed) }},
|
{"Processed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Processed) }},
|
||||||
{"Failed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Failed) }},
|
{"Failed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Failed) }},
|
||||||
{"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return formatErrorRate(q.Processed, q.Failed) }},
|
{"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return formatErrorRate(q.Processed, q.Failed) }},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func formatQueueState(q *asynq.QueueInfo) string {
|
||||||
|
if q.Paused {
|
||||||
|
return "PAUSED"
|
||||||
|
}
|
||||||
|
return "RUN"
|
||||||
|
}
|
||||||
|
|
||||||
func formatErrorRate(processed, failed int) string {
|
func formatErrorRate(processed, failed int) string {
|
||||||
if processed == 0 {
|
if processed == 0 {
|
||||||
return "-"
|
return "-"
|
||||||
@@ -240,6 +241,10 @@ func drawQueueTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
|||||||
|
|
||||||
func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) {
|
func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||||
q := state.selectedQueue
|
q := state.selectedQueue
|
||||||
|
if q == nil {
|
||||||
|
d.Println("ERROR: Press q to go back", style)
|
||||||
|
return
|
||||||
|
}
|
||||||
labelStyle := style.Foreground(tcell.ColorLightGray)
|
labelStyle := style.Foreground(tcell.ColorLightGray)
|
||||||
d.Print("Name: ", labelStyle)
|
d.Print("Name: ", labelStyle)
|
||||||
d.Println(q.Queue, style)
|
d.Println(q.Queue, style)
|
||||||
@@ -248,7 +253,13 @@ func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) {
|
|||||||
d.Print("Latency ", labelStyle)
|
d.Print("Latency ", labelStyle)
|
||||||
d.Println(q.Latency.Round(time.Second).String(), style)
|
d.Println(q.Latency.Round(time.Second).String(), style)
|
||||||
d.Print("MemUsage ", labelStyle)
|
d.Print("MemUsage ", labelStyle)
|
||||||
d.Println(ByteCount(q.MemoryUsage), style)
|
d.Println(byteCount(q.MemoryUsage), style)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the number of tasks to fetch.
|
||||||
|
func taskPageSize(s tcell.Screen) int {
|
||||||
|
_, h := s.Size()
|
||||||
|
return h - 15 // height - (# of rows used)
|
||||||
}
|
}
|
||||||
|
|
||||||
func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
||||||
@@ -267,6 +278,29 @@ func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) {
|
|||||||
{"LastError", alignLeft, func(t *asynq.TaskInfo) string { return t.LastErr }},
|
{"LastError", alignLeft, func(t *asynq.TaskInfo) string { return t.LastErr }},
|
||||||
}
|
}
|
||||||
drawTable(d, style, colConfigs, state.tasks, state.taskTableRowIdx-1)
|
drawTable(d, style, colConfigs, state.tasks, state.taskTableRowIdx-1)
|
||||||
|
|
||||||
|
// Pagination
|
||||||
|
pageSize := taskPageSize(d.Screen())
|
||||||
|
totalCount := getTaskCount(state.selectedQueue, state.taskState)
|
||||||
|
if pageSize < totalCount {
|
||||||
|
start := (state.pageNum-1)*pageSize + 1
|
||||||
|
end := start + len(state.tasks) - 1
|
||||||
|
paginationStyle := style.Foreground(tcell.ColorLightGray)
|
||||||
|
d.Print(fmt.Sprintf("Showing %d-%d out of %d", start, end, totalCount), paginationStyle)
|
||||||
|
if isNextTaskPageAvailable(d.Screen(), state) {
|
||||||
|
d.Print(" n=NextPage", paginationStyle)
|
||||||
|
}
|
||||||
|
if state.pageNum > 1 {
|
||||||
|
d.Print(" p=PrevPage", paginationStyle)
|
||||||
|
}
|
||||||
|
d.FillLine(' ', paginationStyle)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func isNextTaskPageAvailable(s tcell.Screen, state *State) bool {
|
||||||
|
totalCount := getTaskCount(state.selectedQueue, state.taskState)
|
||||||
|
end := (state.pageNum-1)*taskPageSize(s) + len(state.tasks)
|
||||||
|
return end < totalCount
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define the order of states to show
|
// Define the order of states to show
|
||||||
|
@@ -10,7 +10,7 @@ import (
|
|||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
)
|
)
|
||||||
|
|
||||||
func fetchQueueInfo(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) {
|
func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) {
|
||||||
if !opts.UseRealData {
|
if !opts.UseRealData {
|
||||||
n := rand.Intn(100)
|
n := rand.Intn(100)
|
||||||
queuesCh <- []*asynq.QueueInfo{
|
queuesCh <- []*asynq.QueueInfo{
|
||||||
@@ -35,7 +35,15 @@ func fetchQueueInfo(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, erro
|
|||||||
res = append(res, info)
|
res = append(res, info)
|
||||||
}
|
}
|
||||||
queuesCh <- res
|
queuesCh <- res
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) {
|
||||||
|
q, err := i.GetQueueInfo(qname)
|
||||||
|
if err != nil {
|
||||||
|
errorCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
queueCh <- q
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
||||||
@@ -48,24 +56,26 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
|
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int,
|
||||||
|
tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
|
||||||
var (
|
var (
|
||||||
tasks []*asynq.TaskInfo
|
tasks []*asynq.TaskInfo
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
opts := []asynq.ListOption{asynq.PageSize(pageSize), asynq.Page(pageNum)}
|
||||||
switch taskState {
|
switch taskState {
|
||||||
case asynq.TaskStateActive:
|
case asynq.TaskStateActive:
|
||||||
tasks, err = i.ListActiveTasks(qname)
|
tasks, err = i.ListActiveTasks(qname, opts...)
|
||||||
case asynq.TaskStatePending:
|
case asynq.TaskStatePending:
|
||||||
tasks, err = i.ListPendingTasks(qname)
|
tasks, err = i.ListPendingTasks(qname, opts...)
|
||||||
case asynq.TaskStateScheduled:
|
case asynq.TaskStateScheduled:
|
||||||
tasks, err = i.ListScheduledTasks(qname)
|
tasks, err = i.ListScheduledTasks(qname, opts...)
|
||||||
case asynq.TaskStateRetry:
|
case asynq.TaskStateRetry:
|
||||||
tasks, err = i.ListRetryTasks(qname)
|
tasks, err = i.ListRetryTasks(qname, opts...)
|
||||||
case asynq.TaskStateArchived:
|
case asynq.TaskStateArchived:
|
||||||
tasks, err = i.ListArchivedTasks(qname)
|
tasks, err = i.ListArchivedTasks(qname, opts...)
|
||||||
case asynq.TaskStateCompleted:
|
case asynq.TaskStateCompleted:
|
||||||
tasks, err = i.ListCompletedTasks(qname)
|
tasks, err = i.ListCompletedTasks(qname, opts...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errorCh <- err
|
errorCh <- err
|
||||||
|
Reference in New Issue
Block a user