From e635b73e6c6918b70a4297715632d32fcb1785e1 Mon Sep 17 00:00:00 2001 From: ajatprabha Date: Fri, 1 Oct 2021 00:19:41 +0530 Subject: [PATCH] remove type transformer --- conversion_helpers.go | 86 ++++++++++++++++++------------------- handler.go | 33 +++++++------- queue_handlers.go | 16 +++---- scheduler_entry_handlers.go | 8 ++-- server_handlers.go | 4 +- task_handlers.go | 34 +++++++-------- 6 files changed, 89 insertions(+), 92 deletions(-) diff --git a/conversion_helpers.go b/conversion_helpers.go index 0894ea6..3688c57 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -49,10 +49,6 @@ func isPrintable(data []byte) bool { return !isAllSpace } -type transformer struct { - bs BytesStringer -} - type QueueStateSnapshot struct { // Name of the queue. Queue string `json:"queue"` @@ -80,7 +76,7 @@ type QueueStateSnapshot struct { Timestamp time.Time `json:"timestamp"` } -func (t *transformer) toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { +func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { return &QueueStateSnapshot{ Queue: s.Queue, MemoryUsage: s.MemoryUsage, @@ -106,7 +102,7 @@ type DailyStats struct { Date string `json:"date"` } -func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats { +func toDailyStats(s *asynq.DailyStats) *DailyStats { return &DailyStats{ Queue: s.Queue, Processed: s.Processed, @@ -116,10 +112,10 @@ func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats { } } -func (t *transformer) toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { +func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { out := make([]*DailyStats, len(in)) for i, s := range in { - out[i] = t.toDailyStats(s) + out[i] = toDailyStats(s) } return out } @@ -162,12 +158,12 @@ func formatTimeInRFC3339(t time.Time) string { return t.Format(time.RFC3339) } -func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo { +func toTaskInfo(info *asynq.TaskInfo, bs BytesStringer) *TaskInfo { return &TaskInfo{ ID: info.ID, Queue: info.Queue, Type: info.Type, - Payload: t.bs.String(info.Payload), + Payload: bs.String(info.Payload), State: info.State.String(), MaxRetry: info.MaxRetry, Retried: info.Retried, @@ -206,11 +202,11 @@ type ActiveTask struct { Deadline string `json:"deadline"` } -func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask { +func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: t.bs.String(ti.Payload), + Payload: bs.String(ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -219,10 +215,10 @@ func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask { return &ActiveTask{BaseTask: base} } -func (t *transformer) toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask { +func toActiveTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ActiveTask { out := make([]*ActiveTask, len(in)) for i, ti := range in { - out[i] = t.toActiveTask(ti) + out[i] = toActiveTask(ti, bs) } return out } @@ -232,11 +228,11 @@ type PendingTask struct { *BaseTask } -func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask { +func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: t.bs.String(ti.Payload), + Payload: bs.String(ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -247,10 +243,10 @@ func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask { } } -func (t *transformer) toPendingTasks(in []*asynq.TaskInfo) []*PendingTask { +func toPendingTasks(in []*asynq.TaskInfo, bs BytesStringer) []*PendingTask { out := make([]*PendingTask, len(in)) for i, ti := range in { - out[i] = t.toPendingTask(ti) + out[i] = toPendingTask(ti, bs) } return out } @@ -260,11 +256,11 @@ type ScheduledTask struct { NextProcessAt time.Time `json:"next_process_at"` } -func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask { +func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: t.bs.String(ti.Payload), + Payload: bs.String(ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -276,10 +272,10 @@ func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask { } } -func (t *transformer) toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask { +func toScheduledTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ScheduledTask { out := make([]*ScheduledTask, len(in)) for i, ti := range in { - out[i] = t.toScheduledTask(ti) + out[i] = toScheduledTask(ti, bs) } return out } @@ -289,11 +285,11 @@ type RetryTask struct { NextProcessAt time.Time `json:"next_process_at"` } -func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask { +func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: t.bs.String(ti.Payload), + Payload: bs.String(ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -305,10 +301,10 @@ func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask { } } -func (t *transformer) toRetryTasks(in []*asynq.TaskInfo) []*RetryTask { +func toRetryTasks(in []*asynq.TaskInfo, bs BytesStringer) []*RetryTask { out := make([]*RetryTask, len(in)) for i, ti := range in { - out[i] = t.toRetryTask(ti) + out[i] = toRetryTask(ti, bs) } return out } @@ -318,11 +314,11 @@ type ArchivedTask struct { LastFailedAt time.Time `json:"last_failed_at"` } -func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask { +func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: t.bs.String(ti.Payload), + Payload: bs.String(ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -334,10 +330,10 @@ func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask { } } -func (t *transformer) toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask { +func toArchivedTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ArchivedTask { out := make([]*ArchivedTask, len(in)) for i, ti := range in { - out[i] = t.toArchivedTask(ti) + out[i] = toArchivedTask(ti, bs) } return out } @@ -353,7 +349,7 @@ type SchedulerEntry struct { PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"` } -func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { +func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry { opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output for _, o := range e.Opts { opts = append(opts, o.String()) @@ -366,17 +362,17 @@ func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry ID: e.ID, Spec: e.Spec, TaskType: e.Task.Type(), - TaskPayload: t.bs.String(e.Task.Payload()), + TaskPayload: bs.String(e.Task.Payload()), Opts: opts, NextEnqueueAt: e.Next.Format(time.RFC3339), PrevEnqueueAt: prev, } } -func (t *transformer) toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { +func toSchedulerEntries(in []*asynq.SchedulerEntry, bs BytesStringer) []*SchedulerEntry { out := make([]*SchedulerEntry, len(in)) for i, e := range in { - out[i] = t.toSchedulerEntry(e) + out[i] = toSchedulerEntry(e, bs) } return out } @@ -386,17 +382,17 @@ type SchedulerEnqueueEvent struct { EnqueuedAt string `json:"enqueued_at"` } -func (t *transformer) toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { +func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { return &SchedulerEnqueueEvent{ TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339), } } -func (t *transformer) toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { +func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { out := make([]*SchedulerEnqueueEvent, len(in)) for i, e := range in { - out[i] = t.toSchedulerEnqueueEvent(e) + out[i] = toSchedulerEnqueueEvent(e) } return out } @@ -413,7 +409,7 @@ type ServerInfo struct { ActiveWorkers []*WorkerInfo `json:"active_workers"` } -func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo { +func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo { return &ServerInfo{ ID: info.ID, Host: info.Host, @@ -423,14 +419,14 @@ func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo { StrictPriority: info.StrictPriority, Started: info.Started.Format(time.RFC3339), Status: info.Status, - ActiveWorkers: t.toWorkerInfoList(info.ActiveWorkers), + ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, bs), } } -func (t *transformer) toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo { +func toServerInfoList(in []*asynq.ServerInfo, bs BytesStringer) []*ServerInfo { out := make([]*ServerInfo, len(in)) for i, s := range in { - out[i] = t.toServerInfo(s) + out[i] = toServerInfo(s, bs) } return out } @@ -443,20 +439,20 @@ type WorkerInfo struct { Started string `json:"start_time"` } -func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo { +func toWorkerInfo(info *asynq.WorkerInfo, bs BytesStringer) *WorkerInfo { return &WorkerInfo{ TaskID: info.TaskID, Queue: info.Queue, TaskType: info.TaskType, - TakPayload: t.bs.String(info.TaskPayload), + TakPayload: bs.String(info.TaskPayload), Started: info.Started.Format(time.RFC3339), } } -func (t *transformer) toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo { +func toWorkerInfoList(in []*asynq.WorkerInfo, bs BytesStringer) []*WorkerInfo { out := make([]*WorkerInfo, len(in)) for i, w := range in { - out[i] = t.toWorkerInfo(w) + out[i] = toWorkerInfo(w, bs) } return out } diff --git a/handler.go b/handler.go index b142e16..a4cd95d 100644 --- a/handler.go +++ b/handler.go @@ -20,9 +20,10 @@ type HandlerOptions struct { func NewHandler(opts HandlerOptions) http.Handler { router := mux.NewRouter() inspector := opts.Inspector - t := &transformer{bs: defaultBytesStringer} + + var bs BytesStringer = defaultBytesStringer if opts.BytesStringer != nil { - t = &transformer{bs: opts.BytesStringer} + bs = opts.BytesStringer } for _, mf := range opts.Middlewares { @@ -31,22 +32,22 @@ func NewHandler(opts HandlerOptions) http.Handler { api := router.PathPrefix("/api").Subrouter() // Queue endpoints. - api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, t)).Methods("GET") - api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector, t)).Methods("GET") - api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector, t)).Methods("DELETE") + api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET") + api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST") // Queue Historical Stats endpoint. - api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET") // Task endpoints. - api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, bs)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, bs)).Methods("GET") api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -54,7 +55,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, bs)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -65,7 +66,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, bs)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -76,7 +77,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, bs)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -84,14 +85,14 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, bs)).Methods("GET") // Servers endpoints. - api.HandleFunc("/servers", newListServersHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/servers", newListServersHandlerFunc(inspector, bs)).Methods("GET") // Scheduler Entry endpoints. - api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, t)).Methods("GET") - api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector, t)).Methods("GET") + api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") // Redis info endpoint. switch c := opts.RedisClient.(type) { @@ -101,7 +102,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(c)).Methods("GET") } - api.Handle("/", opts.StaticContentHandler) + router.PathPrefix("/").Handler(opts.StaticContentHandler) return router } diff --git a/queue_handlers.go b/queue_handlers.go index 0bfd4a8..3218cca 100644 --- a/queue_handlers.go +++ b/queue_handlers.go @@ -15,7 +15,7 @@ import ( // - http.Handler(s) for queue related endpoints // **************************************************************************** -func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListQueuesHandlerFunc(inspector *asynq.Inspector, t BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { @@ -29,14 +29,14 @@ func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.H http.Error(w, err.Error(), http.StatusInternalServerError) return } - snapshots[i] = t.toQueueStateSnapshot(qinfo) + snapshots[i] = toQueueStateSnapshot(qinfo) } payload := map[string]interface{}{"queues": snapshots} json.NewEncoder(w).Encode(payload) } } -func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -48,7 +48,7 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Han http.Error(w, err.Error(), http.StatusInternalServerError) return } - payload["current"] = t.toQueueStateSnapshot(qinfo) + payload["current"] = toQueueStateSnapshot(qinfo) // TODO: make this n a variable data, err := inspector.History(qname, 10) @@ -58,14 +58,14 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Han } var dailyStats []*DailyStats for _, s := range data { - dailyStats = append(dailyStats, t.toDailyStats(s)) + dailyStats = append(dailyStats, toDailyStats(s)) } payload["history"] = dailyStats json.NewEncoder(w).Encode(payload) } } -func newDeleteQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newDeleteQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -113,7 +113,7 @@ type ListQueueStatsResponse struct { Stats map[string][]*DailyStats `json:"stats"` } -func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { @@ -128,7 +128,7 @@ func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) ht http.Error(w, err.Error(), http.StatusInternalServerError) return } - resp.Stats[qname] = t.toDailyStatsList(stats) + resp.Stats[qname] = toDailyStatsList(stats) } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/scheduler_entry_handlers.go b/scheduler_entry_handlers.go index 31e8b83..287bb3b 100644 --- a/scheduler_entry_handlers.go +++ b/scheduler_entry_handlers.go @@ -14,7 +14,7 @@ import ( // - http.Handler(s) for scheduler entry related endpoints // **************************************************************************** -func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { entries, err := inspector.SchedulerEntries() if err != nil { @@ -26,7 +26,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transform // avoid nil for the entries field in json output. payload["entries"] = make([]*SchedulerEntry, 0) } else { - payload["entries"] = t.toSchedulerEntries(entries) + payload["entries"] = toSchedulerEntries(entries, bs) } if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -39,7 +39,7 @@ type ListSchedulerEnqueueEventsResponse struct { Events []*SchedulerEnqueueEvent `json:"events"` } -func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { entryID := mux.Vars(r)["entry_id"] pageSize, pageNum := getPageOptions(r) @@ -50,7 +50,7 @@ func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *tra return } resp := ListSchedulerEnqueueEventsResponse{ - Events: t.toSchedulerEnqueueEvents(events), + Events: toSchedulerEnqueueEvents(events), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server_handlers.go b/server_handlers.go index 72a11aa..680d8b4 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -16,7 +16,7 @@ type ListServersResponse struct { Servers []*ServerInfo `json:"servers"` } -func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListServersHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { srvs, err := inspector.Servers() if err != nil { @@ -24,7 +24,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http. return } resp := ListServersResponse{ - Servers: t.toServerInfoList(srvs), + Servers: toServerInfoList(srvs, bs), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/task_handlers.go b/task_handlers.go index f8e19da..d0eaf4f 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -24,7 +24,7 @@ type ListActiveTasksResponse struct { Stats *QueueStateSnapshot `json:"stats"` } -func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -55,7 +55,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) h } } } - activeTasks := t.toActiveTasks(tasks) + activeTasks := toActiveTasks(tasks, bs) for _, t := range activeTasks { workerInfo, ok := m[t.ID] if ok { @@ -69,7 +69,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) h resp := ListActiveTasksResponse{ Tasks: activeTasks, - Stats: t.toQueueStateSnapshot(qinfo), + Stats: toQueueStateSnapshot(qinfo), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -156,7 +156,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl } } -func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -177,9 +177,9 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) // avoid nil for the tasks field in json output. payload["tasks"] = make([]*PendingTask, 0) } else { - payload["tasks"] = t.toPendingTasks(tasks) + payload["tasks"] = toPendingTasks(tasks, bs) } - payload["stats"] = t.toQueueStateSnapshot(qinfo) + payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -187,7 +187,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) } } -func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -208,9 +208,9 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ScheduledTask, 0) } else { - payload["tasks"] = t.toScheduledTasks(tasks) + payload["tasks"] = toScheduledTasks(tasks, bs) } - payload["stats"] = t.toQueueStateSnapshot(qinfo) + payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -218,7 +218,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer } } -func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -239,9 +239,9 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) ht // avoid nil for the tasks field in json output. payload["tasks"] = make([]*RetryTask, 0) } else { - payload["tasks"] = t.toRetryTasks(tasks) + payload["tasks"] = toRetryTasks(tasks, bs) } - payload["stats"] = t.toQueueStateSnapshot(qinfo) + payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -249,7 +249,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) ht } } -func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -270,9 +270,9 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ArchivedTask, 0) } else { - payload["tasks"] = t.toArchivedTasks(tasks) + payload["tasks"] = toArchivedTasks(tasks, bs) } - payload["stats"] = t.toQueueStateSnapshot(qinfo) + payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -627,7 +627,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) { return pageSize, pageNum } -func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { +func newGetTaskHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, taskid := vars["qname"], vars["task_id"] @@ -650,7 +650,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Hand return } - if err := json.NewEncoder(w).Encode(t.toTaskInfo(info)); err != nil { + if err := json.NewEncoder(w).Encode(toTaskInfo(info, bs)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }