mirror of
				https://github.com/hibiken/asynqmon.git
				synced 2025-10-26 00:06:13 +08:00 
			
		
		
		
	(api): Update metrics endpoint to return multiple metrics
This commit is contained in:
		| @@ -1,6 +1,7 @@ | ||||
| package asynqmon | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| @@ -11,7 +12,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| type getMetricsResponse struct { | ||||
| 	// TODO | ||||
| 	QueueSize *json.RawMessage `json:"queue_size"` | ||||
| } | ||||
|  | ||||
| type metricsFetchOptions struct { | ||||
| @@ -32,16 +33,31 @@ func newGetMetricsHandlerFunc(client *http.Client, prometheusAddr string) http.H | ||||
| 			http.Error(w, fmt.Sprintf("invalid query parameter: %v", err), http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
| 		url := buildPrometheusURL(prometheusAddr, "asynq_queue_size", opts) | ||||
| 		fmt.Printf("DEBUG: url: %s\n", url) | ||||
| 		resp, err := client.Get(url) | ||||
| 		// List of quries to send to prometheus server. | ||||
| 		queries := []string{ | ||||
| 			"asynq_queue_size", | ||||
| 		} | ||||
| 		resp := getMetricsResponse{} | ||||
| 		for _, q := range queries { | ||||
| 			url := buildPrometheusURL(prometheusAddr, q, opts) | ||||
| 			fmt.Printf("DEBUG: url: %s\n", url) | ||||
| 			msg, err := fetchPrometheusMetrics(client, url) | ||||
| 			if err != nil { | ||||
| 				http.Error(w, fmt.Sprintf("failed to fetch %q: %v", url, err), http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
| 			switch q { | ||||
| 			case "asynq_queue_size": | ||||
| 				resp.QueueSize = msg | ||||
| 			} | ||||
| 		} | ||||
| 		bytes, err := json.Marshal(resp) | ||||
| 		if err != nil { | ||||
| 			http.Error(w, fmt.Sprintf("request failed: GET %q: %v", url, err), http.StatusInternalServerError) | ||||
| 			http.Error(w, fmt.Sprintf("failed to marshal response into JSON: %v", err), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 		defer resp.Body.Close() | ||||
| 		if _, err := io.Copy(w, resp.Body); err != nil { | ||||
| 			http.Error(w, fmt.Sprintf("failed to copy: %v", err), http.StatusInternalServerError) | ||||
| 		if _, err := w.Write(bytes); err != nil { | ||||
| 			http.Error(w, fmt.Sprintf("failed to write to response: %v", err), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| @@ -86,6 +102,20 @@ func buildPrometheusURL(baseAddr, promQL string, opts *metricsFetchOptions) stri | ||||
| 	return b.String() | ||||
| } | ||||
|  | ||||
| func fetchPrometheusMetrics(client *http.Client, url string) (*json.RawMessage, error) { | ||||
| 	resp, err := client.Get(url) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	defer resp.Body.Close() | ||||
| 	bytes, err := io.ReadAll(resp.Body) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	msg := json.RawMessage(bytes) | ||||
| 	return &msg, err | ||||
| } | ||||
|  | ||||
| // Returns step to use given the fetch options. | ||||
| // In general, the longer the duration, longer the each step. | ||||
| func step(opts *metricsFetchOptions) time.Duration { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user