diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 8fba112..8d6ea76 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -99,19 +99,24 @@ func Run(opts Options) { ticker := time.NewTicker(opts.PollInterval) defer ticker.Stop() + f := dataFetcher{ + ticker, + inspector, + opts, + errorCh, + queueCh, + queuesCh, + groupsCh, + tasksCh, + redisInfoCh, + } + h := keyEventHandler{ - s: s, - state: &state, - opts: opts, - done: done, - ticker: ticker, - inspector: inspector, - errorCh: errorCh, - queueCh: queueCh, - queuesCh: queuesCh, - groupsCh: groupsCh, - tasksCh: tasksCh, - redisInfoCh: redisInfoCh, + s: s, + fetcher: &f, + state: &state, + opts: opts, + done: done, } // TODO: Double check that we are not leaking goroutine with this one. diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index 8be12c6..ddb9df9 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -6,10 +6,35 @@ package dash import ( "math/rand" + "time" "github.com/hibiken/asynq" ) +type dataFetcher struct { + ticker *time.Ticker + inspector *asynq.Inspector + opts Options + + errorCh chan<- error + queueCh chan<- *asynq.QueueInfo + queuesCh chan<- []*asynq.QueueInfo + groupsCh chan<- []*asynq.GroupInfo + tasksCh chan<- []*asynq.TaskInfo + redisInfoCh chan<- *redisInfo +} + +func (f *dataFetcher) fetchQueues() { + var ( + inspector = f.inspector + queuesCh = f.queuesCh + errorCh = f.errorCh + opts = f.opts + ) + go fetchQueues(inspector, queuesCh, errorCh, opts) + f.ticker.Reset(opts.PollInterval) +} + func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) { if !opts.UseRealData { n := rand.Intn(100) @@ -37,6 +62,18 @@ func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh queuesCh <- res } +func (f *dataFetcher) fetchQueueInfo(qname string) { + var ( + inspector = f.inspector + queueCh = f.queueCh + errorCh = f.errorCh + opts = f.opts + ticker = f.ticker + ) + go fetchQueueInfo(inspector, qname, queueCh, errorCh) + ticker.Reset(opts.PollInterval) +} + func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) { q, err := i.GetQueueInfo(qname) if err != nil { @@ -46,6 +83,11 @@ func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.Queu queueCh <- q } +func (f *dataFetcher) fetchRedisInfo() { + go fetchRedisInfo(f.redisInfoCh, f.errorCh) + f.ticker.Reset(f.opts.PollInterval) +} + func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { n := rand.Intn(1000) redisInfoCh <- &redisInfo{ @@ -56,6 +98,13 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { } } +func (f *dataFetcher) fetchGroups(qname string) { + i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh + ticker, opts := f.ticker, f.opts + go fetchGroups(i, qname, groupsCh, errorCh) + ticker.Reset(opts.PollInterval) +} + func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) { groups, err := i.Groups(qname) if err != nil { @@ -65,6 +114,18 @@ func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.Grou groupsCh <- groups } +func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) { + var ( + i = f.inspector + tasksCh = f.tasksCh + errorCh = f.errorCh + ticker = f.ticker + opts = f.opts + ) + go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh) + ticker.Reset(opts.PollInterval) +} + func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum)) @@ -75,6 +136,18 @@ func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pa tasksCh <- tasks } +func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) { + var ( + i = f.inspector + tasksCh = f.tasksCh + errorCh = f.errorCh + ticker = f.ticker + opts = f.opts + ) + go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh) + ticker.Reset(opts.PollInterval) +} + func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) { var ( diff --git a/tools/asynq/cmd/dash/key_event.go b/tools/asynq/cmd/dash/key_event.go index 41bc092..7e44f3a 100644 --- a/tools/asynq/cmd/dash/key_event.go +++ b/tools/asynq/cmd/dash/key_event.go @@ -6,7 +6,6 @@ package dash import ( "os" - "time" "github.com/gdamore/tcell/v2" "github.com/hibiken/asynq" @@ -18,15 +17,7 @@ type keyEventHandler struct { opts Options done chan struct{} - ticker *time.Ticker - inspector *asynq.Inspector - - errorCh chan error - queueCh chan *asynq.QueueInfo - queuesCh chan []*asynq.QueueInfo - groupsCh chan []*asynq.GroupInfo - tasksCh chan []*asynq.TaskInfo - redisInfoCh chan *redisInfo + fetcher *dataFetcher } func (h *keyEventHandler) quit() { @@ -170,13 +161,10 @@ func (h *keyEventHandler) handleEnterKey() { func (h *keyEventHandler) enterKeyQueues() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.queueTableRowIdx != 0 { state.selectedQueue = state.queues[state.queueTableRowIdx-1] @@ -184,44 +172,33 @@ func (h *keyEventHandler) enterKeyQueues() { state.taskState = asynq.TaskStateActive state.tasks = nil state.pageNum = 1 - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) + f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) drawDash(s, state, opts) } } func (h *keyEventHandler) enterKeyQueueDetails() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if shouldShowGroupTable(state) && state.groupTableRowIdx != 0 { state.selectedGroup = state.groups[state.groupTableRowIdx-1] state.tasks = nil state.pageNum = 1 - go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group, - taskPageSize(s), state.pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) + f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum) drawDash(s, state, opts) } } func (h *keyEventHandler) handleLeftKey() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh - groupsCh = h.groupsCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view == viewTypeQueueDetails { state.taskState = prevTaskState(state.taskState) @@ -230,26 +207,20 @@ func (h *keyEventHandler) handleLeftKey() { state.tasks = nil state.selectedGroup = nil if shouldShowGroupTable(state) { - go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh) + f.fetchGroups(state.selectedQueue.Queue) } else { - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) + f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) } - ticker.Reset(opts.PollInterval) drawDash(s, state, opts) } } func (h *keyEventHandler) handleRightKey() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh - groupsCh = h.groupsCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view == viewTypeQueueDetails { state.taskState = nextTaskState(state.taskState) @@ -258,25 +229,20 @@ func (h *keyEventHandler) handleRightKey() { state.tasks = nil state.selectedGroup = nil if shouldShowGroupTable(state) { - go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh) + f.fetchGroups(state.selectedQueue.Queue) } else { - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) + f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) } - ticker.Reset(opts.PollInterval) drawDash(s, state, opts) } } func (h *keyEventHandler) nextPage() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view == viewTypeQueueDetails { if shouldShowGroupTable(state) { @@ -293,9 +259,7 @@ func (h *keyEventHandler) nextPage() { totalCount := getTaskCount(state.selectedQueue, state.taskState) if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { state.pageNum++ - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - pageSize, state.pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) + f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum) } } } @@ -303,13 +267,10 @@ func (h *keyEventHandler) nextPage() { func (h *keyEventHandler) prevPage() { var ( - s = h.s - state = h.state - opts = h.opts - inspector = h.inspector - ticker = h.ticker - errorCh = h.errorCh - tasksCh = h.tasksCh + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view == viewTypeQueueDetails { if shouldShowGroupTable(state) { @@ -322,9 +283,7 @@ func (h *keyEventHandler) prevPage() { } else { if state.pageNum > 1 { state.pageNum-- - go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, - taskPageSize(s), state.pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) + f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) } } } @@ -332,17 +291,13 @@ func (h *keyEventHandler) prevPage() { func (h *keyEventHandler) showQueues() { var ( - s = h.s - state = h.state - inspector = h.inspector - queuesCh = h.queuesCh - errorCh = h.errorCh - opts = h.opts - ticker = h.ticker + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view != viewTypeQueues { - go fetchQueues(inspector, queuesCh, errorCh, opts) - ticker.Reset(opts.PollInterval) + f.fetchQueues() state.view = viewTypeQueues drawDash(s, state, opts) } @@ -376,16 +331,13 @@ func (h *keyEventHandler) showSchedulers() { func (h *keyEventHandler) showRedisInfo() { var ( - s = h.s - state = h.state - opts = h.opts - redisInfoCh = h.redisInfoCh - errorCh = h.errorCh - ticker = h.ticker + s = h.s + state = h.state + opts = h.opts + f = h.fetcher ) if state.view != viewTypeRedis { - go fetchRedisInfo(redisInfoCh, errorCh) - ticker.Reset(opts.PollInterval) + f.fetchRedisInfo() state.view = viewTypeRedis drawDash(s, state, opts) }