From 28b1d463d000e5da9843b9358fd3a2621aa7f0a4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 30 Mar 2022 06:18:58 -0700 Subject: [PATCH] Add REST endpoints for actions on aggregating_tasks --- handler.go | 9 +++++++++ task_handlers.go | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/handler.go b/handler.go index 682419a..c1fadeb 100644 --- a/handler.go +++ b/handler.go @@ -175,6 +175,15 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks", newListAggregatingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:delete_all", newDeleteAllAggregatingTasksHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:run_all", newRunAllAggregatingTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:archive_all", newArchiveAllAggregatingTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") diff --git a/task_handlers.go b/task_handlers.go index c309854..f9fddf7 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -414,6 +414,23 @@ func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handle } } +func newDeleteAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname, gname := vars["qname"], vars["gname"] + n, err := inspector.DeleteAllAggregatingTasks(qname, gname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := deleteAllTasksResponse{n} + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] @@ -511,6 +528,18 @@ func newRunAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF } } +func newRunAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname, gname := vars["qname"], vars["gname"] + if _, err := inspector.RunAllAggregatingTasks(qname, gname); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] @@ -522,6 +551,18 @@ func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handl } } +func newArchiveAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname, gname := vars["qname"], vars["gname"] + if _, err := inspector.ArchiveAllAggregatingTasks(qname, gname); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func newArchiveAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"]