mirror of
				https://github.com/hibiken/asynqmon.git
				synced 2025-10-26 08:16:10 +08:00 
			
		
		
		
	(api): Allow filter by queue on metrics endpoint
This commit is contained in:
		| @@ -29,6 +29,10 @@ type metricsFetchOptions struct { | |||||||
|  |  | ||||||
| 	// Specifies the end time when fetching metrics. | 	// Specifies the end time when fetching metrics. | ||||||
| 	endTime time.Time | 	endTime time.Time | ||||||
|  |  | ||||||
|  | 	// Optional filter to speicify a list of queues to get metrics for. | ||||||
|  | 	// Empty list indicates no filter (i.e. get metrics for all queues). | ||||||
|  | 	queues []string | ||||||
| } | } | ||||||
|  |  | ||||||
| func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.HandlerFunc { | func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.HandlerFunc { | ||||||
| @@ -40,16 +44,17 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// List of PromQLs. | 	// List of PromQLs. | ||||||
|  | 	// Strings are used as template to optionally insert queue filter specified by QUEUE_FILTER. | ||||||
| 	const ( | 	const ( | ||||||
| 		promQLQueueSize      = "asynq_queue_size" | 		promQLQueueSize      = "asynq_queue_size{QUEUE_FILTER}" | ||||||
| 		promQLQueueLatency   = "asynq_queue_latency_seconds" | 		promQLQueueLatency   = "asynq_queue_latency_seconds{QUEUE_FILTER}" | ||||||
| 		promQLMemUsage       = "asynq_queue_memory_usage_approx_bytes" | 		promQLMemUsage       = "asynq_queue_memory_usage_approx_bytes{QUEUE_FILTER}" | ||||||
| 		promQLProcessedTasks = "rate(asynq_tasks_processed_total[5m])" | 		promQLProcessedTasks = "rate(asynq_tasks_processed_total{QUEUE_FILTER}[5m])" | ||||||
| 		promQLFailedTasks    = "rate(asynq_tasks_failed_total[5m])" | 		promQLFailedTasks    = "rate(asynq_tasks_failed_total{QUEUE_FILTER}[5m])" | ||||||
| 		promQLErrorRate      = "rate(asynq_tasks_failed_total[5m]) / rate(asynq_tasks_processed_total[5m])" | 		promQLErrorRate      = "rate(asynq_tasks_failed_total{QUEUE_FILTER}[5m]) / rate(asynq_tasks_processed_total{QUEUE_FILTER}[5m])" | ||||||
| 		promQLPendingTasks   = "asynq_tasks_enqueued_total{state=\"pending\"}" | 		promQLPendingTasks   = "asynq_tasks_enqueued_total{state=\"pending\",QUEUE_FILTER}" | ||||||
| 		promQLRetryTasks     = "asynq_tasks_enqueued_total{state=\"retry\"}" | 		promQLRetryTasks     = "asynq_tasks_enqueued_total{state=\"retry\",QUEUE_FILTER}" | ||||||
| 		promQLArchivedTasks  = "asynq_tasks_enqueued_total{state=\"archived\"}" | 		promQLArchivedTasks  = "asynq_tasks_enqueued_total{state=\"archived\",QUEUE_FILTER}" | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	// Optional query params: | 	// Optional query params: | ||||||
| @@ -148,6 +153,9 @@ func extractMetricsFetchOptions(r *http.Request) (*metricsFetchOptions, error) { | |||||||
| 		} | 		} | ||||||
| 		opts.endTime = time.Unix(int64(val), 0) | 		opts.endTime = time.Unix(int64(val), 0) | ||||||
| 	} | 	} | ||||||
|  | 	if qs := q.Get("queues"); qs != "" { | ||||||
|  | 		opts.queues = strings.Split(qs, ",") | ||||||
|  | 	} | ||||||
| 	return opts, nil | 	return opts, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -156,7 +164,7 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri | |||||||
| 	b.WriteString(strings.TrimSuffix(baseAddr, "/")) | 	b.WriteString(strings.TrimSuffix(baseAddr, "/")) | ||||||
| 	b.WriteString(prometheusAPIPath) | 	b.WriteString(prometheusAPIPath) | ||||||
| 	v := url.Values{} | 	v := url.Values{} | ||||||
| 	v.Add("query", promQL) | 	v.Add("query", applyQueueFilter(promQL, opts.queues)) | ||||||
| 	v.Add("start", unixTimeString(opts.endTime.Add(-opts.duration))) | 	v.Add("start", unixTimeString(opts.endTime.Add(-opts.duration))) | ||||||
| 	v.Add("end", unixTimeString(opts.endTime)) | 	v.Add("end", unixTimeString(opts.endTime)) | ||||||
| 	v.Add("step", strconv.Itoa(int(step(opts).Seconds()))) | 	v.Add("step", strconv.Itoa(int(step(opts).Seconds()))) | ||||||
| @@ -165,6 +173,22 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri | |||||||
| 	return b.String() | 	return b.String() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func applyQueueFilter(promQL string, qnames []string) string { | ||||||
|  | 	if len(qnames) == 0 { | ||||||
|  | 		return strings.ReplaceAll(promQL, "QUEUE_FILTER", "") | ||||||
|  | 	} | ||||||
|  | 	var b strings.Builder | ||||||
|  | 	b.WriteString(`queue=~"`) | ||||||
|  | 	for i, q := range qnames { | ||||||
|  | 		if i != 0 { | ||||||
|  | 			b.WriteString("|") | ||||||
|  | 		} | ||||||
|  | 		b.WriteString(q) | ||||||
|  | 	} | ||||||
|  | 	b.WriteByte('"') | ||||||
|  | 	return strings.ReplaceAll(promQL, "QUEUE_FILTER", b.String()) | ||||||
|  | } | ||||||
|  |  | ||||||
| func fetchPrometheusMetrics(client *http.Client, url string) (*json.RawMessage, error) { | func fetchPrometheusMetrics(client *http.Client, url string) (*json.RawMessage, error) { | ||||||
| 	resp, err := client.Get(url) | 	resp, err := client.Get(url) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user