2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-03 05:12:01 +08:00

(cli): Update fetcher interface

This commit is contained in:
Ken Hibino
2022-05-27 13:49:43 -07:00
parent 5f47379e76
commit 6058e369c3
5 changed files with 91 additions and 84 deletions

View File

@@ -101,6 +101,7 @@ func Run(opts Options) {
f := dataFetcher{ f := dataFetcher{
inspector, inspector,
opts, opts,
s,
errorCh, errorCh,
queueCh, queueCh,
taskCh, taskCh,
@@ -127,7 +128,7 @@ func Run(opts Options) {
go fetchQueues(inspector, queuesCh, errorCh, opts) go fetchQueues(inspector, queuesCh, errorCh, opts)
go s.ChannelEvents(eventCh, done) // TODO: Double check that we are not leaking goroutine with this one. go s.ChannelEvents(eventCh, done) // TODO: Double check that we are not leaking goroutine with this one.
d.draw(&state) // draw initial screen d.Draw(&state) // draw initial screen
for { for {
// Update screen // Update screen
@@ -173,12 +174,12 @@ func Run(opts Options) {
if len(queues) < state.queueTableRowIdx { if len(queues) < state.queueTableRowIdx {
state.queueTableRowIdx = len(queues) state.queueTableRowIdx = len(queues)
} }
d.draw(&state) d.Draw(&state)
case q := <-queueCh: case q := <-queueCh:
state.selectedQueue = q state.selectedQueue = q
state.err = nil state.err = nil
d.draw(&state) d.Draw(&state)
case groups := <-groupsCh: case groups := <-groupsCh:
state.groups = groups state.groups = groups
@@ -186,7 +187,7 @@ func Run(opts Options) {
if len(groups) < state.groupTableRowIdx { if len(groups) < state.groupTableRowIdx {
state.groupTableRowIdx = len(groups) state.groupTableRowIdx = len(groups)
} }
d.draw(&state) d.Draw(&state)
case tasks := <-tasksCh: case tasks := <-tasksCh:
state.tasks = tasks state.tasks = tasks
@@ -194,17 +195,17 @@ func Run(opts Options) {
if len(tasks) < state.taskTableRowIdx { if len(tasks) < state.taskTableRowIdx {
state.taskTableRowIdx = len(tasks) state.taskTableRowIdx = len(tasks)
} }
d.draw(&state) d.Draw(&state)
case t := <-taskCh: case t := <-taskCh:
state.selectedTask = t state.selectedTask = t
state.err = nil state.err = nil
d.draw(&state) d.Draw(&state)
case redisInfo := <-redisInfoCh: case redisInfo := <-redisInfoCh:
state.redisInfo = *redisInfo state.redisInfo = *redisInfo
state.err = nil state.err = nil
d.draw(&state) d.Draw(&state)
case err := <-errorCh: case err := <-errorCh:
if errors.Is(err, asynq.ErrTaskNotFound) { if errors.Is(err, asynq.ErrTaskNotFound) {
@@ -212,7 +213,7 @@ func Run(opts Options) {
} else { } else {
state.err = err state.err = err
} }
d.draw(&state) d.Draw(&state)
} }
} }

View File

@@ -20,7 +20,7 @@ import (
// drawer draws UI with the given state. // drawer draws UI with the given state.
type drawer interface { type drawer interface {
draw(state *State) Draw(state *State)
} }
type dashDrawer struct { type dashDrawer struct {
@@ -28,7 +28,7 @@ type dashDrawer struct {
opts Options opts Options
} }
func (dd *dashDrawer) draw(state *State) { func (dd *dashDrawer) Draw(state *State) {
s, opts := dd.s, dd.opts s, opts := dd.s, dd.opts
s.Clear() s.Clear()
// Simulate data update on every render // Simulate data update on every render

View File

@@ -7,22 +7,19 @@ package dash
import ( import (
"math/rand" "math/rand"
"github.com/gdamore/tcell/v2"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
type fetcher interface { type fetcher interface {
fetchQueues() // Fetch retries data required by the given state of the dashboard.
fetchQueueInfo(qname string) Fetch(state *State)
fetchRedisInfo()
fetchTaskInfo(qname, taskID string)
fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int)
fetchAggregatingTasks(qname, group string, pageSize, pageNum int)
fetchGroups(qname string)
} }
type dataFetcher struct { type dataFetcher struct {
inspector *asynq.Inspector inspector *asynq.Inspector
opts Options opts Options
s tcell.Screen
errorCh chan<- error errorCh chan<- error
queueCh chan<- *asynq.QueueInfo queueCh chan<- *asynq.QueueInfo
@@ -33,6 +30,27 @@ type dataFetcher struct {
redisInfoCh chan<- *redisInfo redisInfoCh chan<- *redisInfo
} }
func (f *dataFetcher) Fetch(state *State) {
switch state.view {
case viewTypeQueues:
f.fetchQueues()
case viewTypeQueueDetails:
if shouldShowGroupTable(state) {
f.fetchGroups(state.selectedQueue.Queue)
} else if state.taskState == asynq.TaskStateAggregating {
f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(f.s), state.pageNum)
} else {
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(f.s), state.pageNum)
}
// if the task modal is open, additionally fetch the selected task's info
if state.taskID != "" {
f.fetchTaskInfo(state.selectedQueue.Queue, state.taskID)
}
case viewTypeRedis:
f.fetchRedisInfo()
}
}
func (f *dataFetcher) fetchQueues() { func (f *dataFetcher) fetchQueues() {
var ( var (
inspector = f.inspector inspector = f.inspector
@@ -70,15 +88,6 @@ func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh
queuesCh <- res queuesCh <- res
} }
func (f *dataFetcher) fetchQueueInfo(qname string) {
var (
inspector = f.inspector
queueCh = f.queueCh
errorCh = f.errorCh
)
go fetchQueueInfo(inspector, qname, queueCh, errorCh)
}
func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) { func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) {
q, err := i.GetQueueInfo(qname) q, err := i.GetQueueInfo(qname)
if err != nil { if err != nil {
@@ -103,8 +112,14 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
} }
func (f *dataFetcher) fetchGroups(qname string) { func (f *dataFetcher) fetchGroups(qname string) {
i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh var (
i = f.inspector
groupsCh = f.groupsCh
errorCh = f.errorCh
queueCh = f.queueCh
)
go fetchGroups(i, qname, groupsCh, errorCh) go fetchGroups(i, qname, groupsCh, errorCh)
go fetchQueueInfo(i, qname, queueCh, errorCh)
} }
func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) { func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) {
@@ -121,8 +136,10 @@ func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageN
i = f.inspector i = f.inspector
tasksCh = f.tasksCh tasksCh = f.tasksCh
errorCh = f.errorCh errorCh = f.errorCh
queueCh = f.queueCh
) )
go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh) go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh)
go fetchQueueInfo(i, qname, queueCh, errorCh)
} }
func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int, func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int,
@@ -140,8 +157,10 @@ func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSi
i = f.inspector i = f.inspector
tasksCh = f.tasksCh tasksCh = f.tasksCh
errorCh = f.errorCh errorCh = f.errorCh
queueCh = f.queueCh
) )
go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh) go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh)
go fetchQueueInfo(i, qname, queueCh, errorCh)
} }
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int,

View File

@@ -73,16 +73,16 @@ func (h *keyEventHandler) goBack() {
) )
if state.view == viewTypeHelp { if state.view == viewTypeHelp {
state.view = state.prevView // exit help state.view = state.prevView // exit help
d.draw(state) d.Draw(state)
} else if state.view == viewTypeQueueDetails { } else if state.view == viewTypeQueueDetails {
// if task modal is open close it; otherwise go back to the previous view // if task modal is open close it; otherwise go back to the previous view
if state.taskID != "" { if state.taskID != "" {
state.taskID = "" state.taskID = ""
state.selectedTask = nil state.selectedTask = nil
d.draw(state) d.Draw(state)
} else { } else {
state.view = viewTypeQueues state.view = viewTypeQueues
d.draw(state) d.Draw(state)
} }
} else { } else {
h.quit() h.quit()
@@ -104,7 +104,7 @@ func (h *keyEventHandler) downKeyQueues() {
} else { } else {
h.state.queueTableRowIdx = 0 // loop back h.state.queueTableRowIdx = 0 // loop back
} }
h.drawer.draw(h.state) h.drawer.Draw(h.state)
} }
func (h *keyEventHandler) downKeyQueueDetails() { func (h *keyEventHandler) downKeyQueueDetails() {
@@ -122,7 +122,7 @@ func (h *keyEventHandler) downKeyQueueDetails() {
state.taskTableRowIdx = 0 // loop back state.taskTableRowIdx = 0 // loop back
} }
} }
h.drawer.draw(state) h.drawer.Draw(state)
} }
func (h *keyEventHandler) handleUpKey() { func (h *keyEventHandler) handleUpKey() {
@@ -141,7 +141,7 @@ func (h *keyEventHandler) upKeyQueues() {
} else { } else {
state.queueTableRowIdx-- state.queueTableRowIdx--
} }
h.drawer.draw(state) h.drawer.Draw(state)
} }
func (h *keyEventHandler) upKeyQueueDetails() { func (h *keyEventHandler) upKeyQueueDetails() {
@@ -159,7 +159,7 @@ func (h *keyEventHandler) upKeyQueueDetails() {
state.taskTableRowIdx-- state.taskTableRowIdx--
} }
} }
h.drawer.draw(state) h.drawer.Draw(state)
} }
func (h *keyEventHandler) handleEnterKey() { func (h *keyEventHandler) handleEnterKey() {
@@ -177,7 +177,6 @@ func (h *keyEventHandler) resetTicker() {
func (h *keyEventHandler) enterKeyQueues() { func (h *keyEventHandler) enterKeyQueues() {
var ( var (
s = h.s
state = h.state state = h.state
f = h.fetcher f = h.fetcher
d = h.drawer d = h.drawer
@@ -188,15 +187,14 @@ func (h *keyEventHandler) enterKeyQueues() {
state.taskState = asynq.TaskStateActive state.taskState = asynq.TaskStateActive
state.tasks = nil state.tasks = nil
state.pageNum = 1 state.pageNum = 1
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) f.Fetch(state)
h.resetTicker() h.resetTicker()
d.draw(state) d.Draw(state)
} }
} }
func (h *keyEventHandler) enterKeyQueueDetails() { func (h *keyEventHandler) enterKeyQueueDetails() {
var ( var (
s = h.s
state = h.state state = h.state
f = h.fetcher f = h.fetcher
d = h.drawer d = h.drawer
@@ -205,23 +203,22 @@ func (h *keyEventHandler) enterKeyQueueDetails() {
state.selectedGroup = state.groups[state.groupTableRowIdx-1] state.selectedGroup = state.groups[state.groupTableRowIdx-1]
state.tasks = nil state.tasks = nil
state.pageNum = 1 state.pageNum = 1
f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum) f.Fetch(state)
h.resetTicker() h.resetTicker()
d.draw(state) d.Draw(state)
} else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 { } else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 {
task := state.tasks[state.taskTableRowIdx-1] task := state.tasks[state.taskTableRowIdx-1]
state.selectedTask = task state.selectedTask = task
state.taskID = task.ID state.taskID = task.ID
f.fetchTaskInfo(state.selectedQueue.Queue, task.ID) f.Fetch(state)
h.resetTicker() h.resetTicker()
d.draw(state) d.Draw(state)
} }
} }
func (h *keyEventHandler) handleLeftKey() { func (h *keyEventHandler) handleLeftKey() {
var ( var (
s = h.s
state = h.state state = h.state
f = h.fetcher f = h.fetcher
d = h.drawer d = h.drawer
@@ -232,19 +229,14 @@ func (h *keyEventHandler) handleLeftKey() {
state.taskTableRowIdx = 0 state.taskTableRowIdx = 0
state.tasks = nil state.tasks = nil
state.selectedGroup = nil state.selectedGroup = nil
if shouldShowGroupTable(state) { f.Fetch(state)
f.fetchGroups(state.selectedQueue.Queue)
} else {
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
h.resetTicker() h.resetTicker()
d.draw(state) d.Draw(state)
} }
} }
func (h *keyEventHandler) handleRightKey() { func (h *keyEventHandler) handleRightKey() {
var ( var (
s = h.s
state = h.state state = h.state
f = h.fetcher f = h.fetcher
d = h.drawer d = h.drawer
@@ -255,13 +247,9 @@ func (h *keyEventHandler) handleRightKey() {
state.taskTableRowIdx = 0 state.taskTableRowIdx = 0
state.tasks = nil state.tasks = nil
state.selectedGroup = nil state.selectedGroup = nil
if shouldShowGroupTable(state) { f.Fetch(state)
f.fetchGroups(state.selectedQueue.Queue)
} else {
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
h.resetTicker() h.resetTicker()
d.draw(state) d.Draw(state)
} }
} }
@@ -280,14 +268,14 @@ func (h *keyEventHandler) nextPage() {
end := start + pageSize end := start + pageSize
if end <= total { if end <= total {
state.pageNum++ state.pageNum++
d.draw(state) d.Draw(state)
} }
} else { } else {
pageSize := taskPageSize(s) pageSize := taskPageSize(s)
totalCount := getTaskCount(state.selectedQueue, state.taskState) totalCount := getTaskCount(state.selectedQueue, state.taskState)
if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount {
state.pageNum++ state.pageNum++
f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum) f.Fetch(state)
h.resetTicker() h.resetTicker()
} }
} }
@@ -307,12 +295,12 @@ func (h *keyEventHandler) prevPage() {
start := (state.pageNum - 1) * pageSize start := (state.pageNum - 1) * pageSize
if start > 0 { if start > 0 {
state.pageNum-- state.pageNum--
d.draw(state) d.Draw(state)
} }
} else { } else {
if state.pageNum > 1 { if state.pageNum > 1 {
state.pageNum-- state.pageNum--
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) f.Fetch(state)
h.resetTicker() h.resetTicker()
} }
} }
@@ -326,10 +314,10 @@ func (h *keyEventHandler) showQueues() {
d = h.drawer d = h.drawer
) )
if state.view != viewTypeQueues { if state.view != viewTypeQueues {
f.fetchQueues()
h.resetTicker()
state.view = viewTypeQueues state.view = viewTypeQueues
d.draw(state) f.Fetch(state)
h.resetTicker()
d.Draw(state)
} }
} }
@@ -341,7 +329,7 @@ func (h *keyEventHandler) showServers() {
if state.view != viewTypeServers { if state.view != viewTypeServers {
//TODO Start data fetch and reset ticker //TODO Start data fetch and reset ticker
state.view = viewTypeServers state.view = viewTypeServers
d.draw(state) d.Draw(state)
} }
} }
@@ -353,7 +341,7 @@ func (h *keyEventHandler) showSchedulers() {
if state.view != viewTypeSchedulers { if state.view != viewTypeSchedulers {
//TODO Start data fetch and reset ticker //TODO Start data fetch and reset ticker
state.view = viewTypeSchedulers state.view = viewTypeSchedulers
d.draw(state) d.Draw(state)
} }
} }
@@ -364,10 +352,10 @@ func (h *keyEventHandler) showRedisInfo() {
d = h.drawer d = h.drawer
) )
if state.view != viewTypeRedis { if state.view != viewTypeRedis {
f.fetchRedisInfo()
h.resetTicker()
state.view = viewTypeRedis state.view = viewTypeRedis
d.draw(state) f.Fetch(state)
h.resetTicker()
d.Draw(state)
} }
} }
@@ -379,6 +367,6 @@ func (h *keyEventHandler) showHelp() {
if state.view != viewTypeHelp { if state.view != viewTypeHelp {
state.prevView = state.view state.prevView = state.view
state.view = viewTypeHelp state.view = viewTypeHelp
d.draw(state) d.Draw(state)
} }
} }

View File

@@ -6,19 +6,24 @@ package dash
import ( import (
"testing" "testing"
"time"
"github.com/gdamore/tcell/v2" "github.com/gdamore/tcell/v2"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
func makeKeyEventHandler(state *State) *keyEventHandler { func makeKeyEventHandler(t *testing.T, state *State) *keyEventHandler {
ticker := time.NewTicker(time.Second)
t.Cleanup(func() { ticker.Stop() })
return &keyEventHandler{ return &keyEventHandler{
s: tcell.NewSimulationScreen("UTF-8"), s: tcell.NewSimulationScreen("UTF-8"),
state: state, state: state,
done: make(chan struct{}), done: make(chan struct{}),
fetcher: &fakeFetcher{}, fetcher: &fakeFetcher{},
drawer: &fakeDrawer{}, drawer: &fakeDrawer{},
ticker: ticker,
pollInterval: time.Second,
} }
} }
@@ -164,7 +169,7 @@ func TestKeyEventHandler(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
h := makeKeyEventHandler(tc.state) h := makeKeyEventHandler(t, tc.state)
for _, e := range tc.events { for _, e := range tc.events {
h.HandleKeyEvent(e) h.HandleKeyEvent(e)
} }
@@ -180,14 +185,8 @@ func TestKeyEventHandler(t *testing.T) {
type fakeFetcher struct{} type fakeFetcher struct{}
func (f *fakeFetcher) fetchQueues() {} func (f *fakeFetcher) Fetch(s *State) {}
func (f *fakeFetcher) fetchQueueInfo(qname string) {}
func (f *fakeFetcher) fetchRedisInfo() {}
func (f *fakeFetcher) fetchTaskInfo(qname, taskID string) {}
func (f *fakeFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) {}
func (f *fakeFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) {}
func (f *fakeFetcher) fetchGroups(qname string) {}
type fakeDrawer struct{} type fakeDrawer struct{}
func (d *fakeDrawer) draw(s *State) {} func (d *fakeDrawer) Draw(s *State) {}