From 706e80580d3f65335f96f397ff477cbe57ab6786 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 15 Dec 2020 06:16:58 -0800 Subject: [PATCH] Add API endpoints to batch run tasks --- main.go | 3 +++ task_handlers.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/main.go b/main.go index b152848..40edbff 100644 --- a/main.go +++ b/main.go @@ -83,15 +83,18 @@ func main() { api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks:batch_delete", newBatchDeleteDeadTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/dead_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") // Scheduler Entry endpoints. api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") diff --git a/task_handlers.go b/task_handlers.go index 29ce538..a53fe21 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -302,6 +302,50 @@ func newBatchDeleteDeadTasksHandlerFunc(inspector *asynq.Inspector) http.Handler } } +type batchRunTasksRequest struct { + TaskKeys []string `json:"task_keys"` +} + +type batchRunTasksResponse struct { + // task keys that were successfully moved to the pending state. + PendingKeys []string `json:"pending_keys"` + // task keys that were not able to move to the pending state. + ErrorKeys []string `json:"error_keys"` +} + +func newBatchRunTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize) + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + + var req batchRunTasksRequest + if err := dec.Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + qname := mux.Vars(r)["qname"] + resp := batchRunTasksResponse{ + // avoid null in the json response + PendingKeys: make([]string, 0), + ErrorKeys: make([]string, 0), + } + for _, key := range req.TaskKeys { + if err := inspector.RunTaskByKey(qname, key); err != nil { + log.Printf("error: could not run task with key %q: %v", key, err) + resp.ErrorKeys = append(resp.ErrorKeys, key) + } else { + resp.PendingKeys = append(resp.PendingKeys, key) + } + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + // getPageOptions read page size and number from the request url if set, // otherwise it returns the default value. func getPageOptions(r *http.Request) (pageSize, pageNum int) {