mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-03 05:12:01 +08:00
(cli): Refactor
This commit is contained in:
@@ -16,7 +16,6 @@ import (
|
||||
|
||||
var (
|
||||
flagDebug = false
|
||||
flagUseRealData = false
|
||||
flagPollInterval = 8 * time.Second
|
||||
)
|
||||
|
||||
@@ -25,7 +24,6 @@ func init() {
|
||||
dashCmd.Flags().DurationVar(&flagPollInterval, "refresh", 8*time.Second, "Interval between data refresh. Minimum value is 1s.")
|
||||
// TODO: Remove this debug once we're done
|
||||
dashCmd.Flags().BoolVar(&flagDebug, "debug", false, "Print debug info")
|
||||
dashCmd.Flags().BoolVar(&flagUseRealData, "realdata", true, "Use real data in redis")
|
||||
}
|
||||
|
||||
var dashCmd = &cobra.Command{
|
||||
@@ -41,7 +39,6 @@ var dashCmd = &cobra.Command{
|
||||
}
|
||||
dash.Run(dash.Options{
|
||||
DebugMode: flagDebug,
|
||||
UseRealData: flagUseRealData,
|
||||
PollInterval: flagPollInterval,
|
||||
})
|
||||
},
|
||||
|
@@ -25,11 +25,10 @@ const (
|
||||
|
||||
// State holds dashboard state.
|
||||
type State struct {
|
||||
queues []*asynq.QueueInfo
|
||||
tasks []*asynq.TaskInfo
|
||||
groups []*asynq.GroupInfo
|
||||
redisInfo redisInfo
|
||||
err error
|
||||
queues []*asynq.QueueInfo
|
||||
tasks []*asynq.TaskInfo
|
||||
groups []*asynq.GroupInfo
|
||||
err error
|
||||
|
||||
// Note: index zero corresponds to the table header; index=1 correctponds to the first element
|
||||
queueTableRowIdx int // highlighted row in queue table
|
||||
@@ -48,16 +47,8 @@ type State struct {
|
||||
prevView viewType // to support "go back"
|
||||
}
|
||||
|
||||
type redisInfo struct {
|
||||
version string
|
||||
uptime string
|
||||
memoryUsage int
|
||||
peakMemoryUsage int
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
DebugMode bool
|
||||
UseRealData bool
|
||||
PollInterval time.Duration
|
||||
}
|
||||
|
||||
@@ -83,13 +74,12 @@ func Run(opts Options) {
|
||||
done = make(chan struct{})
|
||||
|
||||
// channels to send/receive data fetched asynchronously
|
||||
errorCh = make(chan error)
|
||||
queueCh = make(chan *asynq.QueueInfo)
|
||||
taskCh = make(chan *asynq.TaskInfo)
|
||||
queuesCh = make(chan []*asynq.QueueInfo)
|
||||
groupsCh = make(chan []*asynq.GroupInfo)
|
||||
tasksCh = make(chan []*asynq.TaskInfo)
|
||||
redisInfoCh = make(chan *redisInfo)
|
||||
errorCh = make(chan error)
|
||||
queueCh = make(chan *asynq.QueueInfo)
|
||||
taskCh = make(chan *asynq.TaskInfo)
|
||||
queuesCh = make(chan []*asynq.QueueInfo)
|
||||
groupsCh = make(chan []*asynq.GroupInfo)
|
||||
tasksCh = make(chan []*asynq.TaskInfo)
|
||||
)
|
||||
defer ticker.Stop()
|
||||
|
||||
@@ -103,7 +93,6 @@ func Run(opts Options) {
|
||||
queuesCh,
|
||||
groupsCh,
|
||||
tasksCh,
|
||||
redisInfoCh,
|
||||
}
|
||||
|
||||
d := dashDrawer{
|
||||
@@ -140,26 +129,7 @@ func Run(opts Options) {
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
// TODO: this should be just the call to fetcher.Fetch(state)
|
||||
switch state.view {
|
||||
case viewTypeQueues:
|
||||
go fetchQueues(inspector, queuesCh, errorCh, opts)
|
||||
case viewTypeQueueDetails:
|
||||
go fetchQueueInfo(inspector, state.selectedQueue.Queue, queueCh, errorCh)
|
||||
if shouldShowGroupTable(&state) {
|
||||
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
|
||||
} else if state.taskState == asynq.TaskStateAggregating {
|
||||
go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
} else {
|
||||
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
|
||||
taskPageSize(s), state.pageNum, tasksCh, errorCh)
|
||||
}
|
||||
// if the task modal is open, fetch the selected task's info
|
||||
if state.taskID != "" {
|
||||
go fetchTaskInfo(inspector, state.selectedQueue.Queue, state.taskID, taskCh, errorCh)
|
||||
}
|
||||
}
|
||||
f.Fetch(&state)
|
||||
|
||||
case queues := <-queuesCh:
|
||||
state.queues = queues
|
||||
@@ -195,11 +165,6 @@ func Run(opts Options) {
|
||||
state.err = nil
|
||||
d.Draw(&state)
|
||||
|
||||
case redisInfo := <-redisInfoCh:
|
||||
state.redisInfo = *redisInfo
|
||||
state.err = nil
|
||||
d.Draw(&state)
|
||||
|
||||
case err := <-errorCh:
|
||||
if errors.Is(err, asynq.ErrTaskNotFound) {
|
||||
state.selectedTask = nil
|
||||
|
@@ -5,8 +5,6 @@
|
||||
package dash
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"github.com/gdamore/tcell/v2"
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
@@ -21,13 +19,12 @@ type dataFetcher struct {
|
||||
opts Options
|
||||
s tcell.Screen
|
||||
|
||||
errorCh chan<- error
|
||||
queueCh chan<- *asynq.QueueInfo
|
||||
taskCh chan<- *asynq.TaskInfo
|
||||
queuesCh chan<- []*asynq.QueueInfo
|
||||
groupsCh chan<- []*asynq.GroupInfo
|
||||
tasksCh chan<- []*asynq.TaskInfo
|
||||
redisInfoCh chan<- *redisInfo
|
||||
errorCh chan<- error
|
||||
queueCh chan<- *asynq.QueueInfo
|
||||
taskCh chan<- *asynq.TaskInfo
|
||||
queuesCh chan<- []*asynq.QueueInfo
|
||||
groupsCh chan<- []*asynq.GroupInfo
|
||||
tasksCh chan<- []*asynq.TaskInfo
|
||||
}
|
||||
|
||||
func (f *dataFetcher) Fetch(state *State) {
|
||||
@@ -60,15 +57,6 @@ func (f *dataFetcher) fetchQueues() {
|
||||
}
|
||||
|
||||
func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) {
|
||||
if !opts.UseRealData {
|
||||
n := rand.Intn(100)
|
||||
queuesCh <- []*asynq.QueueInfo{
|
||||
{Queue: "default", Size: 1800 + n, Pending: 700 + n, Active: 300, Aggregating: 300, Scheduled: 200, Retry: 100, Archived: 200},
|
||||
{Queue: "critical", Size: 2300 + n, Pending: 1000 + n, Active: 500, Retry: 400, Completed: 400},
|
||||
{Queue: "low", Size: 900 + n, Pending: n, Active: 300, Scheduled: 400, Completed: 200},
|
||||
}
|
||||
return
|
||||
}
|
||||
queues, err := i.Queues()
|
||||
if err != nil {
|
||||
errorCh <- err
|
||||
@@ -95,20 +83,6 @@ func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.Queu
|
||||
queueCh <- q
|
||||
}
|
||||
|
||||
func (f *dataFetcher) fetchRedisInfo() {
|
||||
go fetchRedisInfo(f.redisInfoCh, f.errorCh)
|
||||
}
|
||||
|
||||
func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
|
||||
n := rand.Intn(1000)
|
||||
redisInfoCh <- &redisInfo{
|
||||
version: "6.2.6",
|
||||
uptime: "9 days",
|
||||
memoryUsage: n,
|
||||
peakMemoryUsage: n + 123,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *dataFetcher) fetchGroups(qname string) {
|
||||
var (
|
||||
i = f.inspector
|
||||
|
@@ -173,7 +173,7 @@ func TestKeyEventHandler(t *testing.T) {
|
||||
for _, e := range tc.events {
|
||||
h.HandleKeyEvent(e)
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantState, *tc.state, cmp.AllowUnexported(State{}, redisInfo{})); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantState, *tc.state, cmp.AllowUnexported(State{})); diff != "" {
|
||||
t.Errorf("after state was %+v, want %+v: (-want,+got)\n%s", *tc.state, tc.wantState, diff)
|
||||
}
|
||||
})
|
||||
|
Reference in New Issue
Block a user