diff --git a/task_handlers.go b/task_handlers.go index f9fddf7..8decd3f 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -71,10 +71,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatt Tasks: activeTasks, Stats: toQueueStateSnapshot(qinfo), } - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, resp) } } @@ -149,10 +146,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl resp.CanceledIDs = append(resp.CanceledIDs, id) } } - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, resp) } } @@ -180,10 +174,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormat payload["tasks"] = toPendingTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -211,10 +202,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm payload["tasks"] = toScheduledTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -242,10 +230,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatte payload["tasks"] = toRetryTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -273,10 +258,7 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForma payload["tasks"] = toArchivedTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -303,10 +285,7 @@ func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm payload["tasks"] = toCompletedTasks(tasks, pf, rf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -335,10 +314,7 @@ func newListAggregatingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFo payload["tasks"] = toAggregatingTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) - if err := json.NewEncoder(w).Encode(payload); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, payload) } } @@ -406,11 +382,7 @@ func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handle http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } @@ -423,11 +395,7 @@ func newDeleteAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.Ha http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } @@ -439,11 +407,7 @@ func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.Hand http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } @@ -455,11 +419,7 @@ func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } @@ -471,11 +431,7 @@ func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.Handl http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } @@ -487,44 +443,48 @@ func newDeleteAllCompletedTasksHandlerFunc(inspector *asynq.Inspector) http.Hand http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp := deleteAllTasksResponse{n} - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, deleteAllTasksResponse{n}) } } +type runAllTasksResponse struct { + // Number of tasks scheduled to run. + Scheduled int `json:"scheduled"` +} + func newRunAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.RunAllScheduledTasks(qname); err != nil { + n, err := inspector.RunAllScheduledTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, runAllTasksResponse{n}) } } func newRunAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.RunAllRetryTasks(qname); err != nil { + n, err := inspector.RunAllRetryTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, runAllTasksResponse{n}) } } func newRunAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.RunAllArchivedTasks(qname); err != nil { + n, err := inspector.RunAllArchivedTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, runAllTasksResponse{n}) } } @@ -532,22 +492,35 @@ func newRunAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.Handl return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, gname := vars["qname"], vars["gname"] - if _, err := inspector.RunAllAggregatingTasks(qname, gname); err != nil { + n, err := inspector.RunAllAggregatingTasks(qname, gname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, runAllTasksResponse{n}) + } +} + +type archiveAllTasksResponse struct { + // Number of tasks archived. + Archived int `json:"archived"` +} + +func writeResponseJSON(w http.ResponseWriter, resp interface{}) { + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) } } func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.ArchiveAllPendingTasks(qname); err != nil { + n, err := inspector.ArchiveAllPendingTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, archiveAllTasksResponse{n}) } } @@ -555,33 +528,36 @@ func newArchiveAllAggregatingTasksHandlerFunc(inspector *asynq.Inspector) http.H return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, gname := vars["qname"], vars["gname"] - if _, err := inspector.ArchiveAllAggregatingTasks(qname, gname); err != nil { + n, err := inspector.ArchiveAllAggregatingTasks(qname, gname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, archiveAllTasksResponse{n}) } } func newArchiveAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.ArchiveAllScheduledTasks(qname); err != nil { + n, err := inspector.ArchiveAllScheduledTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, archiveAllTasksResponse{n}) } } func newArchiveAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qname := mux.Vars(r)["qname"] - if _, err := inspector.ArchiveAllRetryTasks(qname); err != nil { + n, err := inspector.ArchiveAllRetryTasks(qname) + if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - w.WriteHeader(http.StatusNoContent) + writeResponseJSON(w, archiveAllTasksResponse{n}) } } @@ -632,10 +608,7 @@ func newBatchDeleteTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc resp.DeletedIDs = append(resp.DeletedIDs, taskid) } } - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, resp) } } @@ -676,10 +649,7 @@ func newBatchRunTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { resp.PendingIDs = append(resp.PendingIDs, taskid) } } - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, resp) } } @@ -720,10 +690,7 @@ func newBatchArchiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFun resp.ArchivedIDs = append(resp.ArchivedIDs, taskid) } } - if err := json.NewEncoder(w).Encode(resp); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, resp) } } @@ -769,9 +736,6 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter, rf R return } - if err := json.NewEncoder(w).Encode(toTaskInfo(info, pf, rf)); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + writeResponseJSON(w, toTaskInfo(info, pf, rf)) } }