mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-19 03:05:53 +08:00
84 lines
2.2 KiB
Go
84 lines
2.2 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
|
|
"github.com/gorilla/mux"
|
|
"github.com/hibiken/asynq"
|
|
)
|
|
|
|
func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
qnames, err := inspector.Queues()
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
var snapshots []*QueueStateSnapshot
|
|
for _, qname := range qnames {
|
|
s, err := inspector.CurrentStats(qname)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
snapshots = append(snapshots, toQueueStateSnapshot(s))
|
|
}
|
|
payload := map[string]interface{}{"queues": snapshots}
|
|
json.NewEncoder(w).Encode(payload)
|
|
}
|
|
}
|
|
|
|
func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
qname := vars["qname"]
|
|
|
|
payload := make(map[string]interface{})
|
|
stats, err := inspector.CurrentStats(qname)
|
|
if err != nil {
|
|
// TODO: Check for queue not found error.
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
payload["current"] = toQueueStateSnapshot(stats)
|
|
|
|
// TODO: make this n a variable
|
|
data, err := inspector.History(qname, 10)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
var dailyStats []*DailyStats
|
|
for _, s := range data {
|
|
dailyStats = append(dailyStats, toDailyStats(s))
|
|
}
|
|
payload["history"] = dailyStats
|
|
json.NewEncoder(w).Encode(payload)
|
|
}
|
|
}
|
|
|
|
func newPauseQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
qname := vars["qname"]
|
|
if err := inspector.PauseQueue(qname); err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
}
|
|
|
|
func newResumeQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
vars := mux.Vars(r)
|
|
qname := vars["qname"]
|
|
if err := inspector.UnpauseQueue(qname); err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
}
|