From d9a3e414b8f72faee0a58aa378723dab0b1ad081 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 14 Dec 2020 06:54:02 -0800 Subject: [PATCH] Add API endpoints to run scheduled, retry, dead task --- main.go | 3 +++ task_handlers.go | 21 +++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/main.go b/main.go index 562eeb9..b152848 100644 --- a/main.go +++ b/main.go @@ -82,13 +82,16 @@ func main() { api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks:batch_delete", newBatchDeleteDeadTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST") // Scheduler Entry endpoints. api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") diff --git a/task_handlers.go b/task_handlers.go index 4d3f2de..29ce538 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -185,6 +185,10 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, key := vars["qname"], vars["task_key"] + if qname == "" || key == "" { + http.Error(w, "route parameters should not be empty", http.StatusBadRequest) + return + } if err := inspector.DeleteTaskByKey(qname, key); err != nil { // TODO: Handle task not found error and return 404 http.Error(w, err.Error(), http.StatusInternalServerError) @@ -194,6 +198,23 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { } } +func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname, key := vars["qname"], vars["task_key"] + if qname == "" || key == "" { + http.Error(w, "route parameters should not be empty", http.StatusBadRequest) + return + } + if err := inspector.RunTaskByKey(qname, key); err != nil { + // TODO: Handle task not found error and return 404 + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"]