2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-03 05:12:01 +08:00

(cli): extract ticker out of fetcher

This commit is contained in:
Ken Hibino
2022-05-23 15:40:07 -07:00
parent 9415d66655
commit b6dac5c0a1
3 changed files with 33 additions and 21 deletions

View File

@@ -33,6 +33,7 @@ type State struct {
redisInfo redisInfo
err error
// Note: index zero corresponds to the table header; index=1 correctponds to the first element
queueTableRowIdx int // highlighted row in queue table
taskTableRowIdx int // highlighted row in task table
groupTableRowIdx int // highlighted row in group table
@@ -97,7 +98,6 @@ func Run(opts Options) {
defer ticker.Stop()
f := dataFetcher{
ticker,
inspector,
opts,
errorCh,
@@ -114,11 +114,13 @@ func Run(opts Options) {
}
h := keyEventHandler{
s: s,
fetcher: &f,
drawer: &d,
state: &state,
done: done,
s: s,
fetcher: &f,
drawer: &d,
state: &state,
done: done,
ticker: ticker,
pollInterval: opts.PollInterval,
}
go fetchQueues(inspector, queuesCh, errorCh, opts)
@@ -161,6 +163,9 @@ func Run(opts Options) {
case queues := <-queuesCh:
state.queues = queues
state.err = nil
if len(queues) < state.queueTableRowIdx {
state.queueTableRowIdx = len(queues)
}
d.draw(&state)
case q := <-queueCh:
@@ -171,11 +176,17 @@ func Run(opts Options) {
case groups := <-groupsCh:
state.groups = groups
state.err = nil
if len(groups) < state.groupTableRowIdx {
state.groupTableRowIdx = len(groups)
}
d.draw(&state)
case tasks := <-tasksCh:
state.tasks = tasks
state.err = nil
if len(tasks) < state.taskTableRowIdx {
state.taskTableRowIdx = len(tasks)
}
d.draw(&state)
case redisInfo := <-redisInfoCh:

View File

@@ -6,7 +6,6 @@ package dash
import (
"math/rand"
"time"
"github.com/hibiken/asynq"
)
@@ -21,7 +20,6 @@ type fetcher interface {
}
type dataFetcher struct {
ticker *time.Ticker
inspector *asynq.Inspector
opts Options
@@ -41,7 +39,6 @@ func (f *dataFetcher) fetchQueues() {
opts = f.opts
)
go fetchQueues(inspector, queuesCh, errorCh, opts)
f.ticker.Reset(opts.PollInterval)
}
func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) {
@@ -76,11 +73,8 @@ func (f *dataFetcher) fetchQueueInfo(qname string) {
inspector = f.inspector
queueCh = f.queueCh
errorCh = f.errorCh
opts = f.opts
ticker = f.ticker
)
go fetchQueueInfo(inspector, qname, queueCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) {
@@ -94,7 +88,6 @@ func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.Queu
func (f *dataFetcher) fetchRedisInfo() {
go fetchRedisInfo(f.redisInfoCh, f.errorCh)
f.ticker.Reset(f.opts.PollInterval)
}
func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
@@ -109,9 +102,7 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
func (f *dataFetcher) fetchGroups(qname string) {
i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh
ticker, opts := f.ticker, f.opts
go fetchGroups(i, qname, groupsCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) {
@@ -128,11 +119,8 @@ func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageN
i = f.inspector
tasksCh = f.tasksCh
errorCh = f.errorCh
ticker = f.ticker
opts = f.opts
)
go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int,
@@ -150,11 +138,8 @@ func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSi
i = f.inspector
tasksCh = f.tasksCh
errorCh = f.errorCh
ticker = f.ticker
opts = f.opts
)
go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int,

View File

@@ -6,6 +6,7 @@ package dash
import (
"os"
"time"
"github.com/gdamore/tcell/v2"
"github.com/hibiken/asynq"
@@ -20,6 +21,9 @@ type keyEventHandler struct {
fetcher fetcher
drawer drawer
ticker *time.Ticker
pollInterval time.Duration
}
func (h *keyEventHandler) quit() {
@@ -167,6 +171,10 @@ func (h *keyEventHandler) handleEnterKey() {
}
}
func (h *keyEventHandler) resetTicker() {
h.ticker.Reset(h.pollInterval)
}
func (h *keyEventHandler) enterKeyQueues() {
var (
s = h.s
@@ -181,6 +189,7 @@ func (h *keyEventHandler) enterKeyQueues() {
state.tasks = nil
state.pageNum = 1
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
h.resetTicker()
d.draw(state)
}
}
@@ -197,6 +206,7 @@ func (h *keyEventHandler) enterKeyQueueDetails() {
state.tasks = nil
state.pageNum = 1
f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum)
h.resetTicker()
d.draw(state)
} else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 {
task := state.tasks[state.taskTableRowIdx-1]
@@ -226,6 +236,7 @@ func (h *keyEventHandler) handleLeftKey() {
} else {
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
h.resetTicker()
d.draw(state)
}
}
@@ -248,6 +259,7 @@ func (h *keyEventHandler) handleRightKey() {
} else {
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
h.resetTicker()
d.draw(state)
}
}
@@ -275,6 +287,7 @@ func (h *keyEventHandler) nextPage() {
if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount {
state.pageNum++
f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum)
h.resetTicker()
}
}
}
@@ -299,6 +312,7 @@ func (h *keyEventHandler) prevPage() {
if state.pageNum > 1 {
state.pageNum--
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
h.resetTicker()
}
}
}
@@ -312,6 +326,7 @@ func (h *keyEventHandler) showQueues() {
)
if state.view != viewTypeQueues {
f.fetchQueues()
h.resetTicker()
state.view = viewTypeQueues
d.draw(state)
}
@@ -349,6 +364,7 @@ func (h *keyEventHandler) showRedisInfo() {
)
if state.view != viewTypeRedis {
f.fetchRedisInfo()
h.resetTicker()
state.view = viewTypeRedis
d.draw(state)
}