diff --git a/metrics_handler.go b/metrics_handler.go
index aa14b75..0b5ffa3 100644
--- a/metrics_handler.go
+++ b/metrics_handler.go
@@ -12,7 +12,10 @@ import (
 )
 
 type getMetricsResponse struct {
-	QueueSize *json.RawMessage `json:"queue_size"`
+	QueueSize            *json.RawMessage `json:"queue_size"`
+	PendingTasksByQueue  *json.RawMessage `json:"pending_tasks_by_queue"`
+	RetryTasksByQueue    *json.RawMessage `json:"retry_tasks_by_queue"`
+	ArchivedTasksByQueue *json.RawMessage `json:"archived_tasks_by_queue"`
 }
 
 type metricsFetchOptions struct {
@@ -24,6 +27,13 @@ type metricsFetchOptions struct {
 }
 
 func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.HandlerFunc {
+	// res is the result of calling a JSON API endpoint.
+	type res struct {
+		query string
+		msg   *json.RawMessage
+		err   error
+	}
+
 	// Optional query params:
 	// `duration_sec`: specifies the number of seconds to scan
 	// `end_time`: specifies the end_time in Unix time seconds
@@ -33,22 +43,43 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H
 		http.Error(w, fmt.Sprintf("invalid query parameter: %v", err), http.StatusBadRequest)
 		return
 	}
-	// List of quries to send to prometheus server.
+	// List of queries (i.e. PromQL) to send to the Prometheus server.
 	queries := []string{
 		"asynq_queue_size",
+		"asynq_tasks_enqueued_total{state=\"pending\"}",
+		"asynq_tasks_enqueued_total{state=\"retry\"}",
+		"asynq_tasks_enqueued_total{state=\"archived\"}",
 	}
 	resp := getMetricsResponse{}
+	// Make multiple API calls concurrently.
+	n := len(queries)
+	ch := make(chan res, len(queries))
 	for _, q := range queries {
-		url := buildPrometheusURL(prometheusAddr, q, opts)
-		fmt.Printf("DEBUG: url: %s\n", url)
-		msg, err := fetchPrometheusMetrics(client, url)
-		if err != nil {
-			http.Error(w, fmt.Sprintf("failed to fetch %q: %v", url, err), http.StatusInternalServerError)
+		go func(q string) {
+			url := buildPrometheusURL(prometheusAddr, q, opts)
+			fmt.Printf("DEBUG: url: %s\n", url) // TODO: Delete this once debugging is done
+			msg, err := fetchPrometheusMetrics(client, url)
+			ch <- res{q, msg, err}
+		}(q)
+	}
+	for r := range ch {
+		n--
+		if r.err != nil {
+			http.Error(w, fmt.Sprintf("failed to fetch %q: %v", r.query, r.err), http.StatusInternalServerError)
 			return
 		}
-		switch q {
+		switch r.query {
 		case "asynq_queue_size":
-			resp.QueueSize = msg
+			resp.QueueSize = r.msg
+		case "asynq_tasks_enqueued_total{state=\"pending\"}":
+			resp.PendingTasksByQueue = r.msg
+		case "asynq_tasks_enqueued_total{state=\"retry\"}":
+			resp.RetryTasksByQueue = r.msg
+		case "asynq_tasks_enqueued_total{state=\"archived\"}":
+			resp.ArchivedTasksByQueue = r.msg
+		}
+		if n == 0 {
+			break // fetched all metrics
 		}
 	}
 	bytes, err := json.Marshal(resp)
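
Note on the fan-out/fan-in pattern the new handler relies on: each query is fetched in its own goroutine, results are collected from a channel buffered to len(queries) so no sender ever blocks, and the receive loop stops after exactly that many results because the channel is never closed (hence the counter and break above). Below is a minimal, self-contained sketch of the same pattern; the result type and the fetch helper are hypothetical stand-ins for res and fetchPrometheusMetrics, not code from this repository.

package main

import (
	"fmt"
	"strings"
)

// result mirrors the res type above: one entry per issued query.
type result struct {
	query string
	value string
	err   error
}

// fetch is a hypothetical stand-in for fetchPrometheusMetrics.
func fetch(q string) (string, error) {
	return strings.ToUpper(q), nil
}

func main() {
	queries := []string{"asynq_queue_size", "asynq_tasks_enqueued_total"}

	// Fan out: one goroutine per query; the channel is buffered to
	// len(queries) so no sender blocks even if the receiver bails early.
	ch := make(chan result, len(queries))
	for _, q := range queries {
		go func(q string) {
			v, err := fetch(q)
			ch <- result{q, v, err}
		}(q)
	}

	// Fan in: receive exactly len(queries) results, then stop. Counting
	// receives replaces closing the channel, as in the handler above.
	for i := 0; i < len(queries); i++ {
		r := <-ch
		if r.err != nil {
			fmt.Println("fetch error:", r.err)
			return
		}
		fmt.Printf("%s => %s\n", r.query, r.value)
	}
}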