asynqmon/main.go

163 lines
7.1 KiB
Go
Raw Normal View History

2020-11-24 22:54:00 +08:00
package main
import (
"embed"
2020-11-24 22:54:00 +08:00
"fmt"
"io"
2020-11-24 22:54:00 +08:00
"log"
"net/http"
"os"
"path/filepath"
"time"
2021-01-03 22:56:53 +08:00
"github.com/go-redis/redis/v8"
2020-11-24 22:54:00 +08:00
"github.com/gorilla/mux"
"github.com/hibiken/asynq"
"github.com/rs/cors"
)
// staticFileServer implements the http.Handler interface, so we can use it
// to respond to HTTP requests. The path to the static directory and
// path to the index file within that static directory are used to
// serve the SPA in the given static directory.
type staticFileServer struct {
staticContents embed.FS
staticDirPath string
indexFileName string
2020-11-24 22:54:00 +08:00
}
// ServeHTTP inspects the URL path to locate a file within the static dir
// on the SPA handler.
// If path '/' is requested, it will serve the index file, otherwise it will
// serve the file specified by the URL path.
2020-11-24 22:54:00 +08:00
func (srv *staticFileServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Get the absolute path to prevent directory traversal.
2020-11-24 22:54:00 +08:00
path, err := filepath.Abs(r.URL.Path)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if path == "/" {
path = filepath.Join(srv.staticDirPath, srv.indexFileName)
} else {
path = filepath.Join(srv.staticDirPath, path)
}
2020-11-24 22:54:00 +08:00
f, err := srv.staticContents.Open(path)
if err != nil {
status := http.StatusInternalServerError
if os.IsNotExist(err) {
status = http.StatusNotFound
}
http.Error(w, err.Error(), status)
2020-11-24 22:54:00 +08:00
return
}
defer f.Close()
if _, err := io.Copy(w, f); err != nil {
2020-11-24 22:54:00 +08:00
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
const (
addr = "127.0.0.1:8080"
redisAddr = "localhost:6379" // TODO: make this configurable
)
2020-11-24 22:54:00 +08:00
//go:embed ui/build/*
var staticContents embed.FS
2020-11-24 22:54:00 +08:00
func main() {
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: redisAddr,
2020-11-24 22:54:00 +08:00
})
defer inspector.Close()
2021-01-03 22:56:53 +08:00
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
2021-01-03 22:56:53 +08:00
})
defer rdb.Close()
2020-11-24 22:54:00 +08:00
router := mux.NewRouter()
2020-12-07 23:14:30 +08:00
router.Use(loggingMiddleware)
2020-11-24 22:54:00 +08:00
api := router.PathPrefix("/api").Subrouter()
// Queue endpoints.
api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")
// Queue Historical Stats endpoint.
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")
// Task endpoints.
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
2020-12-23 22:59:44 +08:00
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:run_all", newRunAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
2020-12-15 22:16:58 +08:00
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}:kill", newKillTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:kill_all", newKillAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_kill", newBatchKillTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:run_all", newRunAllRetryTasksHandlerFunc(inspector)).Methods("POST")
2020-12-15 22:16:58 +08:00
api.HandleFunc("/queues/{qname}/retry_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_key}:kill", newKillTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:kill_all", newKillAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_kill", newBatchKillTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:delete_all", newDeleteAllDeadTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/dead_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/dead_tasks/{task_key}:run", newRunTaskHandlerFunc(inspector)).Methods("POST")
2020-12-16 23:35:36 +08:00
api.HandleFunc("/queues/{qname}/dead_tasks:run_all", newRunAllDeadTasksHandlerFunc(inspector)).Methods("POST")
2020-12-15 22:16:58 +08:00
api.HandleFunc("/queues/{qname}/dead_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
// Servers endpoints.
api.HandleFunc("/servers", newListServersHandlerFunc(inspector)).Methods("GET")
// Scheduler Entry endpoints.
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")
2020-11-24 22:54:00 +08:00
2021-01-03 22:56:53 +08:00
// Redis info endpoint.
api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(rdb)).Methods("GET")
fs := &staticFileServer{
staticContents: staticContents,
staticDirPath: "ui/build",
indexFileName: "index.html",
}
2020-11-24 22:54:00 +08:00
router.PathPrefix("/").Handler(fs)
c := cors.New(cors.Options{
AllowedMethods: []string{"GET", "POST", "DELETE"},
})
handler := c.Handler(router)
2020-11-24 22:54:00 +08:00
srv := &http.Server{
Handler: handler,
Addr: addr,
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
}
fmt.Printf("Asynq Monitoring WebUI server is running on %s\n", addr)
log.Fatal(srv.ListenAndServe())
}