diff --git a/metrics_handler.go b/metrics_handler.go index 1a56d4d..f05e52f 100644 --- a/metrics_handler.go +++ b/metrics_handler.go @@ -29,6 +29,10 @@ type metricsFetchOptions struct { // Specifies the end time when fetching metrics. endTime time.Time + + // Optional filter to speicify a list of queues to get metrics for. + // Empty list indicates no filter (i.e. get metrics for all queues). + queues []string } func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.HandlerFunc { @@ -40,16 +44,17 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H } // List of PromQLs. + // Strings are used as template to optionally insert queue filter specified by QUEUE_FILTER. const ( - promQLQueueSize = "asynq_queue_size" - promQLQueueLatency = "asynq_queue_latency_seconds" - promQLMemUsage = "asynq_queue_memory_usage_approx_bytes" - promQLProcessedTasks = "rate(asynq_tasks_processed_total[5m])" - promQLFailedTasks = "rate(asynq_tasks_failed_total[5m])" - promQLErrorRate = "rate(asynq_tasks_failed_total[5m]) / rate(asynq_tasks_processed_total[5m])" - promQLPendingTasks = "asynq_tasks_enqueued_total{state=\"pending\"}" - promQLRetryTasks = "asynq_tasks_enqueued_total{state=\"retry\"}" - promQLArchivedTasks = "asynq_tasks_enqueued_total{state=\"archived\"}" + promQLQueueSize = "asynq_queue_size{QUEUE_FILTER}" + promQLQueueLatency = "asynq_queue_latency_seconds{QUEUE_FILTER}" + promQLMemUsage = "asynq_queue_memory_usage_approx_bytes{QUEUE_FILTER}" + promQLProcessedTasks = "rate(asynq_tasks_processed_total{QUEUE_FILTER}[5m])" + promQLFailedTasks = "rate(asynq_tasks_failed_total{QUEUE_FILTER}[5m])" + promQLErrorRate = "rate(asynq_tasks_failed_total{QUEUE_FILTER}[5m]) / rate(asynq_tasks_processed_total{QUEUE_FILTER}[5m])" + promQLPendingTasks = "asynq_tasks_enqueued_total{state=\"pending\",QUEUE_FILTER}" + promQLRetryTasks = "asynq_tasks_enqueued_total{state=\"retry\",QUEUE_FILTER}" + promQLArchivedTasks = "asynq_tasks_enqueued_total{state=\"archived\",QUEUE_FILTER}" ) // Optional query params: @@ -148,6 +153,9 @@ func extractMetricsFetchOptions(r *http.Request) (*metricsFetchOptions, error) { } opts.endTime = time.Unix(int64(val), 0) } + if qs := q.Get("queues"); qs != "" { + opts.queues = strings.Split(qs, ",") + } return opts, nil } @@ -156,7 +164,7 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri b.WriteString(strings.TrimSuffix(baseAddr, "/")) b.WriteString(prometheusAPIPath) v := url.Values{} - v.Add("query", promQL) + v.Add("query", applyQueueFilter(promQL, opts.queues)) v.Add("start", unixTimeString(opts.endTime.Add(-opts.duration))) v.Add("end", unixTimeString(opts.endTime)) v.Add("step", strconv.Itoa(int(step(opts).Seconds()))) @@ -165,6 +173,22 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri return b.String() } +func applyQueueFilter(promQL string, qnames []string) string { + if len(qnames) == 0 { + return strings.ReplaceAll(promQL, "QUEUE_FILTER", "") + } + var b strings.Builder + b.WriteString(`queue=~"`) + for i, q := range qnames { + if i != 0 { + b.WriteString("|") + } + b.WriteString(q) + } + b.WriteByte('"') + return strings.ReplaceAll(promQL, "QUEUE_FILTER", b.String()) +} + func fetchPrometheusMetrics(client *http.Client, url string) (*json.RawMessage, error) { resp, err := client.Get(url) if err != nil {