package main import ( "encoding/json" "log" "net/http" "strconv" "github.com/gorilla/mux" "github.com/hibiken/asynq" ) // **************************************************************************** // This file defines: // - http.Handler(s) for task related endpoints // **************************************************************************** func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) tasks, err := inspector.ListActiveTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stats, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ActiveTask, 0) } else { payload["tasks"] = toActiveTasks(tasks) } payload["stats"] = toQueueStateSnapshot(stats) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func newCancelActiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["task_id"] if err := inspector.CancelActiveTask(id); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) tasks, err := inspector.ListPendingTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stats, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*PendingTask, 0) } else { payload["tasks"] = toPendingTasks(tasks) } payload["stats"] = toQueueStateSnapshot(stats) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) tasks, err := inspector.ListScheduledTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stats, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ScheduledTask, 0) } else { payload["tasks"] = toScheduledTasks(tasks) } payload["stats"] = toQueueStateSnapshot(stats) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) tasks, err := inspector.ListRetryTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stats, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*RetryTask, 0) } else { payload["tasks"] = toRetryTasks(tasks) } payload["stats"] = toQueueStateSnapshot(stats) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func newListDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] pageSize, pageNum := getPageOptions(r) tasks, err := inspector.ListDeadTasks( qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } stats, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*DeadTask, 0) } else { payload["tasks"] = toDeadTasks(tasks) } payload["stats"] = toQueueStateSnapshot(stats) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, key := vars["qname"], vars["task_key"] if qname == "" || key == "" { http.Error(w, "route parameters should not be empty", http.StatusBadRequest) return } if err := inspector.DeleteTaskByKey(qname, key); err != nil { // TODO: Handle task not found error and return 404 http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, key := vars["qname"], vars["task_key"] if qname == "" || key == "" { http.Error(w, "route parameters should not be empty", http.StatusBadRequest) return } if err := inspector.RunTaskByKey(qname, key); err != nil { // TODO: Handle task not found error and return 404 http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] if _, err := inspector.DeleteAllScheduledTasks(qname); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] if _, err := inspector.DeleteAllRetryTasks(qname); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newDeleteAllDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] if _, err := inspector.DeleteAllDeadTasks(qname); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } // request body used for all batch delete tasks endpoints. type batchDeleteTasksRequest struct { TaskKeys []string `json:"task_keys"` } // Note: Redis does not have any rollback mechanism, so it's possible // to have partial success when doing a batch operation. // For this reason this response contains a list of succeeded keys // and a list of failed keys. type batchDeleteTasksResponse struct { // task keys that were successfully deleted. DeletedKeys []string `json:"deleted_keys"` // task keys that were not deleted. FailedKeys []string `json:"failed_keys"` } // Maximum request body size in bytes. // Allow up to 1MB in size. const maxRequestBodySize = 1000000 func newBatchDeleteDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize) dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() var req batchDeleteTasksRequest if err := dec.Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } qname := mux.Vars(r)["qname"] resp := batchDeleteTasksResponse{ // avoid null in the json response DeletedKeys: make([]string, 0), FailedKeys: make([]string, 0), } for _, key := range req.TaskKeys { if err := inspector.DeleteTaskByKey(qname, key); err != nil { log.Printf("error: could not delete task with key %q: %v", key, err) resp.FailedKeys = append(resp.FailedKeys, key) } else { resp.DeletedKeys = append(resp.DeletedKeys, key) } } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } type batchRunTasksRequest struct { TaskKeys []string `json:"task_keys"` } type batchRunTasksResponse struct { // task keys that were successfully moved to the pending state. PendingKeys []string `json:"pending_keys"` // task keys that were not able to move to the pending state. ErrorKeys []string `json:"error_keys"` } func newBatchRunTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize) dec := json.NewDecoder(r.Body) dec.DisallowUnknownFields() var req batchRunTasksRequest if err := dec.Decode(&req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } qname := mux.Vars(r)["qname"] resp := batchRunTasksResponse{ // avoid null in the json response PendingKeys: make([]string, 0), ErrorKeys: make([]string, 0), } for _, key := range req.TaskKeys { if err := inspector.RunTaskByKey(qname, key); err != nil { log.Printf("error: could not run task with key %q: %v", key, err) resp.ErrorKeys = append(resp.ErrorKeys, key) } else { resp.PendingKeys = append(resp.PendingKeys, key) } } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } } // getPageOptions read page size and number from the request url if set, // otherwise it returns the default value. func getPageOptions(r *http.Request) (pageSize, pageNum int) { pageSize = 20 // default page size pageNum = 1 // default page num q := r.URL.Query() if s := q.Get("size"); s != "" { if n, err := strconv.Atoi(s); err == nil { pageSize = n } } if s := q.Get("page"); s != "" { if n, err := strconv.Atoi(s); err == nil { pageNum = n } } return pageSize, pageNum }