package asynqmon

import (
	"fmt"
	"net/http"

	"github.com/go-redis/redis/v8"
	"github.com/gorilla/mux"
	"github.com/hibiken/asynq"
)

// Options can be used to customise HTTPHandler.
type Options struct {
	// RedisConnOpt describes the redis connection. New uses it both to create
	// a redis client and to create an asynq.Inspector.
	RedisConnOpt asynq.RedisConnOpt
	// PayloadFormatter is optional; when nil, defaultPayloadFormatter is used.
	PayloadFormatter PayloadFormatter
}

// HTTPHandler is an http.Handler that dispatches requests through an internal
// mux router built by New.
type HTTPHandler struct {
	router *mux.Router
	// closers are the release functions for the resources created in New;
	// Close invokes them in order.
	closers []func() error
}

// ServeHTTP hands the request to the underlying router.
func (a *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	a.router.ServeHTTP(w, r)
}

// New creates an HTTPHandler that can be used to serve asynqmon web API, it is
// prefixed with `/api`.
//
// New creates a redis client and an asynq.Inspector from opts.RedisConnOpt;
// call Close on the returned handler to release both.
//
// New panics if the client produced by opts.RedisConnOpt is not a
// redis.UniversalClient.
func New(opts Options) *HTTPHandler {
	rc, ok := opts.RedisConnOpt.MakeRedisClient().(redis.UniversalClient)
	if !ok {
		panic(fmt.Sprintf("asynqmon.HTTPHandler: unsupported RedisConnOpt type %T", opts.RedisConnOpt))
	}
	i := asynq.NewInspector(opts.RedisConnOpt)
	return &HTTPHandler{
		router:  muxRouter(opts, rc, i),
		closers: []func() error{rc.Close, i.Close},
	}
}

// Close will close connections to redis.
//
// Every registered closer is invoked even when an earlier one fails, so that
// one failing connection does not leave the others open. The first error
// encountered (if any) is returned.
func (a *HTTPHandler) Close() error {
	var firstErr error
	for _, closeFn := range a.closers {
		if err := closeFn(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// muxRouter builds the router holding every API route under the `/api` prefix.
//
// opts.PayloadFormatter is passed to the handlers that take a formatter; when
// it is nil, defaultPayloadFormatter is used instead. The `/redis_info` route
// is registered only when rc is a *redis.ClusterClient or a *redis.Client; for
// any other client type no such route exists.
func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspector) *mux.Router {
	router := mux.NewRouter()

	var pf PayloadFormatter = defaultPayloadFormatter
	if opts.PayloadFormatter != nil {
		pf = opts.PayloadFormatter
	}

	api := router.PathPrefix("/api").Subrouter()

	// Queue endpoints.
	api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET")
	api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET")
	api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")

	// Queue Historical Stats endpoint.
	api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")

	// Task endpoints.
	api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")

	api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

	api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:run_all", newRunAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

	api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks:run_all", newRunAllRetryTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}:archive", newArchiveTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")

	api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
	api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
	api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")

	api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, pf)).Methods("GET")

	// Servers endpoints.
	api.HandleFunc("/servers", newListServersHandlerFunc(inspector, pf)).Methods("GET")

	// Scheduler Entry endpoints.
	api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, pf)).Methods("GET")
	api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")

	// Redis info endpoint. The handler constructors need the concrete client
	// type, so the route is only registered for the two types handled here.
	switch c := rc.(type) {
	case *redis.ClusterClient:
		api.HandleFunc("/redis_info", newRedisClusterInfoHandlerFunc(c, inspector)).Methods("GET")
	case *redis.Client:
		api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(c)).Methods("GET")
	}

	return router
}