Add batch-delete, batch-run, delete-all, run-all functionalities for

scheduled and retry tasks
This commit is contained in:
Ken Hibino 2020-12-19 06:07:23 -08:00
parent 68738ec962
commit f527b0c6d8
9 changed files with 981 additions and 12 deletions

View File

@ -84,12 +84,14 @@ func main() {
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:run_all", newRunAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:run_all", newRunAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")

View File

@ -248,6 +248,28 @@ func newDeleteAllDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu
}
}
func newRunAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.RunAllScheduledTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newRunAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.RunAllRetryTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newRunAllDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]

View File

@ -1,10 +1,16 @@
import {
batchDeleteDeadTasks,
batchDeleteRetryTasks,
batchDeleteScheduledTasks,
BatchDeleteTasksResponse,
batchRunDeadTasks,
batchRunRetryTasks,
batchRunScheduledTasks,
BatchRunTasksResponse,
cancelActiveTask,
deleteAllDeadTasks,
deleteAllRetryTasks,
deleteAllScheduledTasks,
deleteDeadTask,
deleteRetryTask,
deleteScheduledTask,
@ -20,6 +26,8 @@ import {
ListScheduledTasksResponse,
PaginationOptions,
runAllDeadTasks,
runAllRetryTasks,
runAllScheduledTasks,
runDeadTask,
} from "../api";
import { Dispatch } from "redux";
@ -49,9 +57,44 @@ export const RUN_DEAD_TASK_ERROR = "RUN_DEAD_TASK_ERROR";
export const DELETE_SCHEDULED_TASK_BEGIN = "DELETE_SCHEDULED_TASK_BEGIN";
export const DELETE_SCHEDULED_TASK_SUCCESS = "DELETE_SCHEDULED_TASK_SUCCESS";
export const DELETE_SCHEDULED_TASK_ERROR = "DELETE_SCHEDULED_TASK_ERROR";
export const BATCH_RUN_SCHEDULED_TASKS_BEGIN =
"BATCH_RUN_SCHEDULED_TASKS_BEGIN";
export const BATCH_RUN_SCHEDULED_TASKS_SUCCESS =
"BATCH_RUN_SCHEDULED_TASKS_SUCCESS";
export const BATCH_RUN_SCHEDULED_TASKS_ERROR =
"BATCH_RUN_SCHEDULED_TASKS_ERROR";
export const BATCH_DELETE_SCHEDULED_TASKS_BEGIN =
"BATCH_DELETE_SCHEDULED_TASKS_BEGIN";
export const BATCH_DELETE_SCHEDULED_TASKS_SUCCESS =
"BATCH_DELETE_SCHEDULED_TASKS_SUCCESS";
export const BATCH_DELETE_SCHEDULED_TASKS_ERROR =
"BATCH_DELETE_SCHEDULED_TASKS_ERROR";
export const RUN_ALL_SCHEDULED_TASKS_BEGIN = "RUN_ALL_SCHEDULED_TASKS_BEGIN";
export const RUN_ALL_SCHEDULED_TASKS_SUCCESS =
"RUN_ALL_SCHEDULED_TASKS_SUCCESS";
export const RUN_ALL_SCHEDULED_TASKS_ERROR = "RUN_ALL_SCHEDULED_TASKS_ERROR";
export const DELETE_ALL_SCHEDULED_TASKS_BEGIN =
"DELETE_ALL_SCHEDULED_TASKS_BEGIN";
export const DELETE_ALL_SCHEDULED_TASKS_SUCCESS =
"DELETE_ALL_SCHEDULED_TASKS_SUCCESS";
export const DELETE_ALL_SCHEDULED_TASKS_ERROR =
"DELETE_ALL_SCHEDULED_TASKS_ERROR";
export const DELETE_RETRY_TASK_BEGIN = "DELETE_RETRY_TASK_BEGIN";
export const DELETE_RETRY_TASK_SUCCESS = "DELETE_RETRY_TASK_SUCCESS";
export const DELETE_RETRY_TASK_ERROR = "DELETE_RETRY_TASK_ERROR";
export const BATCH_RUN_RETRY_TASKS_BEGIN = "BATCH_RUN_RETRY_TASKS_BEGIN";
export const BATCH_RUN_RETRY_TASKS_SUCCESS = "BATCH_RUN_RETRY_TASKS_SUCCESS";
export const BATCH_RUN_RETRY_TASKS_ERROR = "BATCH_RUN_RETRY_TASKS_ERROR";
export const BATCH_DELETE_RETRY_TASKS_BEGIN = "BATCH_DELETE_RETRY_TASKS_BEGIN";
export const BATCH_DELETE_RETRY_TASKS_SUCCESS =
"BATCH_DELETE_RETRY_TASKS_SUCCESS";
export const BATCH_DELETE_RETRY_TASKS_ERROR = "BATCH_DELETE_RETRY_TASKS_ERROR";
export const RUN_ALL_RETRY_TASKS_BEGIN = "RUN_ALL_RETRY_TASKS_BEGIN";
export const RUN_ALL_RETRY_TASKS_SUCCESS = "RUN_ALL_RETRY_TASKS_SUCCESS";
export const RUN_ALL_RETRY_TASKS_ERROR = "RUN_ALL_RETRY_TASKS_ERROR";
export const DELETE_ALL_RETRY_TASKS_BEGIN = "DELETE_ALL_RETRY_TASKS_BEGIN";
export const DELETE_ALL_RETRY_TASKS_SUCCESS = "DELETE_ALL_RETRY_TASKS_SUCCESS";
export const DELETE_ALL_RETRY_TASKS_ERROR = "DELETE_ALL_RETRY_TASKS_ERROR";
export const DELETE_DEAD_TASK_BEGIN = "DELETE_DEAD_TASK_BEGIN";
export const DELETE_DEAD_TASK_SUCCESS = "DELETE_DEAD_TASK_SUCCESS";
export const DELETE_DEAD_TASK_ERROR = "DELETE_DEAD_TASK_ERROR";
@ -211,6 +254,76 @@ interface DeleteScheduledTaskErrorAction {
error: string;
}
interface BatchDeleteScheduledTasksBeginAction {
type: typeof BATCH_DELETE_SCHEDULED_TASKS_BEGIN;
queue: string;
taskKeys: string[];
}
interface BatchDeleteScheduledTasksSuccessAction {
type: typeof BATCH_DELETE_SCHEDULED_TASKS_SUCCESS;
queue: string;
payload: BatchDeleteTasksResponse;
}
interface BatchDeleteScheduledTasksErrorAction {
type: typeof BATCH_DELETE_SCHEDULED_TASKS_ERROR;
queue: string;
taskKeys: string[];
error: string;
}
interface BatchRunScheduledTasksBeginAction {
type: typeof BATCH_RUN_SCHEDULED_TASKS_BEGIN;
queue: string;
taskKeys: string[];
}
interface BatchRunScheduledTasksSuccessAction {
type: typeof BATCH_RUN_SCHEDULED_TASKS_SUCCESS;
queue: string;
payload: BatchRunTasksResponse;
}
interface BatchRunScheduledTasksErrorAction {
type: typeof BATCH_RUN_SCHEDULED_TASKS_ERROR;
queue: string;
taskKeys: string[];
error: string;
}
interface RunAllScheduledTasksBeginAction {
type: typeof RUN_ALL_SCHEDULED_TASKS_BEGIN;
queue: string;
}
interface RunAllScheduledTasksSuccessAction {
type: typeof RUN_ALL_SCHEDULED_TASKS_SUCCESS;
queue: string;
}
interface RunAllScheduledTasksErrorAction {
type: typeof RUN_ALL_SCHEDULED_TASKS_ERROR;
queue: string;
error: string;
}
interface DeleteAllScheduledTasksBeginAction {
type: typeof DELETE_ALL_SCHEDULED_TASKS_BEGIN;
queue: string;
}
interface DeleteAllScheduledTasksSuccessAction {
type: typeof DELETE_ALL_SCHEDULED_TASKS_SUCCESS;
queue: string;
}
interface DeleteAllScheduledTasksErrorAction {
type: typeof DELETE_ALL_SCHEDULED_TASKS_ERROR;
queue: string;
error: string;
}
interface DeleteRetryTaskBeginAction {
type: typeof DELETE_RETRY_TASK_BEGIN;
queue: string;
@ -230,6 +343,76 @@ interface DeleteRetryTaskErrorAction {
error: string;
}
interface BatchDeleteRetryTasksBeginAction {
type: typeof BATCH_DELETE_RETRY_TASKS_BEGIN;
queue: string;
taskKeys: string[];
}
interface BatchDeleteRetryTasksSuccessAction {
type: typeof BATCH_DELETE_RETRY_TASKS_SUCCESS;
queue: string;
payload: BatchDeleteTasksResponse;
}
interface BatchDeleteRetryTasksErrorAction {
type: typeof BATCH_DELETE_RETRY_TASKS_ERROR;
queue: string;
taskKeys: string[];
error: string;
}
interface BatchRunRetryTasksBeginAction {
type: typeof BATCH_RUN_RETRY_TASKS_BEGIN;
queue: string;
taskKeys: string[];
}
interface BatchRunRetryTasksSuccessAction {
type: typeof BATCH_RUN_RETRY_TASKS_SUCCESS;
queue: string;
payload: BatchRunTasksResponse;
}
interface BatchRunRetryTasksErrorAction {
type: typeof BATCH_RUN_RETRY_TASKS_ERROR;
queue: string;
taskKeys: string[];
error: string;
}
interface RunAllRetryTasksBeginAction {
type: typeof RUN_ALL_RETRY_TASKS_BEGIN;
queue: string;
}
interface RunAllRetryTasksSuccessAction {
type: typeof RUN_ALL_RETRY_TASKS_SUCCESS;
queue: string;
}
interface RunAllRetryTasksErrorAction {
type: typeof RUN_ALL_RETRY_TASKS_ERROR;
queue: string;
error: string;
}
interface DeleteAllRetryTasksBeginAction {
type: typeof DELETE_ALL_RETRY_TASKS_BEGIN;
queue: string;
}
interface DeleteAllRetryTasksSuccessAction {
type: typeof DELETE_ALL_RETRY_TASKS_SUCCESS;
queue: string;
}
interface DeleteAllRetryTasksErrorAction {
type: typeof DELETE_ALL_RETRY_TASKS_ERROR;
queue: string;
error: string;
}
interface DeleteDeadTaskBeginAction {
type: typeof DELETE_DEAD_TASK_BEGIN;
queue: string;
@ -345,9 +528,33 @@ export type TasksActionTypes =
| DeleteScheduledTaskBeginAction
| DeleteScheduledTaskSuccessAction
| DeleteScheduledTaskErrorAction
| BatchDeleteScheduledTasksBeginAction
| BatchDeleteScheduledTasksSuccessAction
| BatchDeleteScheduledTasksErrorAction
| BatchRunScheduledTasksBeginAction
| BatchRunScheduledTasksSuccessAction
| BatchRunScheduledTasksErrorAction
| RunAllScheduledTasksBeginAction
| RunAllScheduledTasksSuccessAction
| RunAllScheduledTasksErrorAction
| DeleteAllScheduledTasksBeginAction
| DeleteAllScheduledTasksSuccessAction
| DeleteAllScheduledTasksErrorAction
| DeleteRetryTaskBeginAction
| DeleteRetryTaskSuccessAction
| DeleteRetryTaskErrorAction
| BatchDeleteRetryTasksBeginAction
| BatchDeleteRetryTasksSuccessAction
| BatchDeleteRetryTasksErrorAction
| BatchRunRetryTasksBeginAction
| BatchRunRetryTasksSuccessAction
| BatchRunRetryTasksErrorAction
| RunAllRetryTasksBeginAction
| RunAllRetryTasksSuccessAction
| RunAllRetryTasksErrorAction
| DeleteAllRetryTasksBeginAction
| DeleteAllRetryTasksSuccessAction
| DeleteAllRetryTasksErrorAction
| DeleteDeadTaskBeginAction
| DeleteDeadTaskSuccessAction
| DeleteDeadTaskErrorAction
@ -532,6 +739,87 @@ export function deleteScheduledTaskAsync(queue: string, taskKey: string) {
};
}
export function batchDeleteScheduledTasksAsync(
queue: string,
taskKeys: string[]
) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: BATCH_DELETE_SCHEDULED_TASKS_BEGIN, queue, taskKeys });
try {
const response = await batchDeleteScheduledTasks(queue, taskKeys);
dispatch({
type: BATCH_DELETE_SCHEDULED_TASKS_SUCCESS,
queue: queue,
payload: response,
});
} catch (error) {
console.error("batchDeleteScheduledTasksAsync: ", error);
dispatch({
type: BATCH_DELETE_SCHEDULED_TASKS_ERROR,
error: `Could not batch delete tasks: ${taskKeys}`,
queue,
taskKeys,
});
}
};
}
export function batchRunScheduledTasksAsync(queue: string, taskKeys: string[]) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: BATCH_RUN_SCHEDULED_TASKS_BEGIN, queue, taskKeys });
try {
const response = await batchRunScheduledTasks(queue, taskKeys);
dispatch({
type: BATCH_RUN_SCHEDULED_TASKS_SUCCESS,
queue: queue,
payload: response,
});
} catch (error) {
console.error("batchRunScheduledTasksAsync: ", error);
dispatch({
type: BATCH_RUN_SCHEDULED_TASKS_ERROR,
error: `Could not batch run tasks: ${taskKeys}`,
queue,
taskKeys,
});
}
};
}
export function deleteAllScheduledTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_BEGIN, queue });
try {
await deleteAllScheduledTasks(queue);
dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_SUCCESS, queue });
} catch (error) {
console.error("deleteAllScheduledTasksAsync: ", error);
dispatch({
type: DELETE_ALL_SCHEDULED_TASKS_ERROR,
error: `Could not delete all scheduled tasks`,
queue,
});
}
};
}
export function runAllScheduledTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: RUN_ALL_SCHEDULED_TASKS_BEGIN, queue });
try {
await runAllScheduledTasks(queue);
dispatch({ type: RUN_ALL_SCHEDULED_TASKS_SUCCESS, queue });
} catch (error) {
console.error("runAllScheduledTasksAsync: ", error);
dispatch({
type: RUN_ALL_SCHEDULED_TASKS_ERROR,
error: `Could not run all scheduled tasks`,
queue,
});
}
};
}
export function deleteRetryTaskAsync(queue: string, taskKey: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_RETRY_TASK_BEGIN, queue, taskKey });
@ -550,6 +838,84 @@ export function deleteRetryTaskAsync(queue: string, taskKey: string) {
};
}
export function batchDeleteRetryTasksAsync(queue: string, taskKeys: string[]) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: BATCH_DELETE_RETRY_TASKS_BEGIN, queue, taskKeys });
try {
const response = await batchDeleteRetryTasks(queue, taskKeys);
dispatch({
type: BATCH_DELETE_RETRY_TASKS_SUCCESS,
queue: queue,
payload: response,
});
} catch (error) {
console.error("batchDeleteRetryTasksAsync: ", error);
dispatch({
type: BATCH_DELETE_RETRY_TASKS_ERROR,
error: `Could not batch delete tasks: ${taskKeys}`,
queue,
taskKeys,
});
}
};
}
export function batchRunRetryTasksAsync(queue: string, taskKeys: string[]) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: BATCH_RUN_RETRY_TASKS_BEGIN, queue, taskKeys });
try {
const response = await batchRunRetryTasks(queue, taskKeys);
dispatch({
type: BATCH_RUN_RETRY_TASKS_SUCCESS,
queue: queue,
payload: response,
});
} catch (error) {
console.error("batchRunRetryTasksAsync: ", error);
dispatch({
type: BATCH_RUN_RETRY_TASKS_ERROR,
error: `Could not batch run tasks: ${taskKeys}`,
queue,
taskKeys,
});
}
};
}
export function deleteAllRetryTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_RETRY_TASKS_BEGIN, queue });
try {
await deleteAllRetryTasks(queue);
dispatch({ type: DELETE_ALL_RETRY_TASKS_SUCCESS, queue });
} catch (error) {
console.error("deleteAllRetryTasksAsync: ", error);
dispatch({
type: DELETE_ALL_RETRY_TASKS_ERROR,
error: `Could not delete all retry tasks`,
queue,
});
}
};
}
export function runAllRetryTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: RUN_ALL_RETRY_TASKS_BEGIN, queue });
try {
await runAllRetryTasks(queue);
dispatch({ type: RUN_ALL_RETRY_TASKS_SUCCESS, queue });
} catch (error) {
console.error("runAllRetryTasksAsync: ", error);
dispatch({
type: RUN_ALL_RETRY_TASKS_ERROR,
error: `Could not run all retry tasks`,
queue,
});
}
};
}
export function deleteDeadTaskAsync(queue: string, taskKey: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_DEAD_TASK_BEGIN, queue, taskKey });

View File

@ -263,6 +263,48 @@ export async function deleteScheduledTask(
});
}
export async function batchDeleteScheduledTasks(
qname: string,
taskKeys: string[]
): Promise<BatchDeleteTasksResponse> {
const resp = await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/scheduled_tasks:batch_delete`,
data: {
task_keys: taskKeys,
},
});
return resp.data;
}
export async function deleteAllScheduledTasks(qname: string): Promise<void> {
await axios({
method: "delete",
url: `${BASE_URL}/queues/${qname}/scheduled_tasks:delete_all`,
});
}
export async function batchRunScheduledTasks(
qname: string,
taskKeys: string[]
): Promise<BatchRunTasksResponse> {
const resp = await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/scheduled_tasks:batch_run`,
data: {
task_keys: taskKeys,
},
});
return resp.data;
}
export async function runAllScheduledTasks(qname: string): Promise<void> {
await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/scheduled_tasks:run_all`,
});
}
export async function deleteRetryTask(
qname: string,
taskKey: string
@ -273,6 +315,48 @@ export async function deleteRetryTask(
});
}
export async function batchDeleteRetryTasks(
qname: string,
taskKeys: string[]
): Promise<BatchDeleteTasksResponse> {
const resp = await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/retry_tasks:batch_delete`,
data: {
task_keys: taskKeys,
},
});
return resp.data;
}
export async function deleteAllRetryTasks(qname: string): Promise<void> {
await axios({
method: "delete",
url: `${BASE_URL}/queues/${qname}/retry_tasks:delete_all`,
});
}
export async function batchRunRetryTasks(
qname: string,
taskKeys: string[]
): Promise<BatchRunTasksResponse> {
const resp = await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/retry_tasks:batch_run`,
data: {
task_keys: taskKeys,
},
});
return resp.data;
}
export async function runAllRetryTasks(qname: string): Promise<void> {
await axios({
method: "post",
url: `${BASE_URL}/queues/${qname}/retry_tasks:run_all`,
});
}
export async function runDeadTask(
qname: string,
taskKey: string

View File

@ -23,6 +23,10 @@ import AlertTitle from "@material-ui/lab/AlertTitle";
import SyntaxHighlighter from "react-syntax-highlighter";
import syntaxHighlightStyle from "react-syntax-highlighter/dist/esm/styles/hljs/github";
import {
batchDeleteRetryTasksAsync,
batchRunRetryTasksAsync,
deleteAllRetryTasksAsync,
runAllRetryTasksAsync,
listRetryTasksAsync,
deleteRetryTaskAsync,
} from "../actions/tasksActions";
@ -52,7 +56,14 @@ function mapStateToProps(state: AppState) {
};
}
const mapDispatchToProps = { listRetryTasksAsync, deleteRetryTaskAsync };
const mapDispatchToProps = {
batchDeleteRetryTasksAsync,
batchRunRetryTasksAsync,
deleteAllRetryTasksAsync,
runAllRetryTasksAsync,
listRetryTasksAsync,
deleteRetryTaskAsync,
};
const connector = connect(mapStateToProps, mapDispatchToProps);
@ -93,6 +104,26 @@ function RetryTasksTable(props: Props & ReduxProps) {
}
};
const handleRunAllClick = () => {
props.runAllRetryTasksAsync(queue);
};
const handleDeleteAllClick = () => {
props.deleteAllRetryTasksAsync(queue);
};
const handleBatchRunClick = () => {
props
.batchDeleteRetryTasksAsync(queue, selectedKeys)
.then(() => setSelectedKeys([]));
};
const handleBatchDeleteClick = () => {
props
.batchRunRetryTasksAsync(queue, selectedKeys)
.then(() => setSelectedKeys([]));
};
const fetchData = useCallback(() => {
const pageOpts = { page: page + 1, size: pageSize };
listRetryTasksAsync(queue, pageOpts);
@ -128,10 +159,10 @@ function RetryTasksTable(props: Props & ReduxProps) {
allActionPending={props.allActionPending}
batchActionPending={props.batchActionPending}
showBatchActions={numSelected > 0}
onRunAllClick={() => console.log("TODO")}
onDeleteAllClick={() => console.log("TODO")}
onBatchRunClick={() => console.log("TODO")}
onBatchDeleteClick={() => console.log("TODO")}
onRunAllClick={handleRunAllClick}
onDeleteAllClick={handleDeleteAllClick}
onBatchRunClick={handleBatchRunClick}
onBatchDeleteClick={handleBatchDeleteClick}
/>
<TableContainer component={Paper}>
<Table

View File

@ -23,6 +23,10 @@ import AlertTitle from "@material-ui/lab/AlertTitle";
import SyntaxHighlighter from "react-syntax-highlighter";
import syntaxHighlightStyle from "react-syntax-highlighter/dist/esm/styles/hljs/github";
import {
batchDeleteScheduledTasksAsync,
batchRunScheduledTasksAsync,
deleteAllScheduledTasksAsync,
runAllScheduledTasksAsync,
listScheduledTasksAsync,
deleteScheduledTaskAsync,
} from "../actions/tasksActions";
@ -55,6 +59,10 @@ function mapStateToProps(state: AppState) {
const mapDispatchToProps = {
listScheduledTasksAsync,
deleteScheduledTaskAsync,
batchDeleteScheduledTasksAsync,
batchRunScheduledTasksAsync,
deleteAllScheduledTasksAsync,
runAllScheduledTasksAsync,
};
const connector = connect(mapStateToProps, mapDispatchToProps);
@ -96,6 +104,26 @@ function ScheduledTasksTable(props: Props & ReduxProps) {
}
};
const handleRunAllClick = () => {
props.runAllScheduledTasksAsync(queue);
};
const handleDeleteAllClick = () => {
props.deleteAllScheduledTasksAsync(queue);
};
const handleBatchRunClick = () => {
props
.batchDeleteScheduledTasksAsync(queue, selectedKeys)
.then(() => setSelectedKeys([]));
};
const handleBatchDeleteClick = () => {
props
.batchRunScheduledTasksAsync(queue, selectedKeys)
.then(() => setSelectedKeys([]));
};
const fetchData = useCallback(() => {
const pageOpts = { page: page + 1, size: pageSize };
listScheduledTasksAsync(queue, pageOpts);
@ -128,10 +156,10 @@ function ScheduledTasksTable(props: Props & ReduxProps) {
allActionPending={props.allActionPending}
batchActionPending={props.batchActionPending}
showBatchActions={numSelected > 0}
onRunAllClick={() => console.log("TODO")}
onDeleteAllClick={() => console.log("TODO")}
onBatchRunClick={() => console.log("TODO")}
onBatchDeleteClick={() => console.log("TODO")}
onRunAllClick={handleRunAllClick}
onDeleteAllClick={handleDeleteAllClick}
onBatchRunClick={handleBatchRunClick}
onBatchDeleteClick={handleBatchDeleteClick}
/>
<TableContainer component={Paper}>
<Table

View File

@ -15,8 +15,14 @@ import {
} from "../actions/queuesActions";
import {
BATCH_DELETE_DEAD_TASKS_SUCCESS,
BATCH_DELETE_RETRY_TASKS_SUCCESS,
BATCH_DELETE_SCHEDULED_TASKS_SUCCESS,
BATCH_RUN_DEAD_TASKS_SUCCESS,
BATCH_RUN_RETRY_TASKS_SUCCESS,
BATCH_RUN_SCHEDULED_TASKS_SUCCESS,
DELETE_ALL_DEAD_TASKS_SUCCESS,
DELETE_ALL_RETRY_TASKS_SUCCESS,
DELETE_ALL_SCHEDULED_TASKS_SUCCESS,
DELETE_DEAD_TASK_SUCCESS,
DELETE_RETRY_TASK_SUCCESS,
DELETE_SCHEDULED_TASK_SUCCESS,
@ -26,6 +32,8 @@ import {
LIST_RETRY_TASKS_SUCCESS,
LIST_SCHEDULED_TASKS_SUCCESS,
RUN_ALL_DEAD_TASKS_SUCCESS,
RUN_ALL_RETRY_TASKS_SUCCESS,
RUN_ALL_SCHEDULED_TASKS_SUCCESS,
RUN_DEAD_TASK_SUCCESS,
TasksActionTypes,
} from "../actions/tasksActions";
@ -188,6 +196,79 @@ function queuesReducer(
return { ...state, data: newData };
}
case BATCH_RUN_SCHEDULED_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
pending:
queueInfo.currentStats.pending +
action.payload.pending_keys.length,
scheduled:
queueInfo.currentStats.scheduled -
action.payload.pending_keys.length,
},
};
});
return { ...state, data: newData };
}
case BATCH_DELETE_SCHEDULED_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
scheduled:
queueInfo.currentStats.scheduled -
action.payload.deleted_keys.length,
},
};
});
return { ...state, data: newData };
}
case RUN_ALL_SCHEDULED_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
pending:
queueInfo.currentStats.pending + queueInfo.currentStats.scheduled,
scheduled: 0,
},
};
});
return { ...state, data: newData };
}
case DELETE_ALL_SCHEDULED_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
scheduled: 0,
},
};
});
return { ...state, data: newData };
}
case DELETE_RETRY_TASK_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
@ -204,6 +285,77 @@ function queuesReducer(
return { ...state, data: newData };
}
case BATCH_RUN_RETRY_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
pending:
queueInfo.currentStats.pending +
action.payload.pending_keys.length,
retry:
queueInfo.currentStats.retry - action.payload.pending_keys.length,
},
};
});
return { ...state, data: newData };
}
case BATCH_DELETE_RETRY_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
retry:
queueInfo.currentStats.retry - action.payload.deleted_keys.length,
},
};
});
return { ...state, data: newData };
}
case RUN_ALL_RETRY_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
pending:
queueInfo.currentStats.pending + queueInfo.currentStats.retry,
retry: 0,
},
};
});
return { ...state, data: newData };
}
case DELETE_ALL_RETRY_TASKS_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {
return queueInfo;
}
return {
...queueInfo,
currentStats: {
...queueInfo.currentStats,
retry: 0,
},
};
});
return { ...state, data: newData };
}
case DELETE_DEAD_TASK_SUCCESS: {
const newData = state.data.map((queueInfo) => {
if (queueInfo.name !== action.queue) {

View File

@ -4,12 +4,20 @@ import {
} from "../actions/snackbarActions";
import {
BATCH_DELETE_DEAD_TASKS_SUCCESS,
BATCH_DELETE_RETRY_TASKS_SUCCESS,
BATCH_DELETE_SCHEDULED_TASKS_SUCCESS,
BATCH_RUN_DEAD_TASKS_SUCCESS,
BATCH_RUN_RETRY_TASKS_SUCCESS,
BATCH_RUN_SCHEDULED_TASKS_SUCCESS,
DELETE_ALL_DEAD_TASKS_SUCCESS,
DELETE_ALL_RETRY_TASKS_SUCCESS,
DELETE_ALL_SCHEDULED_TASKS_SUCCESS,
DELETE_DEAD_TASK_SUCCESS,
DELETE_RETRY_TASK_SUCCESS,
DELETE_SCHEDULED_TASK_SUCCESS,
RUN_ALL_DEAD_TASKS_SUCCESS,
RUN_ALL_RETRY_TASKS_SUCCESS,
RUN_ALL_SCHEDULED_TASKS_SUCCESS,
RUN_DEAD_TASK_SUCCESS,
TasksActionTypes,
} from "../actions/tasksActions";
@ -51,6 +59,36 @@ function snackbarReducer(
message: `Scheduled task ${action.taskKey} deleted`,
};
case BATCH_RUN_SCHEDULED_TASKS_SUCCESS: {
const n = action.payload.pending_keys.length;
return {
isOpen: true,
message: `${n} scheduled ${
n === 1 ? "task is" : "tasks are"
} now pending`,
};
}
case BATCH_DELETE_SCHEDULED_TASKS_SUCCESS: {
const n = action.payload.deleted_keys.length;
return {
isOpen: true,
message: `${n} scheduled ${n === 1 ? "task" : "tasks"} deleted`,
};
}
case RUN_ALL_SCHEDULED_TASKS_SUCCESS:
return {
isOpen: true,
message: "All scheduled tasks are now pending",
};
case DELETE_ALL_SCHEDULED_TASKS_SUCCESS:
return {
isOpen: true,
message: "All scheduled tasks deleted",
};
case DELETE_RETRY_TASK_SUCCESS:
return {
isOpen: true,
@ -58,6 +96,34 @@ function snackbarReducer(
message: `Retry task ${action.taskKey} deleted`,
};
case BATCH_RUN_RETRY_TASKS_SUCCESS: {
const n = action.payload.pending_keys.length;
return {
isOpen: true,
message: `${n} retry ${n === 1 ? "task is" : "tasks are"} now pending`,
};
}
case BATCH_DELETE_RETRY_TASKS_SUCCESS: {
const n = action.payload.deleted_keys.length;
return {
isOpen: true,
message: `${n} retry ${n === 1 ? "task" : "tasks"} deleted`,
};
}
case RUN_ALL_RETRY_TASKS_SUCCESS:
return {
isOpen: true,
message: "All retry tasks are now pending",
};
case DELETE_ALL_RETRY_TASKS_SUCCESS:
return {
isOpen: true,
message: "All retry tasks deleted",
};
case DELETE_DEAD_TASK_SUCCESS:
return {
isOpen: true,
@ -69,7 +135,7 @@ function snackbarReducer(
const n = action.payload.pending_keys.length;
return {
isOpen: true,
message: `${n} Dead ${n === 1 ? "task is" : "tasks are"} now pending`,
message: `${n} dead ${n === 1 ? "task is" : "tasks are"} now pending`,
};
}
@ -77,7 +143,7 @@ function snackbarReducer(
const n = action.payload.deleted_keys.length;
return {
isOpen: true,
message: `${n} Dead ${n === 1 ? "task" : "tasks"} deleted`,
message: `${n} dead ${n === 1 ? "task" : "tasks"} deleted`,
};
}
@ -90,7 +156,7 @@ function snackbarReducer(
case DELETE_ALL_DEAD_TASKS_SUCCESS:
return {
isOpen: true,
message: "All dead tasks delete",
message: "All dead tasks deleted",
};
default:

View File

@ -42,6 +42,30 @@ import {
RUN_ALL_DEAD_TASKS_BEGIN,
RUN_ALL_DEAD_TASKS_ERROR,
RUN_ALL_DEAD_TASKS_SUCCESS,
BATCH_DELETE_RETRY_TASKS_ERROR,
BATCH_RUN_RETRY_TASKS_ERROR,
BATCH_DELETE_RETRY_TASKS_SUCCESS,
BATCH_RUN_RETRY_TASKS_SUCCESS,
BATCH_DELETE_RETRY_TASKS_BEGIN,
BATCH_RUN_RETRY_TASKS_BEGIN,
DELETE_ALL_RETRY_TASKS_ERROR,
RUN_ALL_RETRY_TASKS_ERROR,
DELETE_ALL_RETRY_TASKS_SUCCESS,
RUN_ALL_RETRY_TASKS_SUCCESS,
DELETE_ALL_RETRY_TASKS_BEGIN,
RUN_ALL_RETRY_TASKS_BEGIN,
BATCH_DELETE_SCHEDULED_TASKS_ERROR,
BATCH_RUN_SCHEDULED_TASKS_ERROR,
BATCH_DELETE_SCHEDULED_TASKS_SUCCESS,
BATCH_RUN_SCHEDULED_TASKS_SUCCESS,
BATCH_DELETE_SCHEDULED_TASKS_BEGIN,
BATCH_RUN_SCHEDULED_TASKS_BEGIN,
DELETE_ALL_SCHEDULED_TASKS_ERROR,
RUN_ALL_SCHEDULED_TASKS_ERROR,
DELETE_ALL_SCHEDULED_TASKS_SUCCESS,
RUN_ALL_SCHEDULED_TASKS_SUCCESS,
DELETE_ALL_SCHEDULED_TASKS_BEGIN,
RUN_ALL_SCHEDULED_TASKS_BEGIN,
} from "../actions/tasksActions";
import {
ActiveTask,
@ -404,6 +428,103 @@ function tasksReducer(
},
};
case RUN_ALL_SCHEDULED_TASKS_BEGIN:
case DELETE_ALL_SCHEDULED_TASKS_BEGIN:
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
allActionPending: true,
},
};
case RUN_ALL_SCHEDULED_TASKS_SUCCESS:
case DELETE_ALL_SCHEDULED_TASKS_SUCCESS:
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
allActionPending: false,
data: [],
},
};
case RUN_ALL_SCHEDULED_TASKS_ERROR:
case DELETE_ALL_SCHEDULED_TASKS_ERROR:
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
allActionPending: false,
},
};
case BATCH_RUN_SCHEDULED_TASKS_BEGIN:
case BATCH_DELETE_SCHEDULED_TASKS_BEGIN:
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
batchActionPending: true,
data: state.scheduledTasks.data.map((task) => {
if (!action.taskKeys.includes(task.key)) {
return task;
}
return {
...task,
requestPending: true,
};
}),
},
};
case BATCH_RUN_SCHEDULED_TASKS_SUCCESS: {
const newData = state.scheduledTasks.data.filter(
(task) => !action.payload.pending_keys.includes(task.key)
);
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
batchActionPending: false,
data: newData,
},
};
}
case BATCH_DELETE_SCHEDULED_TASKS_SUCCESS: {
const newData = state.scheduledTasks.data.filter(
(task) => !action.payload.deleted_keys.includes(task.key)
);
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
batchActionPending: false,
data: newData,
},
};
}
case BATCH_RUN_SCHEDULED_TASKS_ERROR:
case BATCH_DELETE_SCHEDULED_TASKS_ERROR:
return {
...state,
scheduledTasks: {
...state.scheduledTasks,
batchActionPending: false,
data: state.scheduledTasks.data.map((task) => {
if (!action.taskKeys.includes(task.key)) {
return task;
}
return {
...task,
requestPending: false,
};
}),
},
};
case DELETE_RETRY_TASK_BEGIN:
return {
...state,
@ -443,6 +564,103 @@ function tasksReducer(
},
};
case RUN_ALL_RETRY_TASKS_BEGIN:
case DELETE_ALL_RETRY_TASKS_BEGIN:
return {
...state,
retryTasks: {
...state.retryTasks,
allActionPending: true,
},
};
case RUN_ALL_RETRY_TASKS_SUCCESS:
case DELETE_ALL_RETRY_TASKS_SUCCESS:
return {
...state,
retryTasks: {
...state.retryTasks,
allActionPending: false,
data: [],
},
};
case RUN_ALL_RETRY_TASKS_ERROR:
case DELETE_ALL_RETRY_TASKS_ERROR:
return {
...state,
retryTasks: {
...state.retryTasks,
allActionPending: false,
},
};
case BATCH_RUN_RETRY_TASKS_BEGIN:
case BATCH_DELETE_RETRY_TASKS_BEGIN:
return {
...state,
retryTasks: {
...state.retryTasks,
batchActionPending: true,
data: state.retryTasks.data.map((task) => {
if (!action.taskKeys.includes(task.key)) {
return task;
}
return {
...task,
requestPending: true,
};
}),
},
};
case BATCH_RUN_RETRY_TASKS_SUCCESS: {
const newData = state.retryTasks.data.filter(
(task) => !action.payload.pending_keys.includes(task.key)
);
return {
...state,
retryTasks: {
...state.retryTasks,
batchActionPending: false,
data: newData,
},
};
}
case BATCH_DELETE_RETRY_TASKS_SUCCESS: {
const newData = state.retryTasks.data.filter(
(task) => !action.payload.deleted_keys.includes(task.key)
);
return {
...state,
retryTasks: {
...state.retryTasks,
batchActionPending: false,
data: newData,
},
};
}
case BATCH_RUN_RETRY_TASKS_ERROR:
case BATCH_DELETE_RETRY_TASKS_ERROR:
return {
...state,
retryTasks: {
...state.retryTasks,
batchActionPending: false,
data: state.retryTasks.data.map((task) => {
if (!action.taskKeys.includes(task.key)) {
return task;
}
return {
...task,
requestPending: false,
};
}),
},
};
case RUN_DEAD_TASK_BEGIN:
case DELETE_DEAD_TASK_BEGIN:
return {