Add API endpoints to kill scheduled and retry tasks

This commit is contained in:
Ken Hibino 2020-12-19 06:51:46 -08:00
parent a454f2f094
commit b387546fa8
2 changed files with 89 additions and 0 deletions

View File

@ -86,6 +86,9 @@ func main() {
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:run_all", newRunAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:kill", newKillTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:kill_all", newKillAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_kill", newBatchKillTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
@ -93,6 +96,9 @@ func main() {
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:run_all", newRunAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:kill", newKillTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:kill_all", newKillAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_kill", newBatchKillTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE")

View File

@ -215,6 +215,23 @@ func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
func newKillTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"]
if qname == "" || key == "" {
http.Error(w, "route parameters should not be empty", http.StatusBadRequest)
return
}
if err := inspector.KillTaskByKey(qname, key); err != nil {
// TODO: Handle task not found error and return 404
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
@ -281,6 +298,28 @@ func newRunAllDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
}
}
func newKillAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.KillAllScheduledTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
func newKillAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.KillAllRetryTasks(qname); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
}
// request body used for all batch delete tasks endpoints.
type batchDeleteTasksRequest struct {
TaskKeys []string `json:"task_keys"`
@ -379,6 +418,50 @@ func newBatchRunTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
type batchKillTasksRequest struct {
TaskKeys []string `json:"task_keys"`
}
type batchKillTasksResponse struct {
// task keys that were successfully moved to the dead state.
DeadKeys []string `json:"dead_keys"`
// task keys that were not able to move to the dead state.
ErrorKeys []string `json:"error_keys"`
}
func newBatchKillTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
dec := json.NewDecoder(r.Body)
dec.DisallowUnknownFields()
var req batchKillTasksRequest
if err := dec.Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
qname := mux.Vars(r)["qname"]
resp := batchKillTasksResponse{
// avoid null in the json response
DeadKeys: make([]string, 0),
ErrorKeys: make([]string, 0),
}
for _, key := range req.TaskKeys {
if err := inspector.KillTaskByKey(qname, key); err != nil {
log.Printf("error: could not kill task with key %q: %v", key, err)
resp.ErrorKeys = append(resp.ErrorKeys, key)
} else {
resp.DeadKeys = append(resp.DeadKeys, key)
}
}
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// getPageOptions read page size and number from the request url if set,
// otherwise it returns the default value.
func getPageOptions(r *http.Request) (pageSize, pageNum int) {