diff --git a/metrics_handler.go b/metrics_handler.go index 5304506..aa14b75 100644 --- a/metrics_handler.go +++ b/metrics_handler.go @@ -1,6 +1,7 @@ package asynqmon import ( + "encoding/json" "fmt" "io" "net/http" @@ -11,7 +12,7 @@ import ( ) type getMetricsResponse struct { - // TODO + QueueSize *json.RawMessage `json:"queue_size"` } type metricsFetchOptions struct { @@ -32,16 +33,31 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H http.Error(w, fmt.Sprintf("invalid query parameter: %v", err), http.StatusBadRequest) return } - url := buildPrometheusURL(prometheusAddr, "asynq_queue_size", opts) - fmt.Printf("DEBUG: url: %s\n", url) - resp, err := client.Get(url) + // List of quries to send to prometheus server. + queries := []string{ + "asynq_queue_size", + } + resp := getMetricsResponse{} + for _, q := range queries { + url := buildPrometheusURL(prometheusAddr, q, opts) + fmt.Printf("DEBUG: url: %s\n", url) + msg, err := fetchPrometheusMetrics(client, url) + if err != nil { + http.Error(w, fmt.Sprintf("failed to fetch %q: %v", url, err), http.StatusInternalServerError) + return + } + switch q { + case "asynq_queue_size": + resp.QueueSize = msg + } + } + bytes, err := json.Marshal(resp) if err != nil { - http.Error(w, fmt.Sprintf("request failed: GET %q: %v", url, err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("failed to marshal response into JSON: %v", err), http.StatusInternalServerError) return } - defer resp.Body.Close() - if _, err := io.Copy(w, resp.Body); err != nil { - http.Error(w, fmt.Sprintf("failed to copy: %v", err), http.StatusInternalServerError) + if _, err := w.Write(bytes); err != nil { + http.Error(w, fmt.Sprintf("failed to write to response: %v", err), http.StatusInternalServerError) return } } @@ -86,6 +102,20 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri return b.String() } +func fetchPrometheusMetrics(client *http.Client, url string) (*json.RawMessage, error) { + resp, err := client.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + msg := json.RawMessage(bytes) + return &msg, err +} + // Returns step to use given the fetch options. // In general, the longer the duration, longer the each step. func step(opts *metricsFetchOptions) time.Duration {