Fix queue size when delete action happens

This commit is contained in:
Ken Hibino 2021-01-26 21:43:51 -08:00
parent a6498ca729
commit a488599ec0
4 changed files with 101 additions and 24 deletions

View File

@ -326,47 +326,72 @@ func newArchiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
} }
} }
type DeleteAllTasksResponse struct {
// Number of tasks deleted.
Deleted int `json:"deleted"`
}
func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"] qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllPendingTasks(qname); err != nil { n, err := inspector.DeleteAllPendingTasks(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := DeleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusNoContent)
} }
} }
func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"] qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllScheduledTasks(qname); err != nil { n, err := inspector.DeleteAllScheduledTasks(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := DeleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusNoContent)
} }
} }
func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"] qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllRetryTasks(qname); err != nil { n, err := inspector.DeleteAllRetryTasks(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := DeleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusNoContent)
} }
} }
func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"] qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllArchivedTasks(qname); err != nil { n, err := inspector.DeleteAllArchivedTasks(qname)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp := DeleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
w.WriteHeader(http.StatusNoContent)
} }
} }

View File

@ -394,6 +394,7 @@ interface DeleteAllPendingTasksBeginAction {
interface DeleteAllPendingTasksSuccessAction { interface DeleteAllPendingTasksSuccessAction {
type: typeof DELETE_ALL_PENDING_TASKS_SUCCESS; type: typeof DELETE_ALL_PENDING_TASKS_SUCCESS;
queue: string; queue: string;
deleted: number;
} }
interface DeleteAllPendingTasksErrorAction { interface DeleteAllPendingTasksErrorAction {
@ -667,6 +668,7 @@ interface DeleteAllScheduledTasksBeginAction {
interface DeleteAllScheduledTasksSuccessAction { interface DeleteAllScheduledTasksSuccessAction {
type: typeof DELETE_ALL_SCHEDULED_TASKS_SUCCESS; type: typeof DELETE_ALL_SCHEDULED_TASKS_SUCCESS;
queue: string; queue: string;
deleted: number;
} }
interface DeleteAllScheduledTasksErrorAction { interface DeleteAllScheduledTasksErrorAction {
@ -791,6 +793,7 @@ interface DeleteAllRetryTasksBeginAction {
interface DeleteAllRetryTasksSuccessAction { interface DeleteAllRetryTasksSuccessAction {
type: typeof DELETE_ALL_RETRY_TASKS_SUCCESS; type: typeof DELETE_ALL_RETRY_TASKS_SUCCESS;
queue: string; queue: string;
deleted: number;
} }
interface DeleteAllRetryTasksErrorAction { interface DeleteAllRetryTasksErrorAction {
@ -880,6 +883,7 @@ interface DeleteAllArchivedTasksBeginAction {
interface DeleteAllArchivedTasksSuccessAction { interface DeleteAllArchivedTasksSuccessAction {
type: typeof DELETE_ALL_ARCHIVED_TASKS_SUCCESS; type: typeof DELETE_ALL_ARCHIVED_TASKS_SUCCESS;
queue: string; queue: string;
deleted: number;
} }
interface DeleteAllArchivedTasksErrorAction { interface DeleteAllArchivedTasksErrorAction {
@ -1532,8 +1536,12 @@ export function deleteAllPendingTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => { return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_PENDING_TASKS_BEGIN, queue }); dispatch({ type: DELETE_ALL_PENDING_TASKS_BEGIN, queue });
try { try {
await deleteAllPendingTasks(queue); const response = await deleteAllPendingTasks(queue);
dispatch({ type: DELETE_ALL_PENDING_TASKS_SUCCESS, queue }); dispatch({
type: DELETE_ALL_PENDING_TASKS_SUCCESS,
deleted: response.deleted,
queue,
});
} catch (error) { } catch (error) {
console.error( console.error(
"deleteAllPendingTasksAsync: ", "deleteAllPendingTasksAsync: ",
@ -1552,8 +1560,12 @@ export function deleteAllScheduledTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => { return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_BEGIN, queue }); dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_BEGIN, queue });
try { try {
await deleteAllScheduledTasks(queue); const response = await deleteAllScheduledTasks(queue);
dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_SUCCESS, queue }); dispatch({
type: DELETE_ALL_SCHEDULED_TASKS_SUCCESS,
deleted: response.deleted,
queue,
});
} catch (error) { } catch (error) {
console.error( console.error(
"deleteAllScheduledTasksAsync: ", "deleteAllScheduledTasksAsync: ",
@ -1708,8 +1720,12 @@ export function deleteAllRetryTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => { return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_RETRY_TASKS_BEGIN, queue }); dispatch({ type: DELETE_ALL_RETRY_TASKS_BEGIN, queue });
try { try {
await deleteAllRetryTasks(queue); const response = await deleteAllRetryTasks(queue);
dispatch({ type: DELETE_ALL_RETRY_TASKS_SUCCESS, queue }); dispatch({
type: DELETE_ALL_RETRY_TASKS_SUCCESS,
deleted: response.deleted,
queue,
});
} catch (error) { } catch (error) {
console.error( console.error(
"deleteAllRetryTasksAsync: ", "deleteAllRetryTasksAsync: ",
@ -1842,8 +1858,12 @@ export function deleteAllArchivedTasksAsync(queue: string) {
return async (dispatch: Dispatch<TasksActionTypes>) => { return async (dispatch: Dispatch<TasksActionTypes>) => {
dispatch({ type: DELETE_ALL_ARCHIVED_TASKS_BEGIN, queue }); dispatch({ type: DELETE_ALL_ARCHIVED_TASKS_BEGIN, queue });
try { try {
await deleteAllArchivedTasks(queue); const response = await deleteAllArchivedTasks(queue);
dispatch({ type: DELETE_ALL_ARCHIVED_TASKS_SUCCESS, queue }); dispatch({
type: DELETE_ALL_ARCHIVED_TASKS_SUCCESS,
deleted: response.deleted,
queue,
});
} catch (error) { } catch (error) {
console.error( console.error(
"deleteAllArchivedTasksAsync: ", "deleteAllArchivedTasksAsync: ",

View File

@ -68,6 +68,10 @@ export interface BatchArchiveTasksResponse {
error_keys: string[]; error_keys: string[];
} }
export interface DeleteAllTasksResponse {
deleted: number;
}
export interface ListQueueStatsResponse { export interface ListQueueStatsResponse {
stats: { [qname: string]: DailyStat[] }; stats: { [qname: string]: DailyStat[] };
} }
@ -517,11 +521,14 @@ export async function batchDeletePendingTasks(
return resp.data; return resp.data;
} }
export async function deleteAllPendingTasks(qname: string): Promise<void> { export async function deleteAllPendingTasks(
await axios({ qname: string
): Promise<DeleteAllTasksResponse> {
const resp = await axios({
method: "delete", method: "delete",
url: `${BASE_URL}/queues/${qname}/pending_tasks:delete_all`, url: `${BASE_URL}/queues/${qname}/pending_tasks:delete_all`,
}); });
return resp.data;
} }
export async function runScheduledTask( export async function runScheduledTask(
@ -568,11 +575,14 @@ export async function batchDeleteScheduledTasks(
return resp.data; return resp.data;
} }
export async function deleteAllScheduledTasks(qname: string): Promise<void> { export async function deleteAllScheduledTasks(
await axios({ qname: string
): Promise<DeleteAllTasksResponse> {
const resp = await axios({
method: "delete", method: "delete",
url: `${BASE_URL}/queues/${qname}/scheduled_tasks:delete_all`, url: `${BASE_URL}/queues/${qname}/scheduled_tasks:delete_all`,
}); });
return resp.data;
} }
export async function batchRunScheduledTasks( export async function batchRunScheduledTasks(
@ -661,11 +671,14 @@ export async function batchDeleteRetryTasks(
return resp.data; return resp.data;
} }
export async function deleteAllRetryTasks(qname: string): Promise<void> { export async function deleteAllRetryTasks(
await axios({ qname: string
): Promise<DeleteAllTasksResponse> {
const resp = await axios({
method: "delete", method: "delete",
url: `${BASE_URL}/queues/${qname}/retry_tasks:delete_all`, url: `${BASE_URL}/queues/${qname}/retry_tasks:delete_all`,
}); });
return resp.data;
} }
export async function batchRunRetryTasks( export async function batchRunRetryTasks(
@ -744,11 +757,14 @@ export async function batchDeleteArchivedTasks(
return resp.data; return resp.data;
} }
export async function deleteAllArchivedTasks(qname: string): Promise<void> { export async function deleteAllArchivedTasks(
await axios({ qname: string
): Promise<DeleteAllTasksResponse> {
const resp = await axios({
method: "delete", method: "delete",
url: `${BASE_URL}/queues/${qname}/archived_tasks:delete_all`, url: `${BASE_URL}/queues/${qname}/archived_tasks:delete_all`,
}); });
return resp.data;
} }
export async function batchRunArchivedTasks( export async function batchRunArchivedTasks(

View File

@ -283,6 +283,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - 1,
pending: queueInfo.currentStats.pending - 1, pending: queueInfo.currentStats.pending - 1,
}, },
}; };
@ -299,6 +300,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - 1,
scheduled: queueInfo.currentStats.scheduled - 1, scheduled: queueInfo.currentStats.scheduled - 1,
}, },
}; };
@ -336,6 +338,8 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size:
queueInfo.currentStats.size - action.payload.deleted_keys.length,
pending: pending:
queueInfo.currentStats.pending - queueInfo.currentStats.pending -
action.payload.deleted_keys.length, action.payload.deleted_keys.length,
@ -372,6 +376,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - action.deleted,
pending: 0, pending: 0,
}, },
}; };
@ -430,6 +435,8 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size:
queueInfo.currentStats.size - action.payload.deleted_keys.length,
scheduled: scheduled:
queueInfo.currentStats.scheduled - queueInfo.currentStats.scheduled -
action.payload.deleted_keys.length, action.payload.deleted_keys.length,
@ -485,6 +492,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - action.deleted,
scheduled: 0, scheduled: 0,
}, },
}; };
@ -501,6 +509,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - 1,
retry: queueInfo.currentStats.retry - 1, retry: queueInfo.currentStats.retry - 1,
}, },
}; };
@ -558,6 +567,8 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size:
queueInfo.currentStats.size - action.payload.deleted_keys.length,
retry: retry:
queueInfo.currentStats.retry - action.payload.deleted_keys.length, queueInfo.currentStats.retry - action.payload.deleted_keys.length,
}, },
@ -611,6 +622,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - action.deleted,
retry: 0, retry: 0,
}, },
}; };
@ -627,6 +639,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - 1,
archived: queueInfo.currentStats.archived - 1, archived: queueInfo.currentStats.archived - 1,
}, },
}; };
@ -664,6 +677,8 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size:
queueInfo.currentStats.size - action.payload.deleted_keys.length,
archived: archived:
queueInfo.currentStats.archived - queueInfo.currentStats.archived -
action.payload.deleted_keys.length, action.payload.deleted_keys.length,
@ -700,6 +715,7 @@ function queuesReducer(
...queueInfo, ...queueInfo,
currentStats: { currentStats: {
...queueInfo.currentStats, ...queueInfo.currentStats,
size: queueInfo.currentStats.size - action.deleted,
archived: 0, archived: 0,
}, },
}; };