mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-18 18:55:54 +08:00
Fetch DailyStats in Dashboard view
This commit is contained in:
parent
3d982d9a8b
commit
d0b6dee896
@ -55,11 +55,11 @@ func toQueueStateSnapshot(s *asynq.QueueStats) *QueueStateSnapshot {
|
||||
}
|
||||
|
||||
type DailyStats struct {
|
||||
Queue string `json:"queue"`
|
||||
Processed int `json:"processed"`
|
||||
Succeeded int `json:"succeeded"`
|
||||
Failed int `json:"failed"`
|
||||
Date time.Time `json:"date"`
|
||||
Queue string `json:"queue"`
|
||||
Processed int `json:"processed"`
|
||||
Succeeded int `json:"succeeded"`
|
||||
Failed int `json:"failed"`
|
||||
Date string `json:"date"`
|
||||
}
|
||||
|
||||
func toDailyStats(s *asynq.DailyStats) *DailyStats {
|
||||
@ -68,7 +68,7 @@ func toDailyStats(s *asynq.DailyStats) *DailyStats {
|
||||
Processed: s.Processed,
|
||||
Succeeded: s.Processed - s.Failed,
|
||||
Failed: s.Failed,
|
||||
Date: s.Date,
|
||||
Date: s.Date.Format("2006-01-02"),
|
||||
}
|
||||
}
|
||||
|
||||
|
45
ui/src/actions/queueStatsActions.ts
Normal file
45
ui/src/actions/queueStatsActions.ts
Normal file
@ -0,0 +1,45 @@
|
||||
import { Dispatch } from "redux";
|
||||
import { listQueueStats, ListQueueStatsResponse } from "../api";
|
||||
|
||||
export const LIST_QUEUE_STATS_BEGIN = "LIST_QUEUE_STATS_BEGIN";
|
||||
export const LIST_QUEUE_STATS_SUCCESS = "LIST_QUEUE_STATS_SUCCESS";
|
||||
export const LIST_QUEUE_STATS_ERROR = "LIST_QUEUE_STATS_ERROR";
|
||||
|
||||
interface ListQueueStatsBeginAction {
|
||||
type: typeof LIST_QUEUE_STATS_BEGIN;
|
||||
}
|
||||
|
||||
interface ListQueueStatsSuccessAction {
|
||||
type: typeof LIST_QUEUE_STATS_SUCCESS;
|
||||
payload: ListQueueStatsResponse;
|
||||
}
|
||||
|
||||
interface ListQueueStatsErrorAction {
|
||||
type: typeof LIST_QUEUE_STATS_ERROR;
|
||||
error: string;
|
||||
}
|
||||
|
||||
// Union of all queue stats related action types.
|
||||
export type QueueStatsActionTypes =
|
||||
| ListQueueStatsBeginAction
|
||||
| ListQueueStatsSuccessAction
|
||||
| ListQueueStatsErrorAction;
|
||||
|
||||
export function listQueueStatsAsync() {
|
||||
return async (dispatch: Dispatch<QueueStatsActionTypes>) => {
|
||||
dispatch({ type: LIST_QUEUE_STATS_BEGIN });
|
||||
try {
|
||||
const response = await listQueueStats();
|
||||
dispatch({
|
||||
type: LIST_QUEUE_STATS_SUCCESS,
|
||||
payload: response,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error("listQueueStatsAsync: ", error);
|
||||
dispatch({
|
||||
type: LIST_QUEUE_STATS_ERROR,
|
||||
error: "Could not fetch queue stats",
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
@ -60,6 +60,10 @@ export interface BatchKillTasksResponse {
|
||||
error_keys: string[];
|
||||
}
|
||||
|
||||
export interface ListQueueStatsResponse {
|
||||
stats: { [qname: string]: DailyStat[] };
|
||||
}
|
||||
|
||||
export interface Queue {
|
||||
queue: string;
|
||||
paused: boolean;
|
||||
@ -75,6 +79,7 @@ export interface Queue {
|
||||
}
|
||||
|
||||
export interface DailyStat {
|
||||
queue: string;
|
||||
date: string;
|
||||
processed: number;
|
||||
failed: number;
|
||||
@ -174,6 +179,14 @@ export async function resumeQueue(qname: string): Promise<void> {
|
||||
});
|
||||
}
|
||||
|
||||
export async function listQueueStats(): Promise<ListQueueStatsResponse> {
|
||||
const resp = await axios({
|
||||
method: "get",
|
||||
url: `${BASE_URL}/queue_stats`,
|
||||
});
|
||||
return resp.data;
|
||||
}
|
||||
|
||||
export async function listActiveTasks(
|
||||
qname: string,
|
||||
pageOpts?: PaginationOptions
|
||||
|
78
ui/src/reducers/queueStatsReducer.ts
Normal file
78
ui/src/reducers/queueStatsReducer.ts
Normal file
@ -0,0 +1,78 @@
|
||||
import {
|
||||
LIST_QUEUES_SUCCESS,
|
||||
QueuesActionTypes,
|
||||
} from "../actions/queuesActions";
|
||||
import {
|
||||
LIST_QUEUE_STATS_BEGIN,
|
||||
LIST_QUEUE_STATS_ERROR,
|
||||
LIST_QUEUE_STATS_SUCCESS,
|
||||
QueueStatsActionTypes,
|
||||
} from "../actions/queueStatsActions";
|
||||
import { DailyStat } from "../api";
|
||||
|
||||
interface QueueStatsState {
|
||||
loading: boolean;
|
||||
data: { [qname: string]: DailyStat[] };
|
||||
}
|
||||
|
||||
const initialState: QueueStatsState = {
|
||||
loading: false,
|
||||
data: {},
|
||||
};
|
||||
|
||||
export default function queueStatsReducer(
|
||||
state = initialState,
|
||||
action: QueueStatsActionTypes | QueuesActionTypes
|
||||
): QueueStatsState {
|
||||
switch (action.type) {
|
||||
case LIST_QUEUE_STATS_BEGIN:
|
||||
return {
|
||||
...state,
|
||||
loading: true,
|
||||
};
|
||||
|
||||
case LIST_QUEUE_STATS_SUCCESS:
|
||||
return {
|
||||
data: action.payload.stats,
|
||||
loading: false,
|
||||
};
|
||||
|
||||
case LIST_QUEUE_STATS_ERROR:
|
||||
return {
|
||||
...state,
|
||||
loading: false,
|
||||
};
|
||||
|
||||
case LIST_QUEUES_SUCCESS: {
|
||||
// Copy to avoid mutation.
|
||||
let newData = { ...state.data };
|
||||
// Update today's stats with most up-to-date data.
|
||||
for (const q of action.payload.queues) {
|
||||
const stats = newData[q.queue];
|
||||
if (!stats) {
|
||||
continue;
|
||||
}
|
||||
const newStats = stats.map((stat) => {
|
||||
if (isSameDate(stat.date, q.timestamp)) {
|
||||
return {
|
||||
...stat,
|
||||
processed: q.processed,
|
||||
failed: q.failed,
|
||||
};
|
||||
}
|
||||
return stat;
|
||||
});
|
||||
newData[q.queue] = newStats;
|
||||
}
|
||||
return { ...state, data: newData };
|
||||
}
|
||||
|
||||
default:
|
||||
return state;
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if two timestamps are from the same date.
|
||||
function isSameDate(ts1: string, ts2: string): boolean {
|
||||
return new Date(ts1).toDateString() === new Date(ts2).toDateString();
|
||||
}
|
@ -4,6 +4,7 @@ import queuesReducer from "./reducers/queuesReducer";
|
||||
import tasksReducer from "./reducers/tasksReducer";
|
||||
import schedulerEntriesReducer from "./reducers/schedulerEntriesReducer";
|
||||
import snackbarReducer from "./reducers/snackbarReducer";
|
||||
import queueStatsReducer from "./reducers/queueStatsReducer";
|
||||
|
||||
const rootReducer = combineReducers({
|
||||
settings: settingsReducer,
|
||||
@ -11,6 +12,7 @@ const rootReducer = combineReducers({
|
||||
tasks: tasksReducer,
|
||||
schedulerEntries: schedulerEntriesReducer,
|
||||
snackbar: snackbarReducer,
|
||||
queueStats: queueStatsReducer,
|
||||
});
|
||||
|
||||
// AppState is the top-level application state maintained by redux store.
|
||||
|
@ -1,4 +1,4 @@
|
||||
import React from "react";
|
||||
import React, { useEffect } from "react";
|
||||
import { connect, ConnectedProps } from "react-redux";
|
||||
import Container from "@material-ui/core/Container";
|
||||
import { makeStyles } from "@material-ui/core/styles";
|
||||
@ -12,6 +12,7 @@ import {
|
||||
resumeQueueAsync,
|
||||
deleteQueueAsync,
|
||||
} from "../actions/queuesActions";
|
||||
import { listQueueStatsAsync } from "../actions/queueStatsActions";
|
||||
import { AppState } from "../store";
|
||||
import QueueSizeChart from "../components/QueueSizeChart";
|
||||
import ProcessedTasksChart from "../components/ProcessedTasksChart";
|
||||
@ -75,6 +76,7 @@ const mapDispatchToProps = {
|
||||
pauseQueueAsync,
|
||||
resumeQueueAsync,
|
||||
deleteQueueAsync,
|
||||
listQueueStatsAsync,
|
||||
};
|
||||
|
||||
const connector = connect(mapStateToProps, mapDispatchToProps);
|
||||
@ -82,11 +84,21 @@ const connector = connect(mapStateToProps, mapDispatchToProps);
|
||||
type Props = ConnectedProps<typeof connector>;
|
||||
|
||||
function DashboardView(props: Props) {
|
||||
const { pollInterval, listQueuesAsync, queues } = props;
|
||||
const { pollInterval, listQueuesAsync, queues, listQueueStatsAsync } = props;
|
||||
const classes = useStyles();
|
||||
|
||||
usePolling(listQueuesAsync, pollInterval);
|
||||
|
||||
// Refetch queue stats if a queue is added or deleted.
|
||||
const qnames = queues
|
||||
.map((q) => q.queue)
|
||||
.sort()
|
||||
.join(",");
|
||||
|
||||
useEffect(() => {
|
||||
listQueueStatsAsync();
|
||||
}, [listQueueStatsAsync, qnames]);
|
||||
|
||||
const processedStats = queues.map((q) => ({
|
||||
queue: q.queue,
|
||||
succeeded: q.processed - q.failed,
|
||||
|
Loading…
Reference in New Issue
Block a user