Add API endpoints to run scheduled, retry, dead task

This commit is contained in:
Ken Hibino 2020-12-14 06:54:02 -08:00
parent ef60304015
commit d9a3e414b8
2 changed files with 24 additions and 0 deletions

View File

@ -82,13 +82,16 @@ func main() {
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:batch_delete", newBatchDeleteDeadTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/dead_tasks:batch_delete", newBatchDeleteDeadTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
// Scheduler Entry endpoints. // Scheduler Entry endpoints.
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET")

View File

@ -185,6 +185,10 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"] qname, key := vars["qname"], vars["task_key"]
if qname == "" || key == "" {
http.Error(w, "route parameters should not be empty", http.StatusBadRequest)
return
}
if err := inspector.DeleteTaskByKey(qname, key); err != nil { if err := inspector.DeleteTaskByKey(qname, key); err != nil {
// TODO: Handle task not found error and return 404 // TODO: Handle task not found error and return 404
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -194,6 +198,23 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
} }
} }
func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"]
if qname == "" || key == "" {
http.Error(w, "route parameters should not be empty", http.StatusBadRequest)
return
}
if err := inspector.RunTaskByKey(qname, key); err != nil {
// TODO: Handle task not found error and return 404
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"] qname := mux.Vars(r)["qname"]