From e724088a9c14d9c8fd5218d2f8428107d259dcba Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 22 Oct 2021 10:13:46 -0700 Subject: [PATCH] Add ResultFormatter --- conversion_helpers.go | 41 ++++++++++++++++++++++++++++++----------- handler.go | 32 +++++++++++++++++++++----------- task_handlers.go | 8 ++++---- 3 files changed, 55 insertions(+), 26 deletions(-) diff --git a/conversion_helpers.go b/conversion_helpers.go index d3114de..74e7969 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -14,7 +14,7 @@ import ( // - conversion function from an external type to an internal type // **************************************************************************** -// PayloadFormatter is used to convert payload bytes to string shown in the UI. +// PayloadFormatter is used to convert payload bytes to a string shown in the UI. type PayloadFormatter interface { // FormatPayload takes the task's typename and payload and returns a string representation of the payload. FormatPayload(taskType string, payload []byte) string @@ -22,11 +22,22 @@ type PayloadFormatter interface { type PayloadFormatterFunc func(string, []byte) string -// FormatPayload returns a string representation of the payload of the given taskType. func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string { return f(taskType, payload) } +// ResultFormatter is used to convert result bytes to a string shown in the UI. +type ResultFormatter interface { + // FormatResult takes the task's typename and result and returns a string representation of the result. + FormatResult(taskType string, result []byte) string +} + +type ResultFormatterFunc func(string, []byte) string + +func (f ResultFormatterFunc) FormatResult(taskType string, result []byte) string { + return f(taskType, result) +} + // DefaultPayloadFormatter is the PayloadFormater used by default. // It prints the given payload bytes as is if the bytes are printable, otherwise it prints a message to indicate // that the bytes are not printable. @@ -37,6 +48,16 @@ var DefaultPayloadFormatter = PayloadFormatterFunc(func(_ string, payload []byte return string(payload) }) +// DefaultResultFormatter is the ResultFormatter used by default. +// It prints the given result bytes as is if the bytes are printable, otherwise it prints a message to indicate +// that the bytes are not printable. +var DefaultResultFormatter = ResultFormatterFunc(func(_ string, result []byte) string { + if !isPrintable(result) { + return "non-printable bytes" + } + return string(result) +}) + // isPrintable reports whether the given data is comprised of all printable runes. func isPrintable(data []byte) bool { if !utf8.Valid(data) { @@ -181,7 +202,7 @@ func formatTimeInRFC3339(t time.Time) string { return t.Format(time.RFC3339) } -func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo { +func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *taskInfo { return &taskInfo{ ID: info.ID, Queue: info.Queue, @@ -196,9 +217,8 @@ func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo { Deadline: formatTimeInRFC3339(info.Deadline), NextProcessAt: formatTimeInRFC3339(info.NextProcessAt), CompletedAt: formatTimeInRFC3339(info.CompletedAt), - // TODO: Replace this with resultFormatter - Result: DefaultPayloadFormatter.FormatPayload("", info.Result), - TTL: int64(taskTTL(info).Seconds()), + Result: rf.FormatResult("", info.Result), + TTL: int64(taskTTL(info).Seconds()), } } @@ -373,7 +393,7 @@ type completedTask struct { TTL int64 `json:"ttl_seconds"` } -func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *completedTask { +func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *completedTask { base := &baseTask{ ID: ti.ID, Type: ti.Type, @@ -387,15 +407,14 @@ func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *completedTask { baseTask: base, CompletedAt: ti.CompletedAt, TTL: int64(taskTTL(ti).Seconds()), - // TODO: Use resultFormatter instead - Result: DefaultPayloadFormatter.FormatPayload("", ti.Result), + Result: rf.FormatResult(ti.Type, ti.Result), } } -func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*completedTask { +func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) []*completedTask { out := make([]*completedTask, len(in)) for i, ti := range in { - out[i] = toCompletedTask(ti, pf) + out[i] = toCompletedTask(ti, pf, rf) } return out } diff --git a/handler.go b/handler.go index a053a0e..a14f4d6 100644 --- a/handler.go +++ b/handler.go @@ -29,6 +29,11 @@ type Options struct { // // This field is optional. PayloadFormatter PayloadFormatter + + // ResultFormatter is used to convert result bytes to string shown in the UI. + // + // This field is optional. + ResultFormatter ResultFormatter } // HTTPHandler is a http.Handler for asynqmon application. @@ -89,9 +94,14 @@ var staticContents embed.FS func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspector) *mux.Router { router := mux.NewRouter().PathPrefix(opts.RootPath).Subrouter() - var pf PayloadFormatter = DefaultPayloadFormatter + var payloadFmt PayloadFormatter = DefaultPayloadFormatter if opts.PayloadFormatter != nil { - pf = opts.PayloadFormatter + payloadFmt = opts.PayloadFormatter + } + + var resultFmt ResultFormatter = DefaultResultFormatter + if opts.ResultFormatter != nil { + resultFmt = opts.ResultFormatter } api := router.PathPrefix("/api").Subrouter() @@ -106,12 +116,12 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET") // Task endpoints. - api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -119,7 +129,7 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -130,7 +140,7 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -141,7 +151,7 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") @@ -149,18 +159,18 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/completed_tasks", newListCompletedTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/completed_tasks", newListCompletedTasksHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") api.HandleFunc("/queues/{qname}/completed_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/completed_tasks:delete_all", newDeleteAllCompletedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") - api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") // Servers endpoints. - api.HandleFunc("/servers", newListServersHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/servers", newListServersHandlerFunc(inspector, payloadFmt)).Methods("GET") // Scheduler Entry endpoints. - api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, payloadFmt)).Methods("GET") api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET") // Redis info endpoint. diff --git a/task_handlers.go b/task_handlers.go index 87535d5..408a565 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -280,7 +280,7 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForma } } -func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { +func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter, rf ResultFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname := vars["qname"] @@ -300,7 +300,7 @@ func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm // avoid nil for the tasks field in json output. payload["tasks"] = make([]*completedTask, 0) } else { - payload["tasks"] = toCompletedTasks(tasks, pf) + payload["tasks"] = toCompletedTasks(tasks, pf, rf) } payload["stats"] = toQueueStateSnapshot(qinfo) if err := json.NewEncoder(w).Encode(payload); err != nil { @@ -673,7 +673,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) { return pageSize, pageNum } -func newGetTaskHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { +func newGetTaskHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter, rf ResultFormatter) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) qname, taskid := vars["qname"], vars["task_id"] @@ -696,7 +696,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http return } - if err := json.NewEncoder(w).Encode(toTaskInfo(info, pf)); err != nil { + if err := json.NewEncoder(w).Encode(toTaskInfo(info, pf, rf)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return }