From cbfc1af9c013b3920a0ef00f73f4b915724ea534 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 23 Dec 2020 06:23:15 -0800 Subject: [PATCH] Add API endpoints to batch cancel active tasks --- main.go | 3 +++ task_handlers.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/main.go b/main.go index e7f323e..14a96a4 100644 --- a/main.go +++ b/main.go @@ -78,6 +78,9 @@ func main() { // Task endpoints. api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/active_tasks/:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") diff --git a/task_handlers.go b/task_handlers.go index 0a7955e..62fad42 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -57,6 +57,73 @@ func newCancelActiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc } } +func newCancelAllActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + const batchSize = 100 + page := 1 + qname := mux.Vars(r)["qname"] + for { + tasks, err := inspector.ListActiveTasks(qname, asynq.Page(page), asynq.PageSize(batchSize)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + for _, t := range tasks { + if err := inspector.CancelActiveTask(t.ID); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + if len(tasks) < batchSize { + break + } + page++ + } + w.WriteHeader(http.StatusNoContent) + } +} + +type batchCancelTasksRequest struct { + TaskIDs []string `json:"task_ids"` +} + +type batchCancelTasksResponse struct { + CanceledIDs []string `json:"canceled_ids"` + ErrorIDs []string `json:"error_ids"` +} + +func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize) + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + + var req batchCancelTasksRequest + if err := dec.Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + resp := batchCancelTasksResponse{ + // avoid null in the json response + CanceledIDs: make([]string, 0), + ErrorIDs: make([]string, 0), + } + for _, id := range req.TaskIDs { + if err := inspector.CancelActiveTask(id); err != nil { + log.Printf("error: could not send cancelation signal to task %s", id) + resp.ErrorIDs = append(resp.ErrorIDs, id) + } else { + resp.CanceledIDs = append(resp.CanceledIDs, id) + } + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r)