From f98c362925fd0546442bef2f20b8fdd72d182944 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 20 Jan 2021 21:30:27 -0800 Subject: [PATCH] Add API endpoints to delete and archive pending tasks --- go.mod | 2 ++ main.go | 10 ++++++++++ task_handlers.go | 22 ++++++++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/go.mod b/go.mod index 638eaab..558dc42 100644 --- a/go.mod +++ b/go.mod @@ -8,3 +8,5 @@ require ( github.com/hibiken/asynq v0.14.0 github.com/rs/cors v1.7.0 ) + +replace github.com/hibiken/asynq => ../../../database/Redis/go/asynq diff --git a/main.go b/main.go index 3b65575..3d968b7 100644 --- a/main.go +++ b/main.go @@ -130,7 +130,15 @@ func main() { api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/pending_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/pending_tasks/{task_key}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") @@ -141,6 +149,7 @@ func main() { api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") @@ -151,6 +160,7 @@ func main() { api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") diff --git a/task_handlers.go b/task_handlers.go index d54d0a6..abe614f 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -299,6 +299,17 @@ func newArchiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { } } +func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + qname := mux.Vars(r)["qname"] + if _, err := inspector.DeleteAllPendingTasks(qname); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] @@ -365,6 +376,17 @@ func newRunAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF } } +func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + qname := mux.Vars(r)["qname"] + if _, err := inspector.ArchiveAllPendingTasks(qname); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) + } +} + func newArchiveAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"]