Add ListSchedulerEntries API endpoint

This commit is contained in:
Ken Hibino 2020-12-02 07:19:06 -08:00
parent 3dd6fdc0b0
commit fbbc414bdf
5 changed files with 88 additions and 5 deletions

View File

@ -6,6 +6,12 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
// ****************************************************************************
// This file defines:
// - internal types with JSON struct tags
// - conversion function from an external type to an internal type
// ****************************************************************************
type QueueStateSnapshot struct { type QueueStateSnapshot struct {
// Name of the queue. // Name of the queue.
Queue string `json:"queue"` Queue string `json:"queue"`
@ -206,3 +212,37 @@ func toDeadTasks(in []*asynq.DeadTask) []*DeadTask {
} }
return out return out
} }
type SchedulerEntry struct {
ID string `json:"id"`
Spec string `json:"spec"`
TaskType string `json:"task_type"`
TaskPayload asynq.Payload `json:"task_payload"`
Opts []string `json:"options"`
NextEnqueueAt time.Time `json:"next_enqueue_at"`
PrevEnqueueAt time.Time `json:"prev_enqueue_at"`
}
func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts {
opts = append(opts, o.String())
}
return &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
TaskType: e.Task.Type,
TaskPayload: e.Task.Payload,
Opts: opts,
NextEnqueueAt: e.Next,
PrevEnqueueAt: e.Prev,
}
}
func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
out := make([]*SchedulerEntry, len(in))
for i, e := range in {
out[i] = toSchedulerEntry(e)
}
return out
}

View File

@ -87,6 +87,8 @@ func main() {
newListRetryTasksHandlerFunc(inspector)).Methods("GET") newListRetryTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}/dead_tasks", api.HandleFunc("/queues/{qname}/dead_tasks",
newListDeadTasksHandlerFunc(inspector)).Methods("GET") newListDeadTasksHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/scheduler_entries",
newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET")
fs := &staticFileServer{staticPath: "ui/build", indexPath: "index.html"} fs := &staticFileServer{staticPath: "ui/build", indexPath: "index.html"}
router.PathPrefix("/").Handler(fs) router.PathPrefix("/").Handler(fs)

View File

@ -8,6 +8,11 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
// ****************************************************************************
// This file defines:
// - http.Handler(s) for queue related endpoints
// ****************************************************************************
func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qnames, err := inspector.Queues() qnames, err := inspector.Queues()

View File

@ -0,0 +1,31 @@
package main
import (
"encoding/json"
"net/http"
"github.com/hibiken/asynq"
)
// ****************************************************************************
// This file defines:
// - http.Handler(s) for scheduler entry related endpoints
// ****************************************************************************
func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
entries, err := inspector.SchedulerEntries()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
payload := make(map[string]interface{})
if len(entries) == 0 {
// avoid nil for the entries field in json output.
payload["entries"] = make([]*SchedulerEntry, 0)
} else {
payload["entries"] = toSchedulerEntries(entries)
}
json.NewEncoder(w).Encode(payload)
}
}

View File

@ -9,6 +9,11 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
// ****************************************************************************
// This file defines:
// - http.Handler(s) for task related endpoints
// ****************************************************************************
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
@ -28,7 +33,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*asynq.ActiveTask, 0) payload["tasks"] = make([]*ActiveTask, 0)
} else { } else {
payload["tasks"] = toActiveTasks(tasks) payload["tasks"] = toActiveTasks(tasks)
} }
@ -56,7 +61,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*asynq.PendingTask, 0) payload["tasks"] = make([]*PendingTask, 0)
} else { } else {
payload["tasks"] = toPendingTasks(tasks) payload["tasks"] = toPendingTasks(tasks)
} }
@ -84,7 +89,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*asynq.ScheduledTask, 0) payload["tasks"] = make([]*ScheduledTask, 0)
} else { } else {
payload["tasks"] = toScheduledTasks(tasks) payload["tasks"] = toScheduledTasks(tasks)
} }
@ -112,7 +117,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*asynq.RetryTask, 0) payload["tasks"] = make([]*RetryTask, 0)
} else { } else {
payload["tasks"] = toRetryTasks(tasks) payload["tasks"] = toRetryTasks(tasks)
} }
@ -140,7 +145,7 @@ func newListDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*asynq.DeadTask, 0) payload["tasks"] = make([]*DeadTask, 0)
} else { } else {
payload["tasks"] = toDeadTasks(tasks) payload["tasks"] = toDeadTasks(tasks)
} }