mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
(cli): Refactor data fetch in dash
This commit is contained in:
@@ -77,18 +77,23 @@ func dash(cmd *cobra.Command, args []string) {
|
|||||||
baseStyle := tcell.StyleDefault.Background(tcell.ColorReset).Foreground(tcell.ColorReset)
|
baseStyle := tcell.StyleDefault.Background(tcell.ColorReset).Foreground(tcell.ColorReset)
|
||||||
s.SetStyle(baseStyle)
|
s.SetStyle(baseStyle)
|
||||||
|
|
||||||
queues, err := getQueueInfo(inspector)
|
// channels to send/receive data fetched asynchronously
|
||||||
state := dashState{
|
var (
|
||||||
view: viewTypeQueues,
|
queuesCh = make(chan []*asynq.QueueInfo)
|
||||||
queues: queues,
|
errorCh = make(chan error)
|
||||||
err: err,
|
)
|
||||||
}
|
|
||||||
|
go getQueueInfo(inspector, queuesCh, errorCh)
|
||||||
|
|
||||||
|
var state dashState // contained in this goroutine only; do not share
|
||||||
|
|
||||||
// draw initial screen
|
// draw initial screen
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
|
|
||||||
eventCh := make(chan tcell.Event)
|
eventCh := make(chan tcell.Event)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
ticker := time.NewTicker(2 * time.Second)
|
const interval = 2 * time.Second
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
// TODO: Double check that we are not leaking goroutine with this one.
|
// TODO: Double check that we are not leaking goroutine with this one.
|
||||||
@@ -139,51 +144,69 @@ func dash(cmd *cobra.Command, args []string) {
|
|||||||
state.prevView = state.view
|
state.prevView = state.view
|
||||||
state.view = viewTypeHelp
|
state.view = viewTypeHelp
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
} else if ev.Key() == tcell.KeyF1 {
|
} else if ev.Key() == tcell.KeyF1 && state.view != viewTypeQueues {
|
||||||
|
go getQueueInfo(inspector, queuesCh, errorCh)
|
||||||
|
ticker.Reset(interval)
|
||||||
state.view = viewTypeQueues
|
state.view = viewTypeQueues
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
} else if ev.Key() == tcell.KeyF2 {
|
} else if ev.Key() == tcell.KeyF2 && state.view != viewTypeServers {
|
||||||
state.view = viewTypeServers
|
state.view = viewTypeServers
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
} else if ev.Key() == tcell.KeyF3 {
|
} else if ev.Key() == tcell.KeyF3 && state.view != viewTypeSchedulers {
|
||||||
state.view = viewTypeSchedulers
|
state.view = viewTypeSchedulers
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
} else if ev.Key() == tcell.KeyF4 {
|
} else if ev.Key() == tcell.KeyF4 && state.view != viewTypeRedis {
|
||||||
state.view = viewTypeRedis
|
state.view = viewTypeRedis
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
state.queues, state.err = getQueueInfo(inspector)
|
switch state.view {
|
||||||
|
case viewTypeQueues:
|
||||||
|
go getQueueInfo(inspector, queuesCh, errorCh)
|
||||||
|
|
||||||
|
// TODO: Add more cases for other type of data
|
||||||
|
}
|
||||||
|
|
||||||
|
case queues := <-queuesCh:
|
||||||
|
state.queues = queues
|
||||||
|
state.err = nil
|
||||||
|
drawDash(s, baseStyle, &state)
|
||||||
|
|
||||||
|
case err := <-errorCh:
|
||||||
|
state.err = err
|
||||||
drawDash(s, baseStyle, &state)
|
drawDash(s, baseStyle, &state)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getQueueInfo(i *asynq.Inspector) ([]*asynq.QueueInfo, error) {
|
func getQueueInfo(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error) {
|
||||||
if !flagUseRealData {
|
if !flagUseRealData {
|
||||||
n := rand.Intn(100)
|
n := rand.Intn(100)
|
||||||
return []*asynq.QueueInfo{
|
queuesCh <- []*asynq.QueueInfo{
|
||||||
{Queue: "default", Size: 1800 + n, Pending: 700 + n, Active: 300, Aggregating: 300, Scheduled: 200, Retry: 100, Archived: 200},
|
{Queue: "default", Size: 1800 + n, Pending: 700 + n, Active: 300, Aggregating: 300, Scheduled: 200, Retry: 100, Archived: 200},
|
||||||
{Queue: "critical", Size: 2300 + n, Pending: 1000 + n, Active: 500, Retry: 400, Completed: 400},
|
{Queue: "critical", Size: 2300 + n, Pending: 1000 + n, Active: 500, Retry: 400, Completed: 400},
|
||||||
{Queue: "low", Size: 900 + n, Pending: n, Active: 300, Scheduled: 400, Completed: 200},
|
{Queue: "low", Size: 900 + n, Pending: n, Active: 300, Scheduled: 400, Completed: 200},
|
||||||
}, nil
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
queues, err := i.Queues()
|
queues, err := i.Queues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
errorCh <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
var res []*asynq.QueueInfo
|
var res []*asynq.QueueInfo
|
||||||
for _, q := range queues {
|
for _, q := range queues {
|
||||||
info, err := i.GetQueueInfo(q)
|
info, err := i.GetQueueInfo(q)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
errorCh <- err
|
||||||
|
return
|
||||||
}
|
}
|
||||||
res = append(res, info)
|
res = append(res, info)
|
||||||
}
|
}
|
||||||
return res, nil
|
queuesCh <- res
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user