From e569ad9186bf4d15274bdfa8bb2f31a69ac3bb3f Mon Sep 17 00:00:00 2001 From: ajatprabha Date: Sat, 2 Oct 2021 12:57:41 +0530 Subject: [PATCH] update BytesStringer => PayloadFormatter to pass taskType --- conversion_helpers.go | 100 ++++++++++++++++++------------------ handler.go | 26 +++++----- queue_handlers.go | 2 +- scheduler_entry_handlers.go | 4 +- server_handlers.go | 4 +- task_handlers.go | 24 ++++----- 6 files changed, 80 insertions(+), 80 deletions(-) diff --git a/conversion_helpers.go b/conversion_helpers.go index 3688c57..4b3d3a6 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -14,18 +14,18 @@ import ( // - conversion function from an external type to an internal type // **************************************************************************** -// BytesStringer can be used to convert payload bytes to string to show in web ui. -type BytesStringer interface { - String([]byte) string +// PayloadFormatter can be used to convert payload bytes to string to show in web ui. +type PayloadFormatter interface { + FormatPayload(taskType string, payload []byte) string } -type BytesStringerFunc func([]byte) string +type PayloadFormatterFunc func(string, []byte) string -func (f BytesStringerFunc) String(b []byte) string { - return f(b) +func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string { + return f(taskType, payload) } -var defaultBytesStringer = BytesStringerFunc(func(payload []byte) string { +var defaultPayloadFormatter = PayloadFormatterFunc(func(_ string, payload []byte) string { if !isPrintable(payload) { return "non-printable bytes" } @@ -158,12 +158,12 @@ func formatTimeInRFC3339(t time.Time) string { return t.Format(time.RFC3339) } -func toTaskInfo(info *asynq.TaskInfo, bs BytesStringer) *TaskInfo { +func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *TaskInfo { return &TaskInfo{ ID: info.ID, Queue: info.Queue, Type: info.Type, - Payload: bs.String(info.Payload), + Payload: pf.FormatPayload(info.Type, info.Payload), State: info.State.String(), MaxRetry: info.MaxRetry, Retried: info.Retried, @@ -202,11 +202,11 @@ type ActiveTask struct { Deadline string `json:"deadline"` } -func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask { +func toActiveTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ActiveTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: bs.String(ti.Payload), + Payload: pf.FormatPayload(ti.Type, ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -215,10 +215,10 @@ func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask { return &ActiveTask{BaseTask: base} } -func toActiveTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ActiveTask { +func toActiveTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ActiveTask { out := make([]*ActiveTask, len(in)) for i, ti := range in { - out[i] = toActiveTask(ti, bs) + out[i] = toActiveTask(ti, pf) } return out } @@ -228,11 +228,11 @@ type PendingTask struct { *BaseTask } -func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask { +func toPendingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *PendingTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: bs.String(ti.Payload), + Payload: pf.FormatPayload(ti.Type, ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -243,10 +243,10 @@ func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask { } } -func toPendingTasks(in []*asynq.TaskInfo, bs BytesStringer) []*PendingTask { +func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*PendingTask { out := make([]*PendingTask, len(in)) for i, ti := range in { - out[i] = toPendingTask(ti, bs) + out[i] = toPendingTask(ti, pf) } return out } @@ -256,11 +256,11 @@ type ScheduledTask struct { NextProcessAt time.Time `json:"next_process_at"` } -func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask { +func toScheduledTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ScheduledTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: bs.String(ti.Payload), + Payload: pf.FormatPayload(ti.Type, ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -272,10 +272,10 @@ func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask { } } -func toScheduledTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ScheduledTask { +func toScheduledTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ScheduledTask { out := make([]*ScheduledTask, len(in)) for i, ti := range in { - out[i] = toScheduledTask(ti, bs) + out[i] = toScheduledTask(ti, pf) } return out } @@ -285,11 +285,11 @@ type RetryTask struct { NextProcessAt time.Time `json:"next_process_at"` } -func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask { +func toRetryTask(ti *asynq.TaskInfo, pf PayloadFormatter) *RetryTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: bs.String(ti.Payload), + Payload: pf.FormatPayload(ti.Type, ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -301,10 +301,10 @@ func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask { } } -func toRetryTasks(in []*asynq.TaskInfo, bs BytesStringer) []*RetryTask { +func toRetryTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*RetryTask { out := make([]*RetryTask, len(in)) for i, ti := range in { - out[i] = toRetryTask(ti, bs) + out[i] = toRetryTask(ti, pf) } return out } @@ -314,11 +314,11 @@ type ArchivedTask struct { LastFailedAt time.Time `json:"last_failed_at"` } -func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask { +func toArchivedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ArchivedTask { base := &BaseTask{ ID: ti.ID, Type: ti.Type, - Payload: bs.String(ti.Payload), + Payload: pf.FormatPayload(ti.Type, ti.Payload), Queue: ti.Queue, MaxRetry: ti.MaxRetry, Retried: ti.Retried, @@ -330,10 +330,10 @@ func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask { } } -func toArchivedTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ArchivedTask { +func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ArchivedTask { out := make([]*ArchivedTask, len(in)) for i, ti := range in { - out[i] = toArchivedTask(ti, bs) + out[i] = toArchivedTask(ti, pf) } return out } @@ -349,7 +349,7 @@ type SchedulerEntry struct { PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"` } -func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry { +func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *SchedulerEntry { opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output for _, o := range e.Opts { opts = append(opts, o.String()) @@ -362,17 +362,17 @@ func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry ID: e.ID, Spec: e.Spec, TaskType: e.Task.Type(), - TaskPayload: bs.String(e.Task.Payload()), + TaskPayload: pf.FormatPayload(e.Task.Type(), e.Task.Payload()), Opts: opts, NextEnqueueAt: e.Next.Format(time.RFC3339), PrevEnqueueAt: prev, } } -func toSchedulerEntries(in []*asynq.SchedulerEntry, bs BytesStringer) []*SchedulerEntry { +func toSchedulerEntries(in []*asynq.SchedulerEntry, pf PayloadFormatter) []*SchedulerEntry { out := make([]*SchedulerEntry, len(in)) for i, e := range in { - out[i] = toSchedulerEntry(e, bs) + out[i] = toSchedulerEntry(e, pf) } return out } @@ -409,7 +409,7 @@ type ServerInfo struct { ActiveWorkers []*WorkerInfo `json:"active_workers"` } -func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo { +func toServerInfo(info *asynq.ServerInfo, pf PayloadFormatter) *ServerInfo { return &ServerInfo{ ID: info.ID, Host: info.Host, @@ -419,40 +419,40 @@ func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo { StrictPriority: info.StrictPriority, Started: info.Started.Format(time.RFC3339), Status: info.Status, - ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, bs), + ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, pf), } } -func toServerInfoList(in []*asynq.ServerInfo, bs BytesStringer) []*ServerInfo { +func toServerInfoList(in []*asynq.ServerInfo, pf PayloadFormatter) []*ServerInfo { out := make([]*ServerInfo, len(in)) for i, s := range in { - out[i] = toServerInfo(s, bs) + out[i] = toServerInfo(s, pf) } return out } type WorkerInfo struct { - TaskID string `json:"task_id"` - Queue string `json:"queue"` - TaskType string `json:"task_type"` - TakPayload string `json:"task_payload"` - Started string `json:"start_time"` + TaskID string `json:"task_id"` + Queue string `json:"queue"` + TaskType string `json:"task_type"` + TaskPayload string `json:"task_payload"` + Started string `json:"start_time"` } -func toWorkerInfo(info *asynq.WorkerInfo, bs BytesStringer) *WorkerInfo { +func toWorkerInfo(info *asynq.WorkerInfo, pf PayloadFormatter) *WorkerInfo { return &WorkerInfo{ - TaskID: info.TaskID, - Queue: info.Queue, - TaskType: info.TaskType, - TakPayload: bs.String(info.TaskPayload), - Started: info.Started.Format(time.RFC3339), + TaskID: info.TaskID, + Queue: info.Queue, + TaskType: info.TaskType, + TaskPayload: pf.FormatPayload(info.TaskType, info.TaskPayload), + Started: info.Started.Format(time.RFC3339), } } -func toWorkerInfoList(in []*asynq.WorkerInfo, bs BytesStringer) []*WorkerInfo { +func toWorkerInfoList(in []*asynq.WorkerInfo, pf PayloadFormatter) []*WorkerInfo { out := make([]*WorkerInfo, len(in)) for i, w := range in { - out[i] = toWorkerInfo(w, bs) + out[i] = toWorkerInfo(w, pf) } return out } diff --git a/handler.go b/handler.go index a4cd95d..48cfa8f 100644 --- a/handler.go +++ b/handler.go @@ -13,7 +13,7 @@ type HandlerOptions struct { RedisClient redis.UniversalClient Inspector *asynq.Inspector Middlewares []mux.MiddlewareFunc - BytesStringer BytesStringer + PayloadFormatter PayloadFormatter StaticContentHandler http.Handler } @@ -21,9 +21,9 @@ func NewHandler(opts HandlerOptions) http.Handler { router := mux.NewRouter() inspector := opts.Inspector - var bs BytesStringer = defaultBytesStringer - if opts.BytesStringer != nil { - bs = opts.BytesStringer + var pf PayloadFormatter = defaultPayloadFormatter + if opts.PayloadFormatter != nil { + pf = opts.PayloadFormatter } for _, mf := range opts.Middlewares { @@ -32,7 +32,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api := router.PathPrefix("/api").Subrouter() // Queue endpoints. - api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET") api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST") @@ -42,12 +42,12 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET") // Task endpoints. - api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -55,7 +55,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -66,7 +66,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -77,7 +77,7 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -85,13 +85,13 @@ func NewHandler(opts HandlerOptions) http.Handler { api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, pf)).Methods("GET") // Servers endpoints. - api.HandleFunc("/servers", newListServersHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/servers", newListServersHandlerFunc(inspector, pf)).Methods("GET") // Scheduler Entry endpoints. - api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, bs)).Methods("GET") + api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, pf)).Methods("GET") api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") // Redis info endpoint. diff --git a/queue_handlers.go b/queue_handlers.go index 3218cca..888213a 100644 --- a/queue_handlers.go +++ b/queue_handlers.go @@ -15,7 +15,7 @@ import ( // - http.Handler(s) for queue related endpoints // **************************************************************************** -func newListQueuesHandlerFunc(inspector *asynq.Inspector, t BytesStringer) http.HandlerFunc { +func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { qnames, err := inspector.Queues() if err != nil { diff --git a/scheduler_entry_handlers.go b/scheduler_entry_handlers.go index 287bb3b..b8c5931 100644 --- a/scheduler_entry_handlers.go +++ b/scheduler_entry_handlers.go @@ -14,7 +14,7 @@ import ( // - http.Handler(s) for scheduler entry related endpoints // **************************************************************************** -func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { entries, err := inspector.SchedulerEntries() if err != nil { @@ -26,7 +26,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, bs BytesStri // avoid nil for the entries field in json output. payload["entries"] = make([]*SchedulerEntry, 0) } else { - payload["entries"] = toSchedulerEntries(entries, bs) + payload["entries"] = toSchedulerEntries(entries, pf) } if err := json.NewEncoder(w).Encode(payload); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server_handlers.go b/server_handlers.go index 680d8b4..34267ea 100644 --- a/server_handlers.go +++ b/server_handlers.go @@ -16,7 +16,7 @@ type ListServersResponse struct { Servers []*ServerInfo `json:"servers"` } -func newListServersHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListServersHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { srvs, err := inspector.Servers() if err != nil { @@ -24,7 +24,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) htt return } resp := ListServersResponse{ - Servers: toServerInfoList(srvs, bs), + Servers: toServerInfoList(srvs, pf), } if err := json.NewEncoder(w).Encode(resp); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/task_handlers.go b/task_handlers.go index d0eaf4f..a9c9934 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -24,7 +24,7 @@ type ListActiveTasksResponse struct { Stats *QueueStateSnapshot `json:"stats"` } -func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -55,7 +55,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) } } } - activeTasks := toActiveTasks(tasks, bs) + activeTasks := toActiveTasks(tasks, pf) for _, t := range activeTasks { workerInfo, ok := m[t.ID] if ok { @@ -156,7 +156,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl } } -func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -177,7 +177,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer // avoid nil for the tasks field in json output. payload["tasks"] = make([]*PendingTask, 0) } else { - payload["tasks"] = toPendingTasks(tasks, bs) + payload["tasks"] = toPendingTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { @@ -187,7 +187,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer } } -func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -208,7 +208,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, bs BytesString // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ScheduledTask, 0) } else { - payload["tasks"] = toScheduledTasks(tasks, bs) + payload["tasks"] = toScheduledTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { @@ -218,7 +218,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, bs BytesString } } -func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -239,7 +239,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) // avoid nil for the tasks field in json output. payload["tasks"] = make([]*RetryTask, 0) } else { - payload["tasks"] = toRetryTasks(tasks, bs) + payload["tasks"] = toRetryTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { @@ -249,7 +249,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) } } -func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -270,7 +270,7 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringe // avoid nil for the tasks field in json output. payload["tasks"] = make([]*ArchivedTask, 0) } else { - payload["tasks"] = toArchivedTasks(tasks, bs) + payload["tasks"] = toArchivedTasks(tasks, pf) } payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { @@ -627,7 +627,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) { return pageSize, pageNum } -func newGetTaskHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc { +func newGetTaskHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, taskid := vars["qname"], vars["task_id"] @@ -650,7 +650,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.Ha return } - if err := json.NewEncoder(w).Encode(toTaskInfo(info, bs)); err != nil { + if err := json.NewEncoder(w).Encode(toTaskInfo(info, pf)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }