mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-10-26 08:16:10 +08:00
(api): Add more metrics to metrics API response
This commit is contained in:
@@ -12,7 +12,10 @@ import (
|
||||
)
|
||||
|
||||
type getMetricsResponse struct {
|
||||
QueueSize *json.RawMessage `json:"queue_size"`
|
||||
QueueSize *json.RawMessage `json:"queue_size"`
|
||||
PendingTasksByQueue *json.RawMessage `json:"pending_tasks_by_queue"`
|
||||
RetryTasksByQueue *json.RawMessage `json:"retry_tasks_by_queue"`
|
||||
ArchivedTasksByQueue *json.RawMessage `json:"archived_tasks_by_queue"`
|
||||
}
|
||||
|
||||
type metricsFetchOptions struct {
|
||||
@@ -24,6 +27,13 @@ type metricsFetchOptions struct {
|
||||
}
|
||||
|
||||
func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.HandlerFunc {
|
||||
// res is the result of calling a JSON API endpoint.
|
||||
type res struct {
|
||||
query string
|
||||
msg *json.RawMessage
|
||||
err error
|
||||
}
|
||||
|
||||
// Optional query params:
|
||||
// `duration_sec`: specifies the number of seconds to scan
|
||||
// `end_time`: specifies the end_time in Unix time seconds
|
||||
@@ -33,22 +43,43 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H
|
||||
http.Error(w, fmt.Sprintf("invalid query parameter: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
// List of quries to send to prometheus server.
|
||||
// List of queries (i.e. promQL) to send to prometheus server.
|
||||
queries := []string{
|
||||
"asynq_queue_size",
|
||||
"asynq_tasks_enqueued_total{state=\"pending\"}",
|
||||
"asynq_tasks_enqueued_total{state=\"retry\"}",
|
||||
"asynq_tasks_enqueued_total{state=\"archived\"}",
|
||||
}
|
||||
resp := getMetricsResponse{}
|
||||
// Make multiple API calls concurrently
|
||||
n := len(queries)
|
||||
ch := make(chan res, len(queries))
|
||||
for _, q := range queries {
|
||||
url := buildPrometheusURL(prometheusAddr, q, opts)
|
||||
fmt.Printf("DEBUG: url: %s\n", url)
|
||||
msg, err := fetchPrometheusMetrics(client, url)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to fetch %q: %v", url, err), http.StatusInternalServerError)
|
||||
go func(q string) {
|
||||
url := buildPrometheusURL(prometheusAddr, q, opts)
|
||||
fmt.Printf("DEBUG: url: %s\n", url) // TODO: Delete this once debugging is done
|
||||
msg, err := fetchPrometheusMetrics(client, url)
|
||||
ch <- res{q, msg, err}
|
||||
}(q)
|
||||
}
|
||||
for r := range ch {
|
||||
n--
|
||||
if r.err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to fetch %q: %v", r.query, err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
switch q {
|
||||
switch r.query {
|
||||
case "asynq_queue_size":
|
||||
resp.QueueSize = msg
|
||||
resp.QueueSize = r.msg
|
||||
case "asynq_tasks_enqueued_total{state=\"pending\"}":
|
||||
resp.PendingTasksByQueue = r.msg
|
||||
case "asynq_tasks_enqueued_total{state=\"retry\"}":
|
||||
resp.RetryTasksByQueue = r.msg
|
||||
case "asynq_tasks_enqueued_total{state=\"archived\"}":
|
||||
resp.ArchivedTasksByQueue = r.msg
|
||||
}
|
||||
if n == 0 {
|
||||
break // fetched all metrics
|
||||
}
|
||||
}
|
||||
bytes, err := json.Marshal(resp)
|
||||
|
||||
Reference in New Issue
Block a user