From 8ff8ec57bc3029996d047d1be092dbebaddc8eb4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 17 Jul 2021 06:16:15 -0700 Subject: [PATCH] Add REST API endpoint to get task by ID --- conversion_helpers.go | 55 +++++++++++++++++++++++++++++++++++++++++++ main.go | 2 ++ task_handlers.go | 31 ++++++++++++++++++++++++ 3 files changed, 88 insertions(+) diff --git a/conversion_helpers.go b/conversion_helpers.go index 1208b1c..014502f 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -85,6 +85,61 @@ func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { return out } +type TaskInfo struct { + // ID is the identifier of the task. + ID string `json:"id"` + // Queue is the name of the queue in which the task belongs. + Queue string `json:"queue"` + // Type is the type name of the task. + Type string `json:"type"` + // Payload is the payload data of the task. + Payload string `json:"payload"` + // State indicates the task state. + State string `json:"state"` + // MaxRetry is the maximum number of times the task can be retried. + MaxRetry int `json:"max_retry"` + // Retried is the number of times the task has retried so far. + Retried int `json:"retried"` + // LastErr is the error message from the last failure. + LastErr string `json:"error_message"` + // LastFailedAt is the time time of the last failure in RFC3339 format. + // If the task has no failures, empty string. + LastFailedAt string `json:"last_failed_at"` + // Timeout is the number of seconds the task can be processed by Handler before being retried. + Timeout int `json:"timeout_seconds"` + // Deadline is the deadline for the task in RFC3339 format. If not set, empty string. + Deadline string `json:"deadline"` + // NextProcessAt is the time the task is scheduled to be processed in RFC3339 format. + // If not applicable, empty string. + NextProcessAt string `json:"next_process_at"` +} + +// formatTimeInRFC3339 formats t in RFC3339 if the value is non-zero. +// If t is zero time (i.e. time.Time{}), returns empty string +func formatTimeInRFC3339(t time.Time) string { + if t.IsZero() { + return "" + } + return t.Format(time.RFC3339) +} + +func toTaskInfo(info *asynq.TaskInfo) *TaskInfo { + return &TaskInfo{ + ID: info.ID, + Queue: info.Queue, + Type: info.Type, + Payload: toPrintablePayload(info.Payload), + State: info.State.String(), + MaxRetry: info.MaxRetry, + Retried: info.Retried, + LastErr: info.LastErr, + LastFailedAt: formatTimeInRFC3339(info.LastFailedAt), + Timeout: int(info.Timeout.Seconds()), + Deadline: formatTimeInRFC3339(info.Deadline), + NextProcessAt: formatTimeInRFC3339(info.NextProcessAt), + } +} + type BaseTask struct { ID string `json:"id"` Type string `json:"type"` diff --git a/main.go b/main.go index 82633d1..201f64e 100644 --- a/main.go +++ b/main.go @@ -197,6 +197,8 @@ func main() { api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector)).Methods("GET") + // Servers endpoints. api.HandleFunc("/servers", newListServersHandlerFunc(inspector)).Methods("GET") diff --git a/task_handlers.go b/task_handlers.go index 0339cb0..b0d0a29 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "errors" "log" "net/http" "strconv" @@ -623,3 +624,33 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) { } return pageSize, pageNum } + +func newGetTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname, taskid := vars["qname"], vars["task_id"] + if qname == "" { + http.Error(w, "queue name cannot be empty", http.StatusBadRequest) + return + } + if taskid == "" { + http.Error(w, "task_id cannot be empty", http.StatusBadRequest) + return + } + + info, err := inspector.GetTaskInfo(qname, taskid) + switch { + case errors.Is(err, asynq.ErrQueueNotFound), errors.Is(err, asynq.ErrTaskNotFound): + http.Error(w, err.Error(), http.StatusNotFound) + return + case err != nil: + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if err := json.NewEncoder(w).Encode(toTaskInfo(info)); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +}