From a488599ec0522c2a735c42645fc1c00cc91a4f24 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 26 Jan 2021 21:43:51 -0800 Subject: [PATCH] Fix queue size when delete action happens --- task_handlers.go | 41 +++++++++++++++++++++++++------- ui/src/actions/tasksActions.ts | 36 +++++++++++++++++++++------- ui/src/api.ts | 32 ++++++++++++++++++------- ui/src/reducers/queuesReducer.ts | 16 +++++++++++++ 4 files changed, 101 insertions(+), 24 deletions(-) diff --git a/task_handlers.go b/task_handlers.go index 18525c4..a4642ea 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -326,47 +326,72 @@ func newArchiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { } } +type DeleteAllTasksResponse struct { + // Number of tasks deleted. + Deleted int `json:"deleted"` +} + func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.DeleteAllPendingTasks(qname); err != nil { + n, err := inspector.DeleteAllPendingTasks(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := DeleteAllTasksResponse{n} + if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) } } func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.DeleteAllScheduledTasks(qname); err != nil { + n, err := inspector.DeleteAllScheduledTasks(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := DeleteAllTasksResponse{n} + if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) } } func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.DeleteAllRetryTasks(qname); err != nil { + n, err := inspector.DeleteAllRetryTasks(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := DeleteAllTasksResponse{n} + if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) } } func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.DeleteAllArchivedTasks(qname); err != nil { + n, err := inspector.DeleteAllArchivedTasks(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := DeleteAllTasksResponse{n} + if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) } } diff --git a/ui/src/actions/tasksActions.ts b/ui/src/actions/tasksActions.ts index be155b3..e2c0b8f 100644 --- a/ui/src/actions/tasksActions.ts +++ b/ui/src/actions/tasksActions.ts @@ -394,6 +394,7 @@ interface DeleteAllPendingTasksBeginAction { interface DeleteAllPendingTasksSuccessAction { type: typeof DELETE_ALL_PENDING_TASKS_SUCCESS; queue: string; + deleted: number; } interface DeleteAllPendingTasksErrorAction { @@ -667,6 +668,7 @@ interface DeleteAllScheduledTasksBeginAction { interface DeleteAllScheduledTasksSuccessAction { type: typeof DELETE_ALL_SCHEDULED_TASKS_SUCCESS; queue: string; + deleted: number; } interface DeleteAllScheduledTasksErrorAction { @@ -791,6 +793,7 @@ interface DeleteAllRetryTasksBeginAction { interface DeleteAllRetryTasksSuccessAction { type: typeof DELETE_ALL_RETRY_TASKS_SUCCESS; queue: string; + deleted: number; } interface DeleteAllRetryTasksErrorAction { @@ -880,6 +883,7 @@ interface DeleteAllArchivedTasksBeginAction { interface DeleteAllArchivedTasksSuccessAction { type: typeof DELETE_ALL_ARCHIVED_TASKS_SUCCESS; queue: string; + deleted: number; } interface DeleteAllArchivedTasksErrorAction { @@ -1532,8 +1536,12 @@ export function deleteAllPendingTasksAsync(queue: string) { return async (dispatch: Dispatch) => { dispatch({ type: DELETE_ALL_PENDING_TASKS_BEGIN, queue }); try { - await deleteAllPendingTasks(queue); - dispatch({ type: DELETE_ALL_PENDING_TASKS_SUCCESS, queue }); + const response = await deleteAllPendingTasks(queue); + dispatch({ + type: DELETE_ALL_PENDING_TASKS_SUCCESS, + deleted: response.deleted, + queue, + }); } catch (error) { console.error( "deleteAllPendingTasksAsync: ", @@ -1552,8 +1560,12 @@ export function deleteAllScheduledTasksAsync(queue: string) { return async (dispatch: Dispatch) => { dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_BEGIN, queue }); try { - await deleteAllScheduledTasks(queue); - dispatch({ type: DELETE_ALL_SCHEDULED_TASKS_SUCCESS, queue }); + const response = await deleteAllScheduledTasks(queue); + dispatch({ + type: DELETE_ALL_SCHEDULED_TASKS_SUCCESS, + deleted: response.deleted, + queue, + }); } catch (error) { console.error( "deleteAllScheduledTasksAsync: ", @@ -1708,8 +1720,12 @@ export function deleteAllRetryTasksAsync(queue: string) { return async (dispatch: Dispatch) => { dispatch({ type: DELETE_ALL_RETRY_TASKS_BEGIN, queue }); try { - await deleteAllRetryTasks(queue); - dispatch({ type: DELETE_ALL_RETRY_TASKS_SUCCESS, queue }); + const response = await deleteAllRetryTasks(queue); + dispatch({ + type: DELETE_ALL_RETRY_TASKS_SUCCESS, + deleted: response.deleted, + queue, + }); } catch (error) { console.error( "deleteAllRetryTasksAsync: ", @@ -1842,8 +1858,12 @@ export function deleteAllArchivedTasksAsync(queue: string) { return async (dispatch: Dispatch) => { dispatch({ type: DELETE_ALL_ARCHIVED_TASKS_BEGIN, queue }); try { - await deleteAllArchivedTasks(queue); - dispatch({ type: DELETE_ALL_ARCHIVED_TASKS_SUCCESS, queue }); + const response = await deleteAllArchivedTasks(queue); + dispatch({ + type: DELETE_ALL_ARCHIVED_TASKS_SUCCESS, + deleted: response.deleted, + queue, + }); } catch (error) { console.error( "deleteAllArchivedTasksAsync: ", diff --git a/ui/src/api.ts b/ui/src/api.ts index 70cc30f..9e78853 100644 --- a/ui/src/api.ts +++ b/ui/src/api.ts @@ -68,6 +68,10 @@ export interface BatchArchiveTasksResponse { error_keys: string[]; } +export interface DeleteAllTasksResponse { + deleted: number; +} + export interface ListQueueStatsResponse { stats: { [qname: string]: DailyStat[] }; } @@ -517,11 +521,14 @@ export async function batchDeletePendingTasks( return resp.data; } -export async function deleteAllPendingTasks(qname: string): Promise { - await axios({ +export async function deleteAllPendingTasks( + qname: string +): Promise { + const resp = await axios({ method: "delete", url: `${BASE_URL}/queues/${qname}/pending_tasks:delete_all`, }); + return resp.data; } export async function runScheduledTask( @@ -568,11 +575,14 @@ export async function batchDeleteScheduledTasks( return resp.data; } -export async function deleteAllScheduledTasks(qname: string): Promise { - await axios({ +export async function deleteAllScheduledTasks( + qname: string +): Promise { + const resp = await axios({ method: "delete", url: `${BASE_URL}/queues/${qname}/scheduled_tasks:delete_all`, }); + return resp.data; } export async function batchRunScheduledTasks( @@ -661,11 +671,14 @@ export async function batchDeleteRetryTasks( return resp.data; } -export async function deleteAllRetryTasks(qname: string): Promise { - await axios({ +export async function deleteAllRetryTasks( + qname: string +): Promise { + const resp = await axios({ method: "delete", url: `${BASE_URL}/queues/${qname}/retry_tasks:delete_all`, }); + return resp.data; } export async function batchRunRetryTasks( @@ -744,11 +757,14 @@ export async function batchDeleteArchivedTasks( return resp.data; } -export async function deleteAllArchivedTasks(qname: string): Promise { - await axios({ +export async function deleteAllArchivedTasks( + qname: string +): Promise { + const resp = await axios({ method: "delete", url: `${BASE_URL}/queues/${qname}/archived_tasks:delete_all`, }); + return resp.data; } export async function batchRunArchivedTasks( diff --git a/ui/src/reducers/queuesReducer.ts b/ui/src/reducers/queuesReducer.ts index 10c4ec8..8c0b439 100644 --- a/ui/src/reducers/queuesReducer.ts +++ b/ui/src/reducers/queuesReducer.ts @@ -283,6 +283,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - 1, pending: queueInfo.currentStats.pending - 1, }, }; @@ -299,6 +300,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - 1, scheduled: queueInfo.currentStats.scheduled - 1, }, }; @@ -336,6 +338,8 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: + queueInfo.currentStats.size - action.payload.deleted_keys.length, pending: queueInfo.currentStats.pending - action.payload.deleted_keys.length, @@ -372,6 +376,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - action.deleted, pending: 0, }, }; @@ -430,6 +435,8 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: + queueInfo.currentStats.size - action.payload.deleted_keys.length, scheduled: queueInfo.currentStats.scheduled - action.payload.deleted_keys.length, @@ -485,6 +492,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - action.deleted, scheduled: 0, }, }; @@ -501,6 +509,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - 1, retry: queueInfo.currentStats.retry - 1, }, }; @@ -558,6 +567,8 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: + queueInfo.currentStats.size - action.payload.deleted_keys.length, retry: queueInfo.currentStats.retry - action.payload.deleted_keys.length, }, @@ -611,6 +622,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - action.deleted, retry: 0, }, }; @@ -627,6 +639,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - 1, archived: queueInfo.currentStats.archived - 1, }, }; @@ -664,6 +677,8 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: + queueInfo.currentStats.size - action.payload.deleted_keys.length, archived: queueInfo.currentStats.archived - action.payload.deleted_keys.length, @@ -700,6 +715,7 @@ function queuesReducer( ...queueInfo, currentStats: { ...queueInfo.currentStats, + size: queueInfo.currentStats.size - action.deleted, archived: 0, }, };