diff --git a/conversion_helpers.go b/conversion_helpers.go index c106330..ea01249 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -343,6 +343,35 @@ func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*archivedTask return out } +type completedTask struct { + *baseTask + CompletedAt time.Time `json:"completed_at"` +} + +func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *completedTask { + base := &baseTask{ + ID: ti.ID, + Type: ti.Type, + Payload: pf.FormatPayload(ti.Type, ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, + } + return &completedTask{ + baseTask: base, + CompletedAt: ti.CompletedAt, + } +} + +func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*completedTask { + out := make([]*completedTask, len(in)) + for i, ti := range in { + out[i] = toCompletedTask(ti, pf) + } + return out +} + type schedulerEntry struct { ID string `json:"id"` Spec string `json:"spec"` diff --git a/handler.go b/handler.go index 0eb7cf6..5d90a7c 100644 --- a/handler.go +++ b/handler.go @@ -149,6 +149,8 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/completed_tasks", newListCompletedTasksHandlerFunc(inspector, pf)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, pf)).Methods("GET") // Servers endpoints. diff --git a/task_handlers.go b/task_handlers.go index a3c7439..0069ecf 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -280,6 +280,36 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForma } } +func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname := vars["qname"] + pageSize, pageNum := getPageOptions(r) + tasks, err := inspector.ListCompletedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + qinfo, err := inspector.GetQueueInfo(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + payload := make(map[string]interface{}) + if len(tasks) == 0 { + // avoid nil for the tasks field in json output. + payload["tasks"] = make([]*completedTask, 0) + } else { + payload["tasks"] = toCompletedTasks(tasks, pf) + } + payload["stats"] = toQueueStateSnapshot(qinfo) + if err := json.NewEncoder(w).Encode(payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r)