2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-03 05:12:01 +08:00

(cli): Define dataFetcher type

This commit is contained in:
Ken Hibino
2022-05-21 16:47:46 -07:00
parent 0eecef2da6
commit 2d5ca43424
3 changed files with 133 additions and 103 deletions

View File

@@ -99,19 +99,24 @@ func Run(opts Options) {
ticker := time.NewTicker(opts.PollInterval)
defer ticker.Stop()
f := dataFetcher{
ticker,
inspector,
opts,
errorCh,
queueCh,
queuesCh,
groupsCh,
tasksCh,
redisInfoCh,
}
h := keyEventHandler{
s: s,
state: &state,
opts: opts,
done: done,
ticker: ticker,
inspector: inspector,
errorCh: errorCh,
queueCh: queueCh,
queuesCh: queuesCh,
groupsCh: groupsCh,
tasksCh: tasksCh,
redisInfoCh: redisInfoCh,
s: s,
fetcher: &f,
state: &state,
opts: opts,
done: done,
}
// TODO: Double check that we are not leaking goroutine with this one.

View File

@@ -6,10 +6,35 @@ package dash
import (
"math/rand"
"time"
"github.com/hibiken/asynq"
)
type dataFetcher struct {
ticker *time.Ticker
inspector *asynq.Inspector
opts Options
errorCh chan<- error
queueCh chan<- *asynq.QueueInfo
queuesCh chan<- []*asynq.QueueInfo
groupsCh chan<- []*asynq.GroupInfo
tasksCh chan<- []*asynq.TaskInfo
redisInfoCh chan<- *redisInfo
}
func (f *dataFetcher) fetchQueues() {
var (
inspector = f.inspector
queuesCh = f.queuesCh
errorCh = f.errorCh
opts = f.opts
)
go fetchQueues(inspector, queuesCh, errorCh, opts)
f.ticker.Reset(opts.PollInterval)
}
func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh chan<- error, opts Options) {
if !opts.UseRealData {
n := rand.Intn(100)
@@ -37,6 +62,18 @@ func fetchQueues(i *asynq.Inspector, queuesCh chan<- []*asynq.QueueInfo, errorCh
queuesCh <- res
}
func (f *dataFetcher) fetchQueueInfo(qname string) {
var (
inspector = f.inspector
queueCh = f.queueCh
errorCh = f.errorCh
opts = f.opts
ticker = f.ticker
)
go fetchQueueInfo(inspector, qname, queueCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.QueueInfo, errorCh chan<- error) {
q, err := i.GetQueueInfo(qname)
if err != nil {
@@ -46,6 +83,11 @@ func fetchQueueInfo(i *asynq.Inspector, qname string, queueCh chan<- *asynq.Queu
queueCh <- q
}
func (f *dataFetcher) fetchRedisInfo() {
go fetchRedisInfo(f.redisInfoCh, f.errorCh)
f.ticker.Reset(f.opts.PollInterval)
}
func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
n := rand.Intn(1000)
redisInfoCh <- &redisInfo{
@@ -56,6 +98,13 @@ func fetchRedisInfo(redisInfoCh chan<- *redisInfo, errorCh chan<- error) {
}
}
func (f *dataFetcher) fetchGroups(qname string) {
i, groupsCh, errorCh := f.inspector, f.groupsCh, f.errorCh
ticker, opts := f.ticker, f.opts
go fetchGroups(i, qname, groupsCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.GroupInfo, errorCh chan<- error) {
groups, err := i.Groups(qname)
if err != nil {
@@ -65,6 +114,18 @@ func fetchGroups(i *asynq.Inspector, qname string, groupsCh chan<- []*asynq.Grou
groupsCh <- groups
}
func (f *dataFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) {
var (
i = f.inspector
tasksCh = f.tasksCh
errorCh = f.errorCh
ticker = f.ticker
opts = f.opts
)
go fetchAggregatingTasks(i, qname, group, pageSize, pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pageNum int,
tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
tasks, err := i.ListAggregatingTasks(qname, group, asynq.PageSize(pageSize), asynq.Page(pageNum))
@@ -75,6 +136,18 @@ func fetchAggregatingTasks(i *asynq.Inspector, qname, group string, pageSize, pa
tasksCh <- tasks
}
func (f *dataFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) {
var (
i = f.inspector
tasksCh = f.tasksCh
errorCh = f.errorCh
ticker = f.ticker
opts = f.opts
)
go fetchTasks(i, qname, taskState, pageSize, pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
}
func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pageSize, pageNum int,
tasksCh chan<- []*asynq.TaskInfo, errorCh chan<- error) {
var (

View File

@@ -6,7 +6,6 @@ package dash
import (
"os"
"time"
"github.com/gdamore/tcell/v2"
"github.com/hibiken/asynq"
@@ -18,15 +17,7 @@ type keyEventHandler struct {
opts Options
done chan struct{}
ticker *time.Ticker
inspector *asynq.Inspector
errorCh chan error
queueCh chan *asynq.QueueInfo
queuesCh chan []*asynq.QueueInfo
groupsCh chan []*asynq.GroupInfo
tasksCh chan []*asynq.TaskInfo
redisInfoCh chan *redisInfo
fetcher *dataFetcher
}
func (h *keyEventHandler) quit() {
@@ -170,13 +161,10 @@ func (h *keyEventHandler) handleEnterKey() {
func (h *keyEventHandler) enterKeyQueues() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.queueTableRowIdx != 0 {
state.selectedQueue = state.queues[state.queueTableRowIdx-1]
@@ -184,44 +172,33 @@ func (h *keyEventHandler) enterKeyQueues() {
state.taskState = asynq.TaskStateActive
state.tasks = nil
state.pageNum = 1
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
taskPageSize(s), state.pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
drawDash(s, state, opts)
}
}
func (h *keyEventHandler) enterKeyQueueDetails() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if shouldShowGroupTable(state) && state.groupTableRowIdx != 0 {
state.selectedGroup = state.groups[state.groupTableRowIdx-1]
state.tasks = nil
state.pageNum = 1
go fetchAggregatingTasks(inspector, state.selectedQueue.Queue, state.selectedGroup.Group,
taskPageSize(s), state.pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
f.fetchAggregatingTasks(state.selectedQueue.Queue, state.selectedGroup.Group, taskPageSize(s), state.pageNum)
drawDash(s, state, opts)
}
}
func (h *keyEventHandler) handleLeftKey() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
groupsCh = h.groupsCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view == viewTypeQueueDetails {
state.taskState = prevTaskState(state.taskState)
@@ -230,26 +207,20 @@ func (h *keyEventHandler) handleLeftKey() {
state.tasks = nil
state.selectedGroup = nil
if shouldShowGroupTable(state) {
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
f.fetchGroups(state.selectedQueue.Queue)
} else {
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
taskPageSize(s), state.pageNum, tasksCh, errorCh)
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
ticker.Reset(opts.PollInterval)
drawDash(s, state, opts)
}
}
func (h *keyEventHandler) handleRightKey() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
groupsCh = h.groupsCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view == viewTypeQueueDetails {
state.taskState = nextTaskState(state.taskState)
@@ -258,25 +229,20 @@ func (h *keyEventHandler) handleRightKey() {
state.tasks = nil
state.selectedGroup = nil
if shouldShowGroupTable(state) {
go fetchGroups(inspector, state.selectedQueue.Queue, groupsCh, errorCh)
f.fetchGroups(state.selectedQueue.Queue)
} else {
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
taskPageSize(s), state.pageNum, tasksCh, errorCh)
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
ticker.Reset(opts.PollInterval)
drawDash(s, state, opts)
}
}
func (h *keyEventHandler) nextPage() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view == viewTypeQueueDetails {
if shouldShowGroupTable(state) {
@@ -293,9 +259,7 @@ func (h *keyEventHandler) nextPage() {
totalCount := getTaskCount(state.selectedQueue, state.taskState)
if (state.pageNum-1)*pageSize+len(state.tasks) < totalCount {
state.pageNum++
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
pageSize, state.pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
f.fetchTasks(state.selectedQueue.Queue, state.taskState, pageSize, state.pageNum)
}
}
}
@@ -303,13 +267,10 @@ func (h *keyEventHandler) nextPage() {
func (h *keyEventHandler) prevPage() {
var (
s = h.s
state = h.state
opts = h.opts
inspector = h.inspector
ticker = h.ticker
errorCh = h.errorCh
tasksCh = h.tasksCh
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view == viewTypeQueueDetails {
if shouldShowGroupTable(state) {
@@ -322,9 +283,7 @@ func (h *keyEventHandler) prevPage() {
} else {
if state.pageNum > 1 {
state.pageNum--
go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState,
taskPageSize(s), state.pageNum, tasksCh, errorCh)
ticker.Reset(opts.PollInterval)
f.fetchTasks(state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum)
}
}
}
@@ -332,17 +291,13 @@ func (h *keyEventHandler) prevPage() {
func (h *keyEventHandler) showQueues() {
var (
s = h.s
state = h.state
inspector = h.inspector
queuesCh = h.queuesCh
errorCh = h.errorCh
opts = h.opts
ticker = h.ticker
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view != viewTypeQueues {
go fetchQueues(inspector, queuesCh, errorCh, opts)
ticker.Reset(opts.PollInterval)
f.fetchQueues()
state.view = viewTypeQueues
drawDash(s, state, opts)
}
@@ -376,16 +331,13 @@ func (h *keyEventHandler) showSchedulers() {
func (h *keyEventHandler) showRedisInfo() {
var (
s = h.s
state = h.state
opts = h.opts
redisInfoCh = h.redisInfoCh
errorCh = h.errorCh
ticker = h.ticker
s = h.s
state = h.state
opts = h.opts
f = h.fetcher
)
if state.view != viewTypeRedis {
go fetchRedisInfo(redisInfoCh, errorCh)
ticker.Reset(opts.PollInterval)
f.fetchRedisInfo()
state.view = viewTypeRedis
drawDash(s, state, opts)
}