From fbbc414bdfbc572d147835cc9dd57fa8dbcb5766 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 2 Dec 2020 07:19:06 -0800 Subject: [PATCH] Add ListSchedulerEntries API endpoint --- conversion_helpers.go | 40 +++++++++++++++++++++++++++++++++++++ main.go | 2 ++ queue_handlers.go | 5 +++++ scheduler_entry_handlers.go | 31 ++++++++++++++++++++++++++++ task_handlers.go | 15 +++++++++----- 5 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 scheduler_entry_handlers.go diff --git a/conversion_helpers.go b/conversion_helpers.go index c36d7b2..2f49e93 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -6,6 +6,12 @@ import ( "github.com/hibiken/asynq" ) +// **************************************************************************** +// This file defines: +// - internal types with JSON struct tags +// - conversion function from an external type to an internal type +// **************************************************************************** + type QueueStateSnapshot struct { // Name of the queue. Queue string `json:"queue"` @@ -206,3 +212,37 @@ func toDeadTasks(in []*asynq.DeadTask) []*DeadTask { } return out } + +type SchedulerEntry struct { + ID string `json:"id"` + Spec string `json:"spec"` + TaskType string `json:"task_type"` + TaskPayload asynq.Payload `json:"task_payload"` + Opts []string `json:"options"` + NextEnqueueAt time.Time `json:"next_enqueue_at"` + PrevEnqueueAt time.Time `json:"prev_enqueue_at"` +} + +func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { + opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output + for _, o := range e.Opts { + opts = append(opts, o.String()) + } + return &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + TaskType: e.Task.Type, + TaskPayload: e.Task.Payload, + Opts: opts, + NextEnqueueAt: e.Next, + PrevEnqueueAt: e.Prev, + } +} + +func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { + out := make([]*SchedulerEntry, len(in)) + for i, e := range in { + out[i] = toSchedulerEntry(e) + } + return out +} diff --git a/main.go b/main.go index 546bfa9..5c5fea3 100644 --- a/main.go +++ b/main.go @@ -87,6 +87,8 @@ func main() { newListRetryTasksHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}/dead_tasks", newListDeadTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/scheduler_entries", + newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") fs := &staticFileServer{staticPath: "ui/build", indexPath: "index.html"} router.PathPrefix("/").Handler(fs) diff --git a/queue_handlers.go b/queue_handlers.go index 3a726ca..e774a00 100644 --- a/queue_handlers.go +++ b/queue_handlers.go @@ -8,6 +8,11 @@ import ( "github.com/hibiken/asynq" ) +// **************************************************************************** +// This file defines: +// - http.Handler(s) for queue related endpoints +// **************************************************************************** + func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() diff --git a/scheduler_entry_handlers.go b/scheduler_entry_handlers.go new file mode 100644 index 0000000..fb38554 --- /dev/null +++ b/scheduler_entry_handlers.go @@ -0,0 +1,31 @@ +package main + +import ( + "encoding/json" + "net/http" + + "github.com/hibiken/asynq" +) + +// **************************************************************************** +// This file defines: +// - http.Handler(s) for scheduler entry related endpoints +// **************************************************************************** + +func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + entries, err := inspector.SchedulerEntries() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + payload := make(map[string]interface{}) + if len(entries) == 0 { + // avoid nil for the entries field in json output. + payload["entries"] = make([]*SchedulerEntry, 0) + } else { + payload["entries"] = toSchedulerEntries(entries) + } + json.NewEncoder(w).Encode(payload) + } +} diff --git a/task_handlers.go b/task_handlers.go index b1d6d09..59e6de3 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -9,6 +9,11 @@ import ( "github.com/hibiken/asynq" ) +// **************************************************************************** +// This file defines: +// - http.Handler(s) for task related endpoints +// **************************************************************************** + func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -28,7 +33,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*asynq.ActiveTask, 0) + payload["tasks"] = make([]*ActiveTask, 0) } else { payload["tasks"] = toActiveTasks(tasks) } @@ -56,7 +61,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*asynq.PendingTask, 0) + payload["tasks"] = make([]*PendingTask, 0) } else { payload["tasks"] = toPendingTasks(tasks) } @@ -84,7 +89,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*asynq.ScheduledTask, 0) + payload["tasks"] = make([]*ScheduledTask, 0) } else { payload["tasks"] = toScheduledTasks(tasks) } @@ -112,7 +117,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*asynq.RetryTask, 0) + payload["tasks"] = make([]*RetryTask, 0) } else { payload["tasks"] = toRetryTasks(tasks) } @@ -140,7 +145,7 @@ func newListDeadTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { payload := make(map[string]interface{}) if len(tasks) == 0 { // avoid nil for the tasks field in json output. - payload["tasks"] = make([]*asynq.DeadTask, 0) + payload["tasks"] = make([]*DeadTask, 0) } else { payload["tasks"] = toDeadTasks(tasks) }