From d78abcf58453a045c30c88fb3417f7befc243a34 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 30 Dec 2020 08:58:50 -0800 Subject: [PATCH] Add API endpoint for listing running servers --- conversion_helpers.go | 54 +++++++++++++++++++++++++++++++++++++++++++ main.go | 3 +++ server_handlers.go | 34 +++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 server_handlers.go diff --git a/conversion_helpers.go b/conversion_helpers.go index 313d3be..96c82aa 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -285,3 +285,57 @@ func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnq } return out } + +type ServerInfo struct { + ID string `json:"id"` + Host string `json:"host"` + PID int `json:"pid"` + Concurrency int `json:"concurrency"` + Queues map[string]int `json:"queue_priorities"` + StrictPriority bool `json:"strict_priority_enabled"` + Started string `json:"start_time"` + Status string `json:"status"` + ActiveWorkers []*WorkerInfo `json:"active_workers"` +} + +func toServerInfo(info *asynq.ServerInfo) *ServerInfo { + return &ServerInfo{ + ID: info.ID, + Host: info.Host, + PID: info.PID, + Concurrency: info.Concurrency, + Queues: info.Queues, + StrictPriority: info.StrictPriority, + Started: info.Started.Format(time.RFC3339), + Status: info.Status, + ActiveWorkers: toWorkerInfoList(info.ActiveWorkers), + } +} + +func toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo { + out := make([]*ServerInfo, len(in)) + for i, s := range in { + out[i] = toServerInfo(s) + } + return out +} + +type WorkerInfo struct { + Task *ActiveTask `json:"task"` + Started string `json:"start_time"` +} + +func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo { + return &WorkerInfo{ + Task: toActiveTask(info.Task), + Started: info.Started.Format(time.RFC3339), + } +} + +func toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo { + out := make([]*WorkerInfo, len(in)) + for i, w := range in { + out[i] = toWorkerInfo(w) + } + return out +} diff --git a/main.go b/main.go index 09aaaeb..a192d1a 100644 --- a/main.go +++ b/main.go @@ -112,6 +112,9 @@ func main() { api.HandleFunc("/queues/{qname}/dead_tasks:run_all", newRunAllDeadTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/dead_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") + // Servers endpoints. + api.HandleFunc("/servers", newListServersHandlerFunc(inspector)).Methods("GET") + // Scheduler Entry endpoints. api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") diff --git a/server_handlers.go b/server_handlers.go new file mode 100644 index 0000000..921202c --- /dev/null +++ b/server_handlers.go @@ -0,0 +1,34 @@ +package main + +import ( + "encoding/json" + "net/http" + + "github.com/hibiken/asynq" +) + +// **************************************************************************** +// This file defines: +// - http.Handler(s) for server related endpoints +// **************************************************************************** + +type ListServersResponse struct { + Servers []*ServerInfo `json:"servers"` +} + +func newListServersHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + srvs, err := inspector.Servers() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := ListServersResponse{ + Servers: toServerInfoList(srvs), + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +}