diff --git a/tools/asynq/cmd/dash/dash.go b/tools/asynq/cmd/dash/dash.go index 27a45cf..5d16660 100644 --- a/tools/asynq/cmd/dash/dash.go +++ b/tools/asynq/cmd/dash/dash.go @@ -5,6 +5,7 @@ package dash import ( + "errors" "fmt" "os" "time" @@ -90,6 +91,7 @@ func Run(opts Options) { // channels to send/receive data fetched asynchronously errorCh = make(chan error) queueCh = make(chan *asynq.QueueInfo) + taskCh = make(chan *asynq.TaskInfo) queuesCh = make(chan []*asynq.QueueInfo) groupsCh = make(chan []*asynq.GroupInfo) tasksCh = make(chan []*asynq.TaskInfo) @@ -102,6 +104,7 @@ func Run(opts Options) { opts, errorCh, queueCh, + taskCh, queuesCh, groupsCh, tasksCh, @@ -156,6 +159,11 @@ func Run(opts Options) { go fetchTasks(inspector, state.selectedQueue.Queue, state.taskState, taskPageSize(s), state.pageNum, tasksCh, errorCh) } + // if the task modal is open, fetch the selected task's info + if state.taskID != "" { + go fetchTaskInfo(inspector, state.selectedQueue.Queue, state.taskID, taskCh, errorCh) + } + case viewTypeRedis: go fetchRedisInfo(redisInfoCh, errorCh) } @@ -189,13 +197,22 @@ func Run(opts Options) { } d.draw(&state) + case t := <-taskCh: + state.selectedTask = t + state.err = nil + d.draw(&state) + case redisInfo := <-redisInfoCh: state.redisInfo = *redisInfo state.err = nil d.draw(&state) case err := <-errorCh: - state.err = err + if errors.Is(err, asynq.ErrTaskNotFound) { + state.selectedTask = nil + } else { + state.err = err + } d.draw(&state) } } diff --git a/tools/asynq/cmd/dash/draw.go b/tools/asynq/cmd/dash/draw.go index 4868a61..2ca506a 100644 --- a/tools/asynq/cmd/dash/draw.go +++ b/tools/asynq/cmd/dash/draw.go @@ -10,6 +10,8 @@ import ( "strconv" "strings" "time" + "unicode" + "unicode/utf8" "github.com/gdamore/tcell/v2" "github.com/hibiken/asynq" @@ -445,16 +447,74 @@ func drawTaskModal(d *ScreenDrawer, state *State) { if state.taskID == "" { return } + task := state.selectedTask + if task == nil { + // task no longer found + contents := []*rowContent{ + {" === Task Summary ===", baseStyle.Bold(true)}, + {"", baseStyle}, + {fmt.Sprintf(" Task %q no longer exists", state.taskID), baseStyle}, + } + withModal(d, contents) + return + } contents := []*rowContent{ {" === Task Summary ===", baseStyle.Bold(true)}, {"", baseStyle}, - {" ID: xxxxx", baseStyle}, - {" Type: xxxxx", baseStyle}, - {" State: xxxx", baseStyle}, + {fmt.Sprintf(" ID: %s", task.ID), baseStyle}, + {fmt.Sprintf(" Type: %s", task.Type), baseStyle}, + {fmt.Sprintf(" State: %s", task.State.String()), baseStyle}, + {fmt.Sprintf(" Queue: %s", task.Queue), baseStyle}, + {fmt.Sprintf(" Retried: %d/%d", task.Retried, task.MaxRetry), baseStyle}, + } + if task.LastErr != "" { + contents = append(contents, &rowContent{ + fmt.Sprintf(" Last Failure: %s", task.LastErr), + baseStyle, + }) + contents = append(contents, &rowContent{ + fmt.Sprintf(" Last Failure Time: %v", task.LastFailedAt), + baseStyle, + }) + } + if !task.NextProcessAt.IsZero() { + contents = append(contents, &rowContent{ + fmt.Sprintf(" Next Process Time: %v", task.NextProcessAt), + baseStyle, + }) + } + if !task.CompletedAt.IsZero() { + contents = append(contents, &rowContent{ + fmt.Sprintf(" Completion Time: %v", task.CompletedAt), + baseStyle, + }) + } + if isPrintable(task.Payload) { + contents = append(contents, &rowContent{ + fmt.Sprintf("Payload: %s", string(task.Payload)), + baseStyle, + }) } withModal(d, contents) } +// Reports whether the given byte slice is printable (i.e. human readable) +func isPrintable(data []byte) bool { + if !utf8.Valid(data) { + return false + } + isAllSpace := true + for _, r := range string(data) { + if !unicode.IsGraphic(r) { + return false + } + if !unicode.IsSpace(r) { + isAllSpace = false + } + } + return !isAllSpace +} + type rowContent struct { s string // should not include newline style tcell.Style diff --git a/tools/asynq/cmd/dash/fetch.go b/tools/asynq/cmd/dash/fetch.go index ac42f0e..a70246b 100644 --- a/tools/asynq/cmd/dash/fetch.go +++ b/tools/asynq/cmd/dash/fetch.go @@ -14,6 +14,7 @@ type fetcher interface { fetchQueues() fetchQueueInfo(qname string) fetchRedisInfo() + fetchTaskInfo(qname, taskID string) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) fetchGroups(qname string) @@ -25,6 +26,7 @@ type dataFetcher struct { errorCh chan<- error queueCh chan<- *asynq.QueueInfo + taskCh chan<- *asynq.TaskInfo queuesCh chan<- []*asynq.QueueInfo groupsCh chan<- []*asynq.GroupInfo tasksCh chan<- []*asynq.TaskInfo @@ -169,3 +171,21 @@ func fetchTasks(i *asynq.Inspector, qname string, taskState asynq.TaskState, pag } tasksCh <- tasks } + +func (f *dataFetcher) fetchTaskInfo(qname, taskID string) { + var ( + i = f.inspector + taskCh = f.taskCh + errorCh = f.errorCh + ) + go fetchTaskInfo(i, qname, taskID, taskCh, errorCh) +} + +func fetchTaskInfo(i *asynq.Inspector, qname, taskID string, taskCh chan<- *asynq.TaskInfo, errorCh chan<- error) { + info, err := i.GetTaskInfo(qname, taskID) + if err != nil { + errorCh <- err + return + } + taskCh <- info +} diff --git a/tools/asynq/cmd/dash/key_event.go b/tools/asynq/cmd/dash/key_event.go index c265327..611eff1 100644 --- a/tools/asynq/cmd/dash/key_event.go +++ b/tools/asynq/cmd/dash/key_event.go @@ -210,9 +210,10 @@ func (h *keyEventHandler) enterKeyQueueDetails() { d.draw(state) } else if !shouldShowGroupTable(state) && state.taskTableRowIdx != 0 { task := state.tasks[state.taskTableRowIdx-1] - state.taskID = task.ID state.selectedTask = task - // TODO: go fetch task info + state.taskID = task.ID + f.fetchTaskInfo(state.selectedQueue.Queue, task.ID) + h.resetTicker() d.draw(state) } diff --git a/tools/asynq/cmd/dash/key_event_test.go b/tools/asynq/cmd/dash/key_event_test.go index f784175..315cdcb 100644 --- a/tools/asynq/cmd/dash/key_event_test.go +++ b/tools/asynq/cmd/dash/key_event_test.go @@ -183,6 +183,7 @@ type fakeFetcher struct{} func (f *fakeFetcher) fetchQueues() {} func (f *fakeFetcher) fetchQueueInfo(qname string) {} func (f *fakeFetcher) fetchRedisInfo() {} +func (f *fakeFetcher) fetchTaskInfo(qname, taskID string) {} func (f *fakeFetcher) fetchTasks(qname string, taskState asynq.TaskState, pageSize, pageNum int) {} func (f *fakeFetcher) fetchAggregatingTasks(qname, group string, pageSize, pageNum int) {} func (f *fakeFetcher) fetchGroups(qname string) {}