From 6058e369c3d10adb4a87698e6eff9017ee2f2da7 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 27 May 2022 13:49:43 -0700 Subject: [PATCH] (cli): Update fetcher interface --- tools/asynq/cmd/dash/dash.go | 17 +++--- tools/asynq/cmd/dash/draw.go | 4 +- tools/asynq/cmd/dash/fetch.go | 53 +++++++++++++------ tools/asynq/cmd/dash/key_event.go | 72 +++++++++++--------------- tools/asynq/cmd/dash/key_event_test.go | 29 +++++------ 5 files changed, 91 insertions(+), 84 deletions(-) diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index d18e5dc..f1506ee 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -101,6 +101,7 @@ func Run(opts Options) { f := dataFetcher{ inspector, opts, + s, errorCh, queueCh, taskCh, @@ -127,7 +128,7 @@ func Run(opts Options) { go fetchQueues(inspector, queuesCh, errorCh, opts) go s.ChannelEvents(eventCh, done) // TODO: Double check that we are not leaking goroutine with this one. - d.draw(&state) // draw initial screen + d.Draw(&state) // draw initial screen for { // Update screen @@ -173,12 +174,12 @@ func Run(opts Options) { if len(queues) < state.queueTableRowIdx { state.queueTableRowIdx = len(queues) } - d.draw(&state) + d.Draw(&state) case q := <-queueCh: state.selectedQueue = q state.err = nil - d.draw(&state) + d.Draw(&state) case groups := <-groupsCh: state.groups = groups @@ -186,7 +187,7 @@ func Run(opts Options) { if len(groups) < state.groupTableRowIdx { state.groupTableRowIdx = len(groups) } - d.draw(&state) + d.Draw(&state) case tasks := <-tasksCh: state.tasks = tasks @@ -194,17 +195,17 @@ func Run(opts Options) { if len(tasks) < state.taskTableRowIdx { state.taskTableRowIdx = len(tasks) } - d.draw(&state) + d.Draw(&state) case t := <-taskCh: state.selectedTask = t state.err = nil - d.draw(&state) + d.Draw(&state) case redisInfo := <-redisInfoCh: state.redisInfo = *redisInfo state.err = nil - d.draw(&state) + d.Draw(&state) case err := <-errorCh: if errors.Is(err, asynq.ErrTaskNotFound) { @@ -212,7 +213,7 @@ func Run(opts Options) { } else { state.err = err } - d.draw(&state) + d.Draw(&state) } } diff --git a/tools/asynq/cmd/dash/draw.go b/tools/asynq/cmd/dash/draw.go index 3e14539..8fea400 100644 --- a/tools/asynq/cmd/dash/draw.go +++ b/tools/asynq/cmd/dash/draw.go @@ -20,7 +20,7 @@ import ( // drawer draws UI with the given state. type drawer interface { - draw(state *State) + Draw(state *State) } type dashDrawer struct { @@ -28,7 +28,7 @@ type dashDrawer struct { opts Options } -func (dd *dashDrawer) draw(state *State) { +func (dd *dashDrawer) Draw(state *State) { s, opts := dd.s, dd.opts s.Clear() // Simulate data update on every render diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index a70246b..8f7c2b5 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -7,22 +7,19 @@ package dash import ( "math/rand" + "github.com/gdamore/tcell/v2" "github.com/hibiken/asynq" ) type fetcher interface { - fetchQueues() - fetchQueueInfo(qname string) - fetchRedisInfo() - fetchTaskInfo(qname, taskID string) - fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) - fetchAggregatingTasks(qname, group string, pageSize, pageNum int) - fetchGroups(qname string) + // Fetch retries data required by the given state of the dashboard. + Fetch(state *State) } type dataFetcher struct { inspector *asynq.Inspector opts Options + s tcell.Screen errorCh chan<- error queueCh chan<- *asynq.QueueInfo @@ -33,6 +30,27 @@ type dataFetcher struct { redisInfoCh chan<- *redisInfo } +func (f *dataFetcher) Fetch(state *State) { + switch state.view { + case viewTypeQueues: + f.fetchQueues() + case viewTypeQueueDetails: + if shouldShowGroupTable(state) { + f.fetchGroups(state.selectedQueue.Queue) + } else if state.taskState == asynq.TaskStateAggregating { + f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(f.s), state.pageNum) + } else { + f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(f.s), state.pageNum) + } + // if the task modal is open, additionally fetch the selected task's info + if state.taskID != "" { + f.fetchTaskInfo(state.selectedQueue.Queue, state.taskID) + } + case viewTypeRedis: + f.fetchRedisInfo() + } +} + func (f *dataFetcher) fetchQueues() { var ( inspector = f.inspector @@ -70,15 +88,6 @@ func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh queuesCh <- res } -func (f *dataFetcher) fetchQueueInfo(qname string) { - var ( - inspector = f.inspector - queueCh = f.queueCh - errorCh = f.errorCh - ) - go fetchQueueInfo(inspector, qname, queueCh, errorCh) -} - func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) { q, err := i.GetQueueInfo(qname) if err != nil { @@ -103,8 +112,14 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { } func (f *dataFetcher) fetchGroups(qname string) { - i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh + var ( + i = f.inspector + groupsCh = f.groupsCh + errorCh = f.errorCh + queueCh = f.queueCh + ) go fetchGroups(i, qname, groupsCh, errorCh) + go fetchQueueInfo(i, qname, queueCh, errorCh) } func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) { @@ -121,8 +136,10 @@ func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageN i = f.inspector tasksCh = f.tasksCh errorCh = f.errorCh + queueCh = f.queueCh ) go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh) + go fetchQueueInfo(i, qname, queueCh, errorCh) } func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int, @@ -140,8 +157,10 @@ func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSi i = f.inspector tasksCh = f.tasksCh errorCh = f.errorCh + queueCh = f.queueCh ) go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh) + go fetchQueueInfo(i, qname, queueCh, errorCh) } func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, diff --git a/tools/asynq/cmd/dash/key_event.go b/tools/asynq/cmd/dash/key_event.go index 611eff1..740306d 100644 --- a/tools/asynq/cmd/dash/key_event.go +++ b/tools/asynq/cmd/dash/key_event.go @@ -73,16 +73,16 @@ func (h *keyEventHandler) goBack() { ) if state.view == viewTypeHelp { state.view = state.prevView // exit help - d.draw(state) + d.Draw(state) } else if state.view == viewTypeQueueDetails { // if task modal is open close it; otherwise go back to the previous view if state.taskID != "" { state.taskID = "" state.selectedTask = nil - d.draw(state) + d.Draw(state) } else { state.view = viewTypeQueues - d.draw(state) + d.Draw(state) } } else { h.quit() @@ -104,7 +104,7 @@ func (h *keyEventHandler) downKeyQueues() { } else { h.state.queueTableRowIdx = 0 // loop back } - h.drawer.draw(h.state) + h.drawer.Draw(h.state) } func (h *keyEventHandler) downKeyQueueDetails() { @@ -122,7 +122,7 @@ func (h *keyEventHandler) downKeyQueueDetails() { state.taskTableRowIdx = 0 // loop back } } - h.drawer.draw(state) + h.drawer.Draw(state) } func (h *keyEventHandler) handleUpKey() { @@ -141,7 +141,7 @@ func (h *keyEventHandler) upKeyQueues() { } else { state.queueTableRowIdx-- } - h.drawer.draw(state) + h.drawer.Draw(state) } func (h *keyEventHandler) upKeyQueueDetails() { @@ -159,7 +159,7 @@ func (h *keyEventHandler) upKeyQueueDetails() { state.taskTableRowIdx-- } } - h.drawer.draw(state) + h.drawer.Draw(state) } func (h *keyEventHandler) handleEnterKey() { @@ -177,7 +177,6 @@ func (h *keyEventHandler) resetTicker() { func (h *keyEventHandler) enterKeyQueues() { var ( - s = h.s state = h.state f = h.fetcher d = h.drawer @@ -188,15 +187,14 @@ func (h *keyEventHandler) enterKeyQueues() { state.taskState = asynq.TaskStateActive state.tasks = nil state.pageNum = 1 - f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) + f.Fetch(state) h.resetTicker() - d.draw(state) + d.Draw(state) } } func (h *keyEventHandler) enterKeyQueueDetails() { var ( - s = h.s state = h.state f = h.fetcher d = h.drawer @@ -205,23 +203,22 @@ func (h *keyEventHandler) enterKeyQueueDetails() { state.selectedGroup = state.groups[state.groupTableRowIdx-1] state.tasks = nil state.pageNum = 1 - f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum) + f.Fetch(state) h.resetTicker() - d.draw(state) + d.Draw(state) } else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 { task := state.tasks[state.taskTableRowIdx-1] state.selectedTask = task state.taskID = task.ID - f.fetchTaskInfo(state.selectedQueue.Queue, task.ID) + f.Fetch(state) h.resetTicker() - d.draw(state) + d.Draw(state) } } func (h *keyEventHandler) handleLeftKey() { var ( - s = h.s state = h.state f = h.fetcher d = h.drawer @@ -232,19 +229,14 @@ func (h *keyEventHandler) handleLeftKey() { state.taskTableRowIdx = 0 state.tasks = nil state.selectedGroup = nil - if shouldShowGroupTable(state) { - f.fetchGroups(state.selectedQueue.Queue) - } else { - f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) - } + f.Fetch(state) h.resetTicker() - d.draw(state) + d.Draw(state) } } func (h *keyEventHandler) handleRightKey() { var ( - s = h.s state = h.state f = h.fetcher d = h.drawer @@ -255,13 +247,9 @@ func (h *keyEventHandler) handleRightKey() { state.taskTableRowIdx = 0 state.tasks = nil state.selectedGroup = nil - if shouldShowGroupTable(state) { - f.fetchGroups(state.selectedQueue.Queue) - } else { - f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) - } + f.Fetch(state) h.resetTicker() - d.draw(state) + d.Draw(state) } } @@ -280,14 +268,14 @@ func (h *keyEventHandler) nextPage() { end := start + pageSize if end <= total { state.pageNum++ - d.draw(state) + d.Draw(state) } } else { pageSize := taskPageSize(s) totalCount := getTaskCount(state.selectedQueue, state.taskState) if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { state.pageNum++ - f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum) + f.Fetch(state) h.resetTicker() } } @@ -307,12 +295,12 @@ func (h *keyEventHandler) prevPage() { start := (state.pageNum - 1) * pageSize if start > 0 { state.pageNum-- - d.draw(state) + d.Draw(state) } } else { if state.pageNum > 1 { state.pageNum-- - f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) + f.Fetch(state) h.resetTicker() } } @@ -326,10 +314,10 @@ func (h *keyEventHandler) showQueues() { d = h.drawer ) if state.view != viewTypeQueues { - f.fetchQueues() - h.resetTicker() state.view = viewTypeQueues - d.draw(state) + f.Fetch(state) + h.resetTicker() + d.Draw(state) } } @@ -341,7 +329,7 @@ func (h *keyEventHandler) showServers() { if state.view != viewTypeServers { //TODO Start data fetch and reset ticker state.view = viewTypeServers - d.draw(state) + d.Draw(state) } } @@ -353,7 +341,7 @@ func (h *keyEventHandler) showSchedulers() { if state.view != viewTypeSchedulers { //TODO Start data fetch and reset ticker state.view = viewTypeSchedulers - d.draw(state) + d.Draw(state) } } @@ -364,10 +352,10 @@ func (h *keyEventHandler) showRedisInfo() { d = h.drawer ) if state.view != viewTypeRedis { - f.fetchRedisInfo() - h.resetTicker() state.view = viewTypeRedis - d.draw(state) + f.Fetch(state) + h.resetTicker() + d.Draw(state) } } @@ -379,6 +367,6 @@ func (h *keyEventHandler) showHelp() { if state.view != viewTypeHelp { state.prevView = state.view state.view = viewTypeHelp - d.draw(state) + d.Draw(state) } } diff --git a/tools/asynq/cmd/dash/key_event_test.go b/tools/asynq/cmd/dash/key_event_test.go index 315cdcb..ddb5c43 100644 --- a/tools/asynq/cmd/dash/key_event_test.go +++ b/tools/asynq/cmd/dash/key_event_test.go @@ -6,19 +6,24 @@ package dash import ( "testing" + "time" "github.com/gdamore/tcell/v2" "github.com/google/go-cmp/cmp" "github.com/hibiken/asynq" ) -func makeKeyEventHandler(state *State) *keyEventHandler { +func makeKeyEventHandler(t *testing.T, state *State) *keyEventHandler { + ticker := time.NewTicker(time.Second) + t.Cleanup(func() { ticker.Stop() }) return &keyEventHandler{ - s: tcell.NewSimulationScreen("UTF-8"), - state: state, - done: make(chan struct{}), - fetcher: &fakeFetcher{}, - drawer: &fakeDrawer{}, + s: tcell.NewSimulationScreen("UTF-8"), + state: state, + done: make(chan struct{}), + fetcher: &fakeFetcher{}, + drawer: &fakeDrawer{}, + ticker: ticker, + pollInterval: time.Second, } } @@ -164,7 +169,7 @@ func TestKeyEventHandler(t *testing.T) { for _, tc := range tests { t.Run(tc.desc, func(t *testing.T) { - h := makeKeyEventHandler(tc.state) + h := makeKeyEventHandler(t, tc.state) for _, e := range tc.events { h.HandleKeyEvent(e) } @@ -180,14 +185,8 @@ func TestKeyEventHandler(t *testing.T) { type fakeFetcher struct{} -func (f *fakeFetcher) fetchQueues() {} -func (f *fakeFetcher) fetchQueueInfo(qname string) {} -func (f *fakeFetcher) fetchRedisInfo() {} -func (f *fakeFetcher) fetchTaskInfo(qname, taskID string) {} -func (f *fakeFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) {} -func (f *fakeFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) {} -func (f *fakeFetcher) fetchGroups(qname string) {} +func (f *fakeFetcher) Fetch(s *State) {} type fakeDrawer struct{} -func (d *fakeDrawer) draw(s *State) {} +func (d *fakeDrawer) Draw(s *State) {}