mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-19 03:05:53 +08:00
238 lines
12 KiB
Go
238 lines
12 KiB
Go
package asynqmon
|
|
|
|
import (
|
|
"embed"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/hibiken/asynq"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Options are used to configure HTTPHandler.
|
|
type Options struct {
|
|
// URL path the handler is responsible for.
|
|
// The path is used for the homepage of asynqmon, and every other page is rooted in this subtree.
|
|
//
|
|
// This field is optional. Default is "/".
|
|
RootPath string
|
|
|
|
// RedisConnOpt specifies the connection to a redis-server or redis-cluster.
|
|
//
|
|
// This field is required.
|
|
RedisConnOpt asynq.RedisConnOpt
|
|
|
|
// PayloadFormatter is used to convert payload bytes to string shown in the UI.
|
|
//
|
|
// This field is optional.
|
|
PayloadFormatter PayloadFormatter
|
|
|
|
// ResultFormatter is used to convert result bytes to string shown in the UI.
|
|
//
|
|
// This field is optional.
|
|
ResultFormatter ResultFormatter
|
|
|
|
// PrometheusAddress specifies the address of the Prometheus to connect to.
|
|
//
|
|
// This field is optional. If this field is set, asynqmon will query the Prometheus server
|
|
// to get the time series data about queue metrics and show them in the web UI.
|
|
PrometheusAddress string
|
|
|
|
// Set ReadOnly to true to restrict user to view-only mode.
|
|
ReadOnly bool
|
|
}
|
|
|
|
// HTTPHandler is a http.Handler for asynqmon application.
|
|
type HTTPHandler struct {
|
|
router *mux.Router
|
|
closers []func() error
|
|
rootPath string // the value should not have the trailing slash
|
|
}
|
|
|
|
func (h *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
h.router.ServeHTTP(w, r)
|
|
}
|
|
|
|
// New creates a HTTPHandler with the given options.
|
|
func New(opts Options) *HTTPHandler {
|
|
if opts.RedisConnOpt == nil {
|
|
panic("asynqmon.New: RedisConnOpt field is required")
|
|
}
|
|
rc, ok := opts.RedisConnOpt.MakeRedisClient().(redis.UniversalClient)
|
|
if !ok {
|
|
panic(fmt.Sprintf("asnyqmon.New: unsupported RedisConnOpt type %T", opts.RedisConnOpt))
|
|
}
|
|
i := asynq.NewInspector(opts.RedisConnOpt)
|
|
|
|
// Make sure that RootPath starts with a slash if provided.
|
|
if opts.RootPath != "" && !strings.HasPrefix(opts.RootPath, "/") {
|
|
panic(fmt.Sprintf("asynqmon.New: RootPath must start with a slash"))
|
|
}
|
|
// Remove tailing slash from RootPath.
|
|
opts.RootPath = strings.TrimSuffix(opts.RootPath, "/")
|
|
|
|
return &HTTPHandler{
|
|
router: muxRouter(opts, rc, i),
|
|
closers: []func() error{rc.Close, i.Close},
|
|
rootPath: opts.RootPath,
|
|
}
|
|
}
|
|
|
|
// Close closes connections to redis.
|
|
func (h *HTTPHandler) Close() error {
|
|
for _, f := range h.closers {
|
|
if err := f(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RootPath returns the root URL path used for asynqmon application.
|
|
// Returned path string does not have the trailing slash.
|
|
func (h *HTTPHandler) RootPath() string {
|
|
return h.rootPath
|
|
}
|
|
|
|
//go:embed ui/build/*
|
|
var staticContents embed.FS
|
|
|
|
func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspector) *mux.Router {
|
|
router := mux.NewRouter().PathPrefix(opts.RootPath).Subrouter()
|
|
|
|
var payloadFmt PayloadFormatter = DefaultPayloadFormatter
|
|
if opts.PayloadFormatter != nil {
|
|
payloadFmt = opts.PayloadFormatter
|
|
}
|
|
|
|
var resultFmt ResultFormatter = DefaultResultFormatter
|
|
if opts.ResultFormatter != nil {
|
|
resultFmt = opts.ResultFormatter
|
|
}
|
|
|
|
api := router.PathPrefix("/api").Subrouter()
|
|
|
|
// Queue endpoints.
|
|
api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")
|
|
|
|
// Queue Historical Stats endpoint.
|
|
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")
|
|
|
|
// Task endpoints.
|
|
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:run_all", newRunAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:run_all", newRunAllRetryTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/completed_tasks", newListCompletedTasksHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/completed_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/completed_tasks:delete_all", newDeleteAllCompletedTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks", newListAggregatingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:delete_all", newDeleteAllAggregatingTasksHandlerFunc(inspector)).Methods("DELETE")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:run_all", newRunAllAggregatingTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:archive_all", newArchiveAllAggregatingTasksHandlerFunc(inspector)).Methods("POST")
|
|
api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
|
|
|
api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET")
|
|
|
|
// Groups endponts
|
|
api.HandleFunc("/queues/{qname}/groups", newListGroupsHandlerFunc(inspector)).Methods("GET")
|
|
|
|
// Servers endpoints.
|
|
api.HandleFunc("/servers", newListServersHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
|
|
// Scheduler Entry endpoints.
|
|
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, payloadFmt)).Methods("GET")
|
|
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")
|
|
|
|
// Redis info endpoint.
|
|
switch c := rc.(type) {
|
|
case *redis.ClusterClient:
|
|
api.HandleFunc("/redis_info", newRedisClusterInfoHandlerFunc(c, inspector)).Methods("GET")
|
|
case *redis.Client:
|
|
api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(c)).Methods("GET")
|
|
}
|
|
|
|
// Time series metrics endpoints.
|
|
api.HandleFunc("/metrics", newGetMetricsHandlerFunc(http.DefaultClient, opts.PrometheusAddress)).Methods("GET")
|
|
|
|
// Restrict APIs when running in read-only mode.
|
|
if opts.ReadOnly {
|
|
api.Use(restrictToReadOnly)
|
|
}
|
|
|
|
// Everything else, route to uiAssetsHandler.
|
|
router.NotFoundHandler = &uiAssetsHandler{
|
|
rootPath: opts.RootPath,
|
|
contents: staticContents,
|
|
staticDirPath: "ui/build",
|
|
indexFileName: "index.html",
|
|
prometheusAddr: opts.PrometheusAddress,
|
|
readOnly: opts.ReadOnly,
|
|
}
|
|
|
|
return router
|
|
}
|
|
|
|
// restrictToReadOnly is a middleware function to restrict users to perform only GET requests.
|
|
func restrictToReadOnly(h http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "GET" && r.Method != "" {
|
|
http.Error(w, fmt.Sprintf("API Server is running in read-only mode: %s request is not allowed", r.Method), http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
h.ServeHTTP(w, r)
|
|
})
|
|
}
|