package main import ( "encoding/json" "net/http" "github.com/gorilla/mux" "github.com/hibiken/asynq" ) // **************************************************************************** // This file defines: // - http.Handler(s) for queue related endpoints // **************************************************************************** func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } var snapshots []*QueueStateSnapshot for _, qname := range qnames { s, err := inspector.CurrentStats(qname) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } snapshots = append(snapshots, toQueueStateSnapshot(s)) } payload := map[string]interface{}{"queues": snapshots} json.NewEncoder(w).Encode(payload) } } func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] payload := make(map[string]interface{}) stats, err := inspector.CurrentStats(qname) if err != nil { // TODO: Check for queue not found error. http.Error(w, err.Error(), http.StatusInternalServerError) return } payload["current"] = toQueueStateSnapshot(stats) // TODO: make this n a variable data, err := inspector.History(qname, 10) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } var dailyStats []*DailyStats for _, s := range data { dailyStats = append(dailyStats, toDailyStats(s)) } payload["history"] = dailyStats json.NewEncoder(w).Encode(payload) } } func newDeleteQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] if err := inspector.DeleteQueue(qname, false); err != nil { if _, ok := err.(*asynq.ErrQueueNotFound); ok { http.Error(w, err.Error(), http.StatusNotFound) return } if _, ok := err.(*asynq.ErrQueueNotEmpty); ok { http.Error(w, err.Error(), http.StatusBadRequest) return } http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newPauseQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] if err := inspector.PauseQueue(qname); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } func newResumeQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] if err := inspector.UnpauseQueue(qname); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } } type ListQueueStatsResponse struct { Stats map[string][]*DailyStats `json:"stats"` } func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } resp := ListQueueStatsResponse{Stats: make(map[string][]*DailyStats)} const numdays = 90 // Get stats for the last 90 days. for _, qname := range qnames { stats, err := inspector.History(qname, numdays) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } resp.Stats[qname] = toDailyStatsList(stats) } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } }