From 8d531c04cdba9d8dfe0f856d1ee80620e95e9ebb Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 26 Dec 2020 10:05:19 -0800 Subject: [PATCH] Add API endpoints to list scheduler enqueue events --- conversion_helpers.go | 20 ++++++++++++++++++++ main.go | 1 + scheduler_entry_handlers.go | 30 +++++++++++++++++++++++++++++- 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/conversion_helpers.go b/conversion_helpers.go index 21ef339..54025c8 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -257,3 +257,23 @@ func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { } return out } + +type SchedulerEnqueueEvent struct { + TaskID string `json:"task_id"` + EnqueuedAt string `json:"enqueued_at"` +} + +func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { + return &SchedulerEnqueueEvent{ + TaskID: e.TaskID, + EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339), + } +} + +func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { + out := make([]*SchedulerEnqueueEvent, len(in)) + for i, e := range in { + out[i] = toSchedulerEnqueueEvent(e) + } + return out +} diff --git a/main.go b/main.go index 24f9ba7..d03f9e1 100644 --- a/main.go +++ b/main.go @@ -111,6 +111,7 @@ func main() { // Scheduler Entry endpoints. api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") fs := &staticFileServer{staticPath: "ui/build", indexPath: "index.html"} router.PathPrefix("/").Handler(fs) diff --git a/scheduler_entry_handlers.go b/scheduler_entry_handlers.go index fb38554..e4d888b 100644 --- a/scheduler_entry_handlers.go +++ b/scheduler_entry_handlers.go @@ -4,6 +4,7 @@ import ( "encoding/json" "net/http" + "github.com/gorilla/mux" "github.com/hibiken/asynq" ) @@ -26,6 +27,33 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.Handler } else { payload["entries"] = toSchedulerEntries(entries) } - json.NewEncoder(w).Encode(payload) + if err := json.NewEncoder(w).Encode(payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + +type ListSchedulerEnqueueEventsResponse struct { + Events []*SchedulerEnqueueEvent `json:"events"` +} + +func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + entryID := mux.Vars(r)["entry_id"] + pageSize, pageNum := getPageOptions(r) + events, err := inspector.ListSchedulerEnqueueEvents( + entryID, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + resp := ListSchedulerEnqueueEventsResponse{ + Events: toSchedulerEnqueueEvents(events), + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } } }