Add API endpoints to delete all scheduled, retry, and dead tasks

This commit is contained in:
Ken Hibino 2020-12-07 07:22:04 -08:00
parent 8912bb2016
commit 0adc52479f
2 changed files with 36 additions and 1 deletions

View File

@ -81,11 +81,13 @@ func main() {
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
// api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE")
// Scheduler Entry endpoints.
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET")

View File

@ -178,6 +178,39 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllScheduledTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllRetryTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newDeleteAllDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.DeleteAllDeadTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
// getPageOptions read page size and number from the request url if set,
// otherwise it returns the default value.
func getPageOptions(r *http.Request) (pageSize, pageNum int) {