mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-08-24 06:38:42 +08:00
make server & router reusable outside asynqmon package
This commit is contained in:
138
cmd/asynqmon/main.go
Normal file
138
cmd/asynqmon/main.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ajatprabha/asynqmon"
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/rs/cors"
|
||||
)
|
||||
|
||||
// Command-line flags
|
||||
var (
|
||||
flagPort int
|
||||
flagRedisAddr string
|
||||
flagRedisDB int
|
||||
flagRedisPassword string
|
||||
flagRedisTLS string
|
||||
flagRedisURL string
|
||||
flagRedisInsecureTLS bool
|
||||
flagRedisClusterNodes string
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.IntVar(&flagPort, "port", 8080, "port number to use for web ui server")
|
||||
flag.StringVar(&flagRedisAddr, "redis-addr", "127.0.0.1:6379", "address of redis server to connect to")
|
||||
flag.IntVar(&flagRedisDB, "redis-db", 0, "redis database number")
|
||||
flag.StringVar(&flagRedisPassword, "redis-password", "", "password to use when connecting to redis server")
|
||||
flag.StringVar(&flagRedisTLS, "redis-tls", "", "server name for TLS validation used when connecting to redis server")
|
||||
flag.StringVar(&flagRedisURL, "redis-url", "", "URL to redis server")
|
||||
flag.BoolVar(&flagRedisInsecureTLS, "redis-insecure-tls", false, "disable TLS certificate host checks")
|
||||
flag.StringVar(&flagRedisClusterNodes, "redis-cluster-nodes", "", "comma separated list of host:port addresses of cluster nodes")
|
||||
}
|
||||
|
||||
func getRedisOptionsFromFlags() (*redis.UniversalOptions, error) {
|
||||
var opts redis.UniversalOptions
|
||||
|
||||
if flagRedisClusterNodes != "" {
|
||||
opts.Addrs = strings.Split(flagRedisClusterNodes, ",")
|
||||
opts.Password = flagRedisPassword
|
||||
} else {
|
||||
if flagRedisURL != "" {
|
||||
res, err := redis.ParseURL(flagRedisURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opts.Addrs = append(opts.Addrs, res.Addr)
|
||||
opts.DB = res.DB
|
||||
opts.Password = res.Password
|
||||
|
||||
} else {
|
||||
opts.Addrs = []string{flagRedisAddr}
|
||||
opts.DB = flagRedisDB
|
||||
opts.Password = flagRedisPassword
|
||||
}
|
||||
}
|
||||
|
||||
if flagRedisTLS != "" {
|
||||
opts.TLSConfig = &tls.Config{ServerName: flagRedisTLS}
|
||||
}
|
||||
if flagRedisInsecureTLS {
|
||||
if opts.TLSConfig == nil {
|
||||
opts.TLSConfig = &tls.Config{}
|
||||
}
|
||||
opts.TLSConfig.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
return &opts, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
opts, err := getRedisOptionsFromFlags()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
useRedisCluster := flagRedisClusterNodes != ""
|
||||
|
||||
var redisConnOpt asynq.RedisConnOpt
|
||||
if useRedisCluster {
|
||||
redisConnOpt = asynq.RedisClusterClientOpt{
|
||||
Addrs: opts.Addrs,
|
||||
Password: opts.Password,
|
||||
TLSConfig: opts.TLSConfig,
|
||||
}
|
||||
} else {
|
||||
redisConnOpt = asynq.RedisClientOpt{
|
||||
Addr: opts.Addrs[0],
|
||||
DB: opts.DB,
|
||||
Password: opts.Password,
|
||||
TLSConfig: opts.TLSConfig,
|
||||
}
|
||||
}
|
||||
|
||||
inspector := asynq.NewInspector(redisConnOpt)
|
||||
defer inspector.Close()
|
||||
|
||||
var redisClient redis.UniversalClient
|
||||
if useRedisCluster {
|
||||
redisClient = redis.NewClusterClient(opts.Cluster())
|
||||
} else {
|
||||
redisClient = redis.NewClient(opts.Simple())
|
||||
}
|
||||
defer redisClient.Close()
|
||||
|
||||
router := asynqmon.NewRouter(asynqmon.RouterOptions{
|
||||
Inspector: inspector,
|
||||
Middlewares: []mux.MiddlewareFunc{loggingMiddleware},
|
||||
RedisClient: redisClient,
|
||||
})
|
||||
|
||||
router.PathPrefix("/").Handler(asynqmon.NewStaticContentHandler())
|
||||
|
||||
c := cors.New(cors.Options{
|
||||
AllowedMethods: []string{"GET", "POST", "DELETE"},
|
||||
})
|
||||
|
||||
handler := c.Handler(router)
|
||||
|
||||
srv := &http.Server{
|
||||
Handler: handler,
|
||||
Addr: fmt.Sprintf(":%d", flagPort),
|
||||
WriteTimeout: 10 * time.Second,
|
||||
ReadTimeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
fmt.Printf("Asynq Monitoring WebUI server is listening on port %d\n", flagPort)
|
||||
log.Fatal(srv.ListenAndServe())
|
||||
}
|
60
cmd/asynqmon/middlewares.go
Normal file
60
cmd/asynqmon/middlewares.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
// A responseRecorderWriter records response status and size.
|
||||
// It implements http.ResponseWriter interface.
|
||||
type responseRecorderWriter struct {
|
||||
http.ResponseWriter
|
||||
// The status code that the server sends back to the client.
|
||||
status int
|
||||
// The size of the object returned to the client, not including the response headers.
|
||||
size int
|
||||
}
|
||||
|
||||
func (w *responseRecorderWriter) WriteHeader(status int) {
|
||||
w.ResponseWriter.WriteHeader(status)
|
||||
w.status = status
|
||||
}
|
||||
|
||||
func (w *responseRecorderWriter) Write(b []byte) (int, error) {
|
||||
// If WriteHeader is not called explicitly, the first call to Write
|
||||
// will trigger an implicit WriteHeader(http.StatusOK).
|
||||
if w.status == 0 {
|
||||
w.status = http.StatusOK
|
||||
}
|
||||
n, err := w.ResponseWriter.Write(b)
|
||||
w.size += n
|
||||
return n, err
|
||||
}
|
||||
|
||||
func loggingMiddleware(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
rw := &responseRecorderWriter{ResponseWriter: w}
|
||||
h.ServeHTTP(rw, r)
|
||||
|
||||
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||
if err != nil {
|
||||
host = r.RemoteAddr
|
||||
}
|
||||
username := "-"
|
||||
if user := r.URL.User; user != nil {
|
||||
username = user.Username()
|
||||
}
|
||||
size := "-"
|
||||
if rw.size > 0 {
|
||||
size = strconv.Itoa(rw.size)
|
||||
}
|
||||
// Write a log in Apache common log format (http://httpd.apache.org/docs/2.2/logs.html#common).
|
||||
fmt.Fprintf(os.Stdout, "%s - %s [%s] \"%s %s %s\" %d %s\n",
|
||||
host, username, time.Now().Format("02/Jan/2006:15:04:05 -0700"),
|
||||
r.Method, r.URL, r.Proto, rw.status, size)
|
||||
})
|
||||
}
|
Reference in New Issue
Block a user