From 33b24ca940507e6d9175901af7874bc6376c4fb0 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 24 Mar 2022 06:54:41 -0700 Subject: [PATCH] Add list groups REST endpoint --- conversion_helpers.go | 20 ++++++++++++++++++++ group_handlers.go | 33 +++++++++++++++++++++++++++++++++ handler.go | 3 +++ 3 files changed, 56 insertions(+) create mode 100644 group_handlers.go diff --git a/conversion_helpers.go b/conversion_helpers.go index 9fa78bd..e644a18 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -433,6 +433,26 @@ func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter, rf ResultFormat return out } +type groupInfo struct { + Group string `json:"group"` + Size int `json:"size"` +} + +func toGroupInfos(in []*asynq.GroupInfo) []*groupInfo { + out := make([]*groupInfo, len(in)) + for i, g := range in { + out[i] = toGroupInfo(g) + } + return out +} + +func toGroupInfo(in *asynq.GroupInfo) *groupInfo { + return &groupInfo{ + Group: in.Group, + Size: in.Size, + } +} + type schedulerEntry struct { ID string `json:"id"` Spec string `json:"spec"` diff --git a/group_handlers.go b/group_handlers.go new file mode 100644 index 0000000..6843aaa --- /dev/null +++ b/group_handlers.go @@ -0,0 +1,33 @@ +package asynqmon + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/hibiken/asynq" +) + +type listGroupsResponse struct { + Groups []*groupInfo `json:"groups"` +} + +func newListGroupsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + qname := mux.Vars(r)["qname"] + + groups, err := inspector.Groups(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + resp := listGroupsResponse{ + Groups: toGroupInfos(groups), + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} diff --git a/handler.go b/handler.go index 1405b1a..2a20c15 100644 --- a/handler.go +++ b/handler.go @@ -176,6 +176,9 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") + // Groups endponts + api.HandleFunc("/queues/{qname}/groups", newListGroupsHandlerFunc(inspector)).Methods("GET") + // Servers endpoints. api.HandleFunc("/servers", newListServersHandlerFunc(inspector, payloadFmt)).Methods("GET")