diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 264846e..05ffb5d 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -38,6 +38,8 @@ type State struct { selectedQueue *asynq.QueueInfo // queue shown on queue details view + pageNum int // pagination page number + view viewType // current view type prevView viewType // to support "go back" } @@ -74,12 +76,13 @@ func Run(opts Options) { // channels to send/receive data fetched asynchronously var ( errorCh = make(chan error) + queueCh = make(chan *asynq.QueueInfo) queuesCh = make(chan []*asynq.QueueInfo) tasksCh = make(chan []*asynq.TaskInfo) redisInfoCh = make(chan *redisInfo) ) - go fetchQueueInfo(inspector, queuesCh, errorCh, opts) + go fetchQueues(inspector, queuesCh, errorCh, opts) var state State // contained in this goroutine only; do not share @@ -160,7 +163,10 @@ func Run(opts Options) { state.view = viewTypeQueueDetails state.taskState = asynq.TaskStateActive state.tasks = nil - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) + state.pageNum = 1 + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) drawDash(s, baseStyle, &state, opts) } } else if ev.Rune() == '?' { @@ -168,7 +174,7 @@ func Run(opts Options) { state.view = viewTypeHelp drawDash(s, baseStyle, &state, opts) } else if ev.Key() == tcell.KeyF1 && state.view != viewTypeQueues { - go fetchQueueInfo(inspector, queuesCh, errorCh, opts) + go fetchQueues(inspector, queuesCh, errorCh, opts) ticker.Reset(interval) state.view = viewTypeQueues drawDash(s, baseStyle, &state, opts) @@ -187,21 +193,49 @@ func Run(opts Options) { drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyRight || ev.Rune() == 'l') && state.view == viewTypeQueueDetails { state.taskState = nextTaskState(state.taskState) + state.pageNum = 1 + state.taskTableRowIdx = 0 state.tasks = nil - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) drawDash(s, baseStyle, &state, opts) } else if (ev.Key() == tcell.KeyLeft || ev.Rune() == 'h') && state.view == viewTypeQueueDetails { state.taskState = prevTaskState(state.taskState) + state.pageNum = 1 + state.taskTableRowIdx = 0 state.tasks = nil - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, tasksCh, errorCh) + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) drawDash(s, baseStyle, &state, opts) + } else if ev.Rune() == 'n' && state.view == viewTypeQueueDetails { + pageSize := taskPageSize(s) + totalCount := getTaskCount(state.selectedQueue, state.taskState) + if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { + state.pageNum++ + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + pageSize, state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + } + } else if ev.Rune() == 'p' && state.view == viewTypeQueueDetails { + if state.pageNum > 1 { + state.pageNum-- + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) + ticker.Reset(interval) + } } } case <-ticker.C: switch state.view { case viewTypeQueues: - go fetchQueueInfo(inspector, queuesCh, errorCh, opts) + go fetchQueues(inspector, queuesCh, errorCh, opts) + case viewTypeQueueDetails: + go fetchQueueInfo(inspector, state.selectedQueue.Queue, queueCh, errorCh) + go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, + taskPageSize(s), state.pageNum, tasksCh, errorCh) case viewTypeRedis: go fetchRedisInfo(redisInfoCh, errorCh) } @@ -211,6 +245,11 @@ func Run(opts Options) { state.err = nil drawDash(s, baseStyle, &state, opts) + case q := <-queueCh: + state.selectedQueue = q + state.err = nil + drawDash(s, baseStyle, &state, opts) + case tasks := <-tasksCh: state.tasks = tasks state.err = nil diff --git a/tools/asynq/cmd/dash/draw.go b/tools/asynq/cmd/dash/draw.go index e507d10..007a10f 100644 --- a/tools/asynq/cmd/dash/draw.go +++ b/tools/asynq/cmd/dash/draw.go @@ -51,8 +51,8 @@ func drawDash(s tcell.Screen, style tcell.Style, state *State, opts Options) { d.NL() d.Println(fmt.Sprintf("Version: %s", state.redisInfo.version), style) d.Println(fmt.Sprintf("Uptime: %s", state.redisInfo.uptime), style) - d.Println(fmt.Sprintf("Memory Usage: %s", ByteCount(int64(state.redisInfo.memoryUsage))), style) - d.Println(fmt.Sprintf("Peak Memory Usage: %s", ByteCount(int64(state.redisInfo.peakMemoryUsage))), style) + d.Println(fmt.Sprintf("Memory Usage: %s", byteCount(int64(state.redisInfo.memoryUsage))), style) + d.Println(fmt.Sprintf("Peak Memory Usage: %s", byteCount(int64(state.redisInfo.peakMemoryUsage))), style) case viewTypeHelp: d.Println("=== HELP ===", style.Bold(true)) d.NL() @@ -194,8 +194,8 @@ func lpad(s string, padding int) string { return fmt.Sprintf(tmpl, s) } -// ByteCount converts the given bytes into human readable string -func ByteCount(b int64) string { +// byteCount converts the given bytes into human readable string +func byteCount(b int64) string { const unit = 1000 if b < unit { return fmt.Sprintf("%d B", b) @@ -212,21 +212,22 @@ func ByteCount(b int64) string { var queueColumnConfigs = []*columnConfig[*asynq.QueueInfo]{ {"Queue", alignLeft, func(q *asynq.QueueInfo) string { return q.Queue }}, - {"State", alignLeft, func(q *asynq.QueueInfo) string { - if q.Paused { - return "PAUSED" - } else { - return "RUN" - } - }}, + {"State", alignLeft, func(q *asynq.QueueInfo) string { return formatQueueState(q) }}, {"Size", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Size) }}, {"Latency", alignRight, func(q *asynq.QueueInfo) string { return q.Latency.Round(time.Second).String() }}, - {"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return ByteCount(q.MemoryUsage) }}, + {"MemoryUsage", alignRight, func(q *asynq.QueueInfo) string { return byteCount(q.MemoryUsage) }}, {"Processed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Processed) }}, {"Failed", alignRight, func(q *asynq.QueueInfo) string { return strconv.Itoa(q.Failed) }}, {"ErrorRate", alignRight, func(q *asynq.QueueInfo) string { return formatErrorRate(q.Processed, q.Failed) }}, } +func formatQueueState(q *asynq.QueueInfo) string { + if q.Paused { + return "PAUSED" + } + return "RUN" +} + func formatErrorRate(processed, failed int) string { if processed == 0 { return "-" @@ -240,6 +241,10 @@ func drawQueueTable(d *ScreenDrawer, style tcell.Style, state *State) { func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) { q := state.selectedQueue + if q == nil { + d.Println("ERROR: Press q to go back", style) + return + } labelStyle := style.Foreground(tcell.ColorLightGray) d.Print("Name: ", labelStyle) d.Println(q.Queue, style) @@ -248,7 +253,13 @@ func drawQueueSummary(d *ScreenDrawer, style tcell.Style, state *State) { d.Print("Latency ", labelStyle) d.Println(q.Latency.Round(time.Second).String(), style) d.Print("MemUsage ", labelStyle) - d.Println(ByteCount(q.MemoryUsage), style) + d.Println(byteCount(q.MemoryUsage), style) +} + +// Returns the number of tasks to fetch. +func taskPageSize(s tcell.Screen) int { + _, h := s.Size() + return h - 15 // height - (# of rows used) } func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) { @@ -267,6 +278,29 @@ func drawTaskTable(d *ScreenDrawer, style tcell.Style, state *State) { {"LastError", alignLeft, func(t *asynq.TaskInfo) string { return t.LastErr }}, } drawTable(d, style, colConfigs, state.tasks, state.taskTableRowIdx-1) + + // Pagination + pageSize := taskPageSize(d.Screen()) + totalCount := getTaskCount(state.selectedQueue, state.taskState) + if pageSize < totalCount { + start := (state.pageNum-1)*pageSize + 1 + end := start + len(state.tasks) - 1 + paginationStyle := style.Foreground(tcell.ColorLightGray) + d.Print(fmt.Sprintf("Showing %d-%d out of %d", start, end, totalCount), paginationStyle) + if isNextTaskPageAvailable(d.Screen(), state) { + d.Print(" n=NextPage", paginationStyle) + } + if state.pageNum > 1 { + d.Print(" p=PrevPage", paginationStyle) + } + d.FillLine(' ', paginationStyle) + } +} + +func isNextTaskPageAvailable(s tcell.Screen, state *State) bool { + totalCount := getTaskCount(state.selectedQueue, state.taskState) + end := (state.pageNum-1)*taskPageSize(s) + len(state.tasks) + return end < totalCount } // Define the order of states to show diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index aae7df3..115a71b 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -10,7 +10,7 @@ import ( "github.com/hibiken/asynq" ) -func fetchQueueInfo(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) { +func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) { if !opts.UseRealData { n := rand.Intn(100) queuesCh <- []*asynq.QueueInfo{ @@ -35,7 +35,15 @@ func fetchQueueInfo(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, erro res = append(res, info) } queuesCh <- res +} +func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) { + q, err := i.GetQueueInfo(qname) + if err != nil { + errorCh <- err + return + } + queueCh <- q } func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { @@ -48,24 +56,26 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { } } -func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { +func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, + tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { var ( tasks []*asynq.TaskInfo err error ) + opts := []asynq.ListOption{asynq.PageSize(pageSize), asynq.Page(pageNum)} switch taskState { case asynq.TaskStateActive: - tasks, err = i.ListActiveTasks(qname) + tasks, err = i.ListActiveTasks(qname, opts...) case asynq.TaskStatePending: - tasks, err = i.ListPendingTasks(qname) + tasks, err = i.ListPendingTasks(qname, opts...) case asynq.TaskStateScheduled: - tasks, err = i.ListScheduledTasks(qname) + tasks, err = i.ListScheduledTasks(qname, opts...) case asynq.TaskStateRetry: - tasks, err = i.ListRetryTasks(qname) + tasks, err = i.ListRetryTasks(qname, opts...) case asynq.TaskStateArchived: - tasks, err = i.ListArchivedTasks(qname) + tasks, err = i.ListArchivedTasks(qname, opts...) case asynq.TaskStateCompleted: - tasks, err = i.ListCompletedTasks(qname) + tasks, err = i.ListCompletedTasks(qname, opts...) } if err != nil { errorCh <- err