From a76670956be5096d42f493cb5c3609e0f4a1651a Mon Sep 17 00:00:00 2001 From: ajatprabha Date: Sat, 18 Sep 2021 17:56:00 +0530 Subject: [PATCH] add injectable PayloadStringer --- conversion_helpers.go | 206 ++++++++++++++++++------------------ queue_handlers.go | 16 +-- router.go | 37 ++++--- scheduler_entry_handlers.go | 8 +- server_handlers.go | 4 +- task_handlers.go | 34 +++--- 6 files changed, 153 insertions(+), 152 deletions(-) diff --git a/conversion_helpers.go b/conversion_helpers.go index c872409..bea1880 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -1,12 +1,10 @@ package asynqmon import ( - "encoding/json" + "github.com/hibiken/asynq" "time" "unicode" "unicode/utf8" - - "github.com/hibiken/asynq" ) // **************************************************************************** @@ -15,6 +13,7 @@ import ( // - conversion function from an external type to an internal type // **************************************************************************** +// PayloadStringer can be used to convert payload bytes to string to show in web ui. type PayloadStringer interface { String([]byte) string } @@ -25,15 +24,32 @@ func (f PayloadStringerFunc) String(b []byte) string { return f(b) } -var payloadStringer PayloadStringer = PayloadStringerFunc(func(payload []byte) string { +var defaultPayloadStringer = PayloadStringerFunc(func(payload []byte) string { if !isPrintable(payload) { return "non-printable bytes" } return string(payload) }) -func SetPayloadStringer(stringer PayloadStringer) { - payloadStringer = stringer +// isPrintable reports whether the given data is comprised of all printable runes. +func isPrintable(data []byte) bool { + if !utf8.Valid(data) { + return false + } + isAllSpace := true + for _, r := range string(data) { + if !unicode.IsPrint(r) { + return false + } + if !unicode.IsSpace(r) { + isAllSpace = false + } + } + return !isAllSpace +} + +type transformer struct { + ps PayloadStringer } type QueueStateSnapshot struct { @@ -63,7 +79,7 @@ type QueueStateSnapshot struct { Timestamp time.Time `json:"timestamp"` } -func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { +func (t *transformer) toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { return &QueueStateSnapshot{ Queue: s.Queue, MemoryUsage: s.MemoryUsage, @@ -89,7 +105,7 @@ type DailyStats struct { Date string `json:"date"` } -func toDailyStats(s *asynq.DailyStats) *DailyStats { +func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats { return &DailyStats{ Queue: s.Queue, Processed: s.Processed, @@ -99,10 +115,10 @@ func toDailyStats(s *asynq.DailyStats) *DailyStats { } } -func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { +func (t *transformer) toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { out := make([]*DailyStats, len(in)) for i, s := range in { - out[i] = toDailyStats(s) + out[i] = t.toDailyStats(s) } return out } @@ -145,12 +161,12 @@ func formatTimeInRFC3339(t time.Time) string { return t.Format(time.RFC3339) } -func toTaskInfo(info *asynq.TaskInfo) *TaskInfo { +func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo { return &TaskInfo{ ID: info.ID, Queue: info.Queue, Type: info.Type, - Payload: payloadStringer.String(info.Payload), + Payload: t.ps.String(info.Payload), State: info.State.String(), MaxRetry: info.MaxRetry, Retried: info.Retried, @@ -189,23 +205,23 @@ type ActiveTask struct { Deadline string `json:"deadline"` } -func toActiveTask(t *asynq.TaskInfo) *ActiveTask { +func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask { base := &BaseTask{ - ID: t.ID, - Type: t.Type, - Payload: payloadStringer.String(t.Payload), - Queue: t.Queue, - MaxRetry: t.MaxRetry, - Retried: t.Retried, - LastError: t.LastErr, + ID: ti.ID, + Type: ti.Type, + Payload: t.ps.String(ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, } return &ActiveTask{BaseTask: base} } -func toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask { +func (t *transformer) toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask { out := make([]*ActiveTask, len(in)) - for i, t := range in { - out[i] = toActiveTask(t) + for i, ti := range in { + out[i] = t.toActiveTask(ti) } return out } @@ -215,25 +231,25 @@ type PendingTask struct { *BaseTask } -func toPendingTask(t *asynq.TaskInfo) *PendingTask { +func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask { base := &BaseTask{ - ID: t.ID, - Type: t.Type, - Payload: payloadStringer.String(t.Payload), - Queue: t.Queue, - MaxRetry: t.MaxRetry, - Retried: t.Retried, - LastError: t.LastErr, + ID: ti.ID, + Type: ti.Type, + Payload: t.ps.String(ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, } return &PendingTask{ BaseTask: base, } } -func toPendingTasks(in []*asynq.TaskInfo) []*PendingTask { +func (t *transformer) toPendingTasks(in []*asynq.TaskInfo) []*PendingTask { out := make([]*PendingTask, len(in)) - for i, t := range in { - out[i] = toPendingTask(t) + for i, ti := range in { + out[i] = t.toPendingTask(ti) } return out } @@ -243,46 +259,26 @@ type ScheduledTask struct { NextProcessAt time.Time `json:"next_process_at"` } -// isPrintable reports whether the given data is comprised of all printable runes. -func isPrintable(data []byte) bool { - if !utf8.Valid(data) { - return false - } - if json.Valid(data) { - return true - } - isAllSpace := true - for _, r := range string(data) { - if !unicode.IsPrint(r) { - return false - } - if !unicode.IsSpace(r) { - isAllSpace = false - } - } - return !isAllSpace -} - -func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask { +func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask { base := &BaseTask{ - ID: t.ID, - Type: t.Type, - Payload: payloadStringer.String(t.Payload), - Queue: t.Queue, - MaxRetry: t.MaxRetry, - Retried: t.Retried, - LastError: t.LastErr, + ID: ti.ID, + Type: ti.Type, + Payload: t.ps.String(ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, } return &ScheduledTask{ BaseTask: base, - NextProcessAt: t.NextProcessAt, + NextProcessAt: ti.NextProcessAt, } } -func toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask { +func (t *transformer) toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask { out := make([]*ScheduledTask, len(in)) - for i, t := range in { - out[i] = toScheduledTask(t) + for i, ti := range in { + out[i] = t.toScheduledTask(ti) } return out } @@ -292,26 +288,26 @@ type RetryTask struct { NextProcessAt time.Time `json:"next_process_at"` } -func toRetryTask(t *asynq.TaskInfo) *RetryTask { +func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask { base := &BaseTask{ - ID: t.ID, - Type: t.Type, - Payload: payloadStringer.String(t.Payload), - Queue: t.Queue, - MaxRetry: t.MaxRetry, - Retried: t.Retried, - LastError: t.LastErr, + ID: ti.ID, + Type: ti.Type, + Payload: t.ps.String(ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, } return &RetryTask{ BaseTask: base, - NextProcessAt: t.NextProcessAt, + NextProcessAt: ti.NextProcessAt, } } -func toRetryTasks(in []*asynq.TaskInfo) []*RetryTask { +func (t *transformer) toRetryTasks(in []*asynq.TaskInfo) []*RetryTask { out := make([]*RetryTask, len(in)) - for i, t := range in { - out[i] = toRetryTask(t) + for i, ti := range in { + out[i] = t.toRetryTask(ti) } return out } @@ -321,26 +317,26 @@ type ArchivedTask struct { LastFailedAt time.Time `json:"last_failed_at"` } -func toArchivedTask(t *asynq.TaskInfo) *ArchivedTask { +func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask { base := &BaseTask{ - ID: t.ID, - Type: t.Type, - Payload: payloadStringer.String(t.Payload), - Queue: t.Queue, - MaxRetry: t.MaxRetry, - Retried: t.Retried, - LastError: t.LastErr, + ID: ti.ID, + Type: ti.Type, + Payload: t.ps.String(ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, } return &ArchivedTask{ BaseTask: base, - LastFailedAt: t.LastFailedAt, + LastFailedAt: ti.LastFailedAt, } } -func toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask { +func (t *transformer) toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask { out := make([]*ArchivedTask, len(in)) - for i, t := range in { - out[i] = toArchivedTask(t) + for i, ti := range in { + out[i] = t.toArchivedTask(ti) } return out } @@ -356,7 +352,7 @@ type SchedulerEntry struct { PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"` } -func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { +func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output for _, o := range e.Opts { opts = append(opts, o.String()) @@ -369,17 +365,17 @@ func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { ID: e.ID, Spec: e.Spec, TaskType: e.Task.Type(), - TaskPayload: payloadStringer.String(e.Task.Payload()), + TaskPayload: t.ps.String(e.Task.Payload()), Opts: opts, NextEnqueueAt: e.Next.Format(time.RFC3339), PrevEnqueueAt: prev, } } -func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { +func (t *transformer) toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { out := make([]*SchedulerEntry, len(in)) for i, e := range in { - out[i] = toSchedulerEntry(e) + out[i] = t.toSchedulerEntry(e) } return out } @@ -389,17 +385,17 @@ type SchedulerEnqueueEvent struct { EnqueuedAt string `json:"enqueued_at"` } -func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { +func (t *transformer) toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { return &SchedulerEnqueueEvent{ TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339), } } -func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { +func (t *transformer) toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { out := make([]*SchedulerEnqueueEvent, len(in)) for i, e := range in { - out[i] = toSchedulerEnqueueEvent(e) + out[i] = t.toSchedulerEnqueueEvent(e) } return out } @@ -416,7 +412,7 @@ type ServerInfo struct { ActiveWorkers []*WorkerInfo `json:"active_workers"` } -func toServerInfo(info *asynq.ServerInfo) *ServerInfo { +func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo { return &ServerInfo{ ID: info.ID, Host: info.Host, @@ -426,14 +422,14 @@ func toServerInfo(info *asynq.ServerInfo) *ServerInfo { StrictPriority: info.StrictPriority, Started: info.Started.Format(time.RFC3339), Status: info.Status, - ActiveWorkers: toWorkerInfoList(info.ActiveWorkers), + ActiveWorkers: t.toWorkerInfoList(info.ActiveWorkers), } } -func toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo { +func (t *transformer) toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo { out := make([]*ServerInfo, len(in)) for i, s := range in { - out[i] = toServerInfo(s) + out[i] = t.toServerInfo(s) } return out } @@ -446,20 +442,20 @@ type WorkerInfo struct { Started string `json:"start_time"` } -func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo { +func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo { return &WorkerInfo{ TaskID: info.TaskID, Queue: info.Queue, TaskType: info.TaskType, - TakPayload: payloadStringer.String(info.TaskPayload), + TakPayload: t.ps.String(info.TaskPayload), Started: info.Started.Format(time.RFC3339), } } -func toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo { +func (t *transformer) toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo { out := make([]*WorkerInfo, len(in)) for i, w := range in { - out[i] = toWorkerInfo(w) + out[i] = t.toWorkerInfo(w) } return out } diff --git a/queue_handlers.go b/queue_handlers.go index d628647..42ca7c4 100644 --- a/queue_handlers.go +++ b/queue_handlers.go @@ -14,7 +14,7 @@ import ( // - http.Handler(s) for queue related endpoints // **************************************************************************** -func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { @@ -28,14 +28,14 @@ func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { http.Error(w, err.Error(), http.StatusInternalServerError) return } - snapshots[i] = toQueueStateSnapshot(qinfo) + snapshots[i] = t.toQueueStateSnapshot(qinfo) } payload := map[string]interface{}{"queues": snapshots} json.NewEncoder(w).Encode(payload) } } -func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -47,7 +47,7 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { http.Error(w, err.Error(), http.StatusInternalServerError) return } - payload["current"] = toQueueStateSnapshot(qinfo) + payload["current"] = t.toQueueStateSnapshot(qinfo) // TODO: make this n a variable data, err := inspector.History(qname, 10) @@ -57,14 +57,14 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { } var dailyStats []*DailyStats for _, s := range data { - dailyStats = append(dailyStats, toDailyStats(s)) + dailyStats = append(dailyStats, t.toDailyStats(s)) } payload["history"] = dailyStats json.NewEncoder(w).Encode(payload) } } -func newDeleteQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newDeleteQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -112,7 +112,7 @@ type ListQueueStatsResponse struct { Stats map[string][]*DailyStats `json:"stats"` } -func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { @@ -127,7 +127,7 @@ func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp.Stats[qname] = toDailyStatsList(stats) + resp.Stats[qname] = t.toDailyStatsList(stats) } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/router.go b/router.go index 9375135..52bb241 100644 --- a/router.go +++ b/router.go @@ -7,14 +7,19 @@ import ( ) type RouterOptions struct { - Inspector *asynq.Inspector - Middlewares []mux.MiddlewareFunc - RedisClient redis.UniversalClient + RedisClient redis.UniversalClient + Inspector *asynq.Inspector + Middlewares []mux.MiddlewareFunc + PayloadStringer PayloadStringer } func NewRouter(opts RouterOptions) *mux.Router { router := mux.NewRouter() inspector := opts.Inspector + t := &transformer{ps: defaultPayloadStringer} + if opts.PayloadStringer != nil { + t = &transformer{ps: opts.PayloadStringer} + } for _, mf := range opts.Middlewares { router.Use(mf) @@ -22,22 +27,22 @@ func NewRouter(opts RouterOptions) *mux.Router { api := router.PathPrefix("/api").Subrouter() // Queue endpoints. - api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET") - api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET") - api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE") + api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector, t)).Methods("DELETE") api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST") // Queue Historical Stats endpoint. - api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector, t)).Methods("GET") // Task endpoints. - api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -45,7 +50,7 @@ func NewRouter(opts RouterOptions) *mux.Router { api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -56,7 +61,7 @@ func NewRouter(opts RouterOptions) *mux.Router { api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -67,7 +72,7 @@ func NewRouter(opts RouterOptions) *mux.Router { api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -75,14 +80,14 @@ func NewRouter(opts RouterOptions) *mux.Router { api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, t)).Methods("GET") // Servers endpoints. - api.HandleFunc("/servers", newListServersHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/servers", newListServersHandlerFunc(inspector, t)).Methods("GET") // Scheduler Entry endpoints. - api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET") - api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector, t)).Methods("GET") // Redis info endpoint. switch c := opts.RedisClient.(type) { diff --git a/scheduler_entry_handlers.go b/scheduler_entry_handlers.go index 4b7cb05..fa62c69 100644 --- a/scheduler_entry_handlers.go +++ b/scheduler_entry_handlers.go @@ -13,7 +13,7 @@ import ( // - http.Handler(s) for scheduler entry related endpoints // **************************************************************************** -func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { entries, err := inspector.SchedulerEntries() if err != nil { @@ -25,7 +25,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.Handler // avoid nil for the entries field in json output. payload["entries"] = make([]*SchedulerEntry, 0) } else { - payload["entries"] = toSchedulerEntries(entries) + payload["entries"] = t.toSchedulerEntries(entries) } if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -38,7 +38,7 @@ type ListSchedulerEnqueueEventsResponse struct { Events []*SchedulerEnqueueEvent `json:"events"` } -func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { entryID := mux.Vars(r)["entry_id"] pageSize, pageNum := getPageOptions(r) @@ -49,7 +49,7 @@ func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.H return } resp := ListSchedulerEnqueueEventsResponse{ - Events: toSchedulerEnqueueEvents(events), + Events: t.toSchedulerEnqueueEvents(events), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server_handlers.go b/server_handlers.go index 6ebbbb5..72a11aa 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -16,7 +16,7 @@ type ListServersResponse struct { Servers []*ServerInfo `json:"servers"` } -func newListServersHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { srvs, err := inspector.Servers() if err != nil { @@ -24,7 +24,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return } resp := ListServersResponse{ - Servers: toServerInfoList(srvs), + Servers: t.toServerInfoList(srvs), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/task_handlers.go b/task_handlers.go index fe78b17..5f11dd7 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -23,7 +23,7 @@ type ListActiveTasksResponse struct { Stats *QueueStateSnapshot `json:"stats"` } -func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -54,7 +54,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc } } } - activeTasks := toActiveTasks(tasks) + activeTasks := t.toActiveTasks(tasks) for _, t := range activeTasks { workerInfo, ok := m[t.ID] if ok { @@ -68,7 +68,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc resp := ListActiveTasksResponse{ Tasks: activeTasks, - Stats: toQueueStateSnapshot(qinfo), + Stats: t.toQueueStateSnapshot(qinfo), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -155,7 +155,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl } } -func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -176,9 +176,9 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc // avoid nil for the tasks field in json output. payload["tasks"] = make([]*PendingTask, 0) } else { - payload["tasks"] = toPendingTasks(tasks) + payload["tasks"] = t.toPendingTasks(tasks) } - payload["stats"] = toQueueStateSnapshot(qinfo) + payload["stats"] = t.toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -186,7 +186,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc } } -func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -207,9 +207,9 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ScheduledTask, 0) } else { - payload["tasks"] = toScheduledTasks(tasks) + payload["tasks"] = t.toScheduledTasks(tasks) } - payload["stats"] = toQueueStateSnapshot(qinfo) + payload["stats"] = t.toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -217,7 +217,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu } } -func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -238,9 +238,9 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { // avoid nil for the tasks field in json output. payload["tasks"] = make([]*RetryTask, 0) } else { - payload["tasks"] = toRetryTasks(tasks) + payload["tasks"] = t.toRetryTasks(tasks) } - payload["stats"] = toQueueStateSnapshot(qinfo) + payload["stats"] = t.toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -248,7 +248,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { } } -func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -269,9 +269,9 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFun // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ArchivedTask, 0) } else { - payload["tasks"] = toArchivedTasks(tasks) + payload["tasks"] = t.toArchivedTasks(tasks) } - payload["stats"] = toQueueStateSnapshot(qinfo) + payload["stats"] = t.toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -626,7 +626,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) { return pageSize, pageNum } -func newGetTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { +func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, taskid := vars["qname"], vars["task_id"] @@ -649,7 +649,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return } - if err := json.NewEncoder(w).Encode(toTaskInfo(info)); err != nil { + if err := json.NewEncoder(w).Encode(t.toTaskInfo(info)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }