diff --git a/conversion_helpers.go b/conversion_helpers.go index 905559d..42ecd66 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -89,6 +89,13 @@ type BaseTask struct { type ActiveTask struct { *BaseTask + + // Started time indicates when a worker started working on ths task. + // + // Value is either time formatted in RFC3339 format, or "-" which indicates + // a worker started working on the task only a few moments ago, and started time + // data is not available. + Started string `json:"start_time"` } func toActiveTask(t *asynq.ActiveTask) *ActiveTask { @@ -98,7 +105,7 @@ func toActiveTask(t *asynq.ActiveTask) *ActiveTask { Payload: t.Payload, Queue: t.Queue, } - return &ActiveTask{base} + return &ActiveTask{BaseTask: base} } func toActiveTasks(in []*asynq.ActiveTask) []*ActiveTask { diff --git a/task_handlers.go b/task_handlers.go index abe614f..18525c4 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -5,6 +5,7 @@ import ( "log" "net/http" "strconv" + "time" "github.com/gorilla/mux" "github.com/hibiken/asynq" @@ -15,11 +16,17 @@ import ( // - http.Handler(s) for task related endpoints // **************************************************************************** +type ListActiveTasksResponse struct { + Tasks []*ActiveTask `json:"tasks"` + Stats *QueueStateSnapshot `json:"stats"` +} + func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) + tasks, err := inspector.ListActiveTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { @@ -31,15 +38,35 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc http.Error(w, err.Error(), http.StatusInternalServerError) return } - payload := make(map[string]interface{}) - if len(tasks) == 0 { - // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*ActiveTask, 0) - } else { - payload["tasks"] = toActiveTasks(tasks) + servers, err := inspector.Servers() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } - payload["stats"] = toQueueStateSnapshot(stats) - if err := json.NewEncoder(w).Encode(payload); err != nil { + // m maps taskID to started time. + m := make(map[string]time.Time) + for _, srv := range servers { + for _, w := range srv.ActiveWorkers { + if w.Task.Queue == qname { + m[w.Task.ID] = w.Started + } + } + } + activeTasks := toActiveTasks(tasks) + for _, t := range activeTasks { + started, ok := m[t.ID] + if ok { + t.Started = started.Format(time.RFC3339) + } else { + t.Started = "-" + } + } + + resp := ListActiveTasksResponse{ + Tasks: activeTasks, + Stats: toQueueStateSnapshot(stats), + } + if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/ui/src/api.ts b/ui/src/api.ts index 8267938..92acc33 100644 --- a/ui/src/api.ts +++ b/ui/src/api.ts @@ -243,6 +243,7 @@ interface BaseTask { export interface ActiveTask extends BaseTask { id: string; queue: string; + start_time: string; } export interface PendingTask extends BaseTask { diff --git a/ui/src/components/ActiveTasksTable.tsx b/ui/src/components/ActiveTasksTable.tsx index be44be0..d671094 100644 --- a/ui/src/components/ActiveTasksTable.tsx +++ b/ui/src/components/ActiveTasksTable.tsx @@ -37,7 +37,7 @@ import TablePaginationActions, { import TableActions from "./TableActions"; import { usePolling } from "../hooks"; import { ActiveTaskExtended } from "../reducers/tasksReducer"; -import { uuidPrefix } from "../utils"; +import { timeAgo, uuidPrefix } from "../utils"; import { TableColumn } from "../types/table"; const useStyles = makeStyles((theme) => ({ @@ -66,6 +66,15 @@ const mapDispatchToProps = { cancelAllActiveTasksAsync, }; +const columns: TableColumn[] = [ + { key: "icon", label: "", align: "left" }, + { key: "id", label: "ID", align: "left" }, + { key: "type", label: "Type", align: "left" }, + { key: "status", label: "Status", align: "left" }, + { key: "start-time", label: "Started", align: "left" }, + { key: "actions", label: "Actions", align: "center" }, +]; + const connector = connect(mapStateToProps, mapDispatchToProps); type ReduxProps = ConnectedProps; @@ -131,14 +140,6 @@ function ActiveTasksTable(props: Props & ReduxProps) { ); } - const columns: TableColumn[] = [ - { key: "icon", label: "", align: "left" }, - { key: "id", label: "ID", align: "left" }, - { key: "type", label: "Type", align: "left" }, - { key: "status", label: "Status", align: "left" }, - { key: "actions", label: "Actions", align: "center" }, - ]; - const rowCount = props.tasks.length; const numSelected = selectedIds.length; return ( @@ -294,6 +295,9 @@ function Row(props: RowProps) { {task.type} {task.canceling ? "Canceling" : "Running"} + + {task.start_time === "-" ? "just now" : timeAgo(task.start_time)} + - +