From b6dac5c0a1df785c32ec17dd897157c1d5e6671e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 23 May 2022 15:40:07 -0700 Subject: [PATCH] (cli): extract ticker out of fetcher --- tools/asynq/cmd/dash/dash.go | 23 +++++++++++++++++------ tools/asynq/cmd/dash/fetch.go | 15 --------------- tools/asynq/cmd/dash/key_event.go | 16 ++++++++++++++++ 3 files changed, 33 insertions(+), 21 deletions(-) diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 233fbb3..27a45cf 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -33,6 +33,7 @@ type State struct { redisInfo redisInfo err error + // Note: index zero corresponds to the table header; index=1 correctponds to the first element queueTableRowIdx int // highlighted row in queue table taskTableRowIdx int // highlighted row in task table groupTableRowIdx int // highlighted row in group table @@ -97,7 +98,6 @@ func Run(opts Options) { defer ticker.Stop() f := dataFetcher{ - ticker, inspector, opts, errorCh, @@ -114,11 +114,13 @@ func Run(opts Options) { } h := keyEventHandler{ - s: s, - fetcher: &f, - drawer: &d, - state: &state, - done: done, + s: s, + fetcher: &f, + drawer: &d, + state: &state, + done: done, + ticker: ticker, + pollInterval: opts.PollInterval, } go fetchQueues(inspector, queuesCh, errorCh, opts) @@ -161,6 +163,9 @@ func Run(opts Options) { case queues := <-queuesCh: state.queues = queues state.err = nil + if len(queues) < state.queueTableRowIdx { + state.queueTableRowIdx = len(queues) + } d.draw(&state) case q := <-queueCh: @@ -171,11 +176,17 @@ func Run(opts Options) { case groups := <-groupsCh: state.groups = groups state.err = nil + if len(groups) < state.groupTableRowIdx { + state.groupTableRowIdx = len(groups) + } d.draw(&state) case tasks := <-tasksCh: state.tasks = tasks state.err = nil + if len(tasks) < state.taskTableRowIdx { + state.taskTableRowIdx = len(tasks) + } d.draw(&state) case redisInfo := <-redisInfoCh: diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index f148ed9..ac42f0e 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -6,7 +6,6 @@ package dash import ( "math/rand" - "time" "github.com/hibiken/asynq" ) @@ -21,7 +20,6 @@ type fetcher interface { } type dataFetcher struct { - ticker *time.Ticker inspector *asynq.Inspector opts Options @@ -41,7 +39,6 @@ func (f *dataFetcher) fetchQueues() { opts = f.opts ) go fetchQueues(inspector, queuesCh, errorCh, opts) - f.ticker.Reset(opts.PollInterval) } func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) { @@ -76,11 +73,8 @@ func (f *dataFetcher) fetchQueueInfo(qname string) { inspector = f.inspector queueCh = f.queueCh errorCh = f.errorCh - opts = f.opts - ticker = f.ticker ) go fetchQueueInfo(inspector, qname, queueCh, errorCh) - ticker.Reset(opts.PollInterval) } func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) { @@ -94,7 +88,6 @@ func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.Queu func (f *dataFetcher) fetchRedisInfo() { go fetchRedisInfo(f.redisInfoCh, f.errorCh) - f.ticker.Reset(f.opts.PollInterval) } func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { @@ -109,9 +102,7 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) { func (f *dataFetcher) fetchGroups(qname string) { i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh - ticker, opts := f.ticker, f.opts go fetchGroups(i, qname, groupsCh, errorCh) - ticker.Reset(opts.PollInterval) } func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) { @@ -128,11 +119,8 @@ func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageN i = f.inspector tasksCh = f.tasksCh errorCh = f.errorCh - ticker = f.ticker - opts = f.opts ) go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) } func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int, @@ -150,11 +138,8 @@ func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSi i = f.inspector tasksCh = f.tasksCh errorCh = f.errorCh - ticker = f.ticker - opts = f.opts ) go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh) - ticker.Reset(opts.PollInterval) } func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int, diff --git a/tools/asynq/cmd/dash/key_event.go b/tools/asynq/cmd/dash/key_event.go index 04d8a18..c265327 100644 --- a/tools/asynq/cmd/dash/key_event.go +++ b/tools/asynq/cmd/dash/key_event.go @@ -6,6 +6,7 @@ package dash import ( "os" + "time" "github.com/gdamore/tcell/v2" "github.com/hibiken/asynq" @@ -20,6 +21,9 @@ type keyEventHandler struct { fetcher fetcher drawer drawer + + ticker *time.Ticker + pollInterval time.Duration } func (h *keyEventHandler) quit() { @@ -167,6 +171,10 @@ func (h *keyEventHandler) handleEnterKey() { } } +func (h *keyEventHandler) resetTicker() { + h.ticker.Reset(h.pollInterval) +} + func (h *keyEventHandler) enterKeyQueues() { var ( s = h.s @@ -181,6 +189,7 @@ func (h *keyEventHandler) enterKeyQueues() { state.tasks = nil state.pageNum = 1 f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) + h.resetTicker() d.draw(state) } } @@ -197,6 +206,7 @@ func (h *keyEventHandler) enterKeyQueueDetails() { state.tasks = nil state.pageNum = 1 f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum) + h.resetTicker() d.draw(state) } else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 { task := state.tasks[state.taskTableRowIdx-1] @@ -226,6 +236,7 @@ func (h *keyEventHandler) handleLeftKey() { } else { f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) } + h.resetTicker() d.draw(state) } } @@ -248,6 +259,7 @@ func (h *keyEventHandler) handleRightKey() { } else { f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) } + h.resetTicker() d.draw(state) } } @@ -275,6 +287,7 @@ func (h *keyEventHandler) nextPage() { if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount { state.pageNum++ f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum) + h.resetTicker() } } } @@ -299,6 +312,7 @@ func (h *keyEventHandler) prevPage() { if state.pageNum > 1 { state.pageNum-- f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum) + h.resetTicker() } } } @@ -312,6 +326,7 @@ func (h *keyEventHandler) showQueues() { ) if state.view != viewTypeQueues { f.fetchQueues() + h.resetTicker() state.view = viewTypeQueues d.draw(state) } @@ -349,6 +364,7 @@ func (h *keyEventHandler) showRedisInfo() { ) if state.view != viewTypeRedis { f.fetchRedisInfo() + h.resetTicker() state.view = viewTypeRedis d.draw(state) }