diff --git a/conversion_helpers.go b/conversion_helpers.go index 5870f1c..5efa4e1 100644 --- a/conversion_helpers.go +++ b/conversion_helpers.go @@ -313,6 +313,35 @@ func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*pendingTask { return out } +type aggregatingTask struct { + *baseTask + Group string `json:"group"` +} + +func toAggregatingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *aggregatingTask { + base := &baseTask{ + ID: ti.ID, + Type: ti.Type, + Payload: pf.FormatPayload(ti.Type, ti.Payload), + Queue: ti.Queue, + MaxRetry: ti.MaxRetry, + Retried: ti.Retried, + LastError: ti.LastErr, + } + return &aggregatingTask{ + baseTask: base, + Group: ti.Group, + } +} + +func toAggregatingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*aggregatingTask { + out := make([]*aggregatingTask, len(in)) + for i, ti := range in { + out[i] = toAggregatingTask(ti, pf) + } + return out +} + type scheduledTask struct { *baseTask NextProcessAt time.Time `json:"next_process_at"` diff --git a/handler.go b/handler.go index 2a20c15..682419a 100644 --- a/handler.go +++ b/handler.go @@ -174,6 +174,8 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto api.HandleFunc("/queues/{qname}/completed_tasks:delete_all", newDeleteAllCompletedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") + api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks", newListAggregatingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") + api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") // Groups endponts diff --git a/task_handlers.go b/task_handlers.go index 408a565..c309854 100644 --- a/task_handlers.go +++ b/task_handlers.go @@ -310,6 +310,38 @@ func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm } } +func newListAggregatingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + qname := vars["qname"] + gname := vars["gname"] + pageSize, pageNum := getPageOptions(r) + tasks, err := inspector.ListAggregatingTasks( + qname, gname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + qinfo, err := inspector.GetQueueInfo(qname) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + payload := make(map[string]interface{}) + if len(tasks) == 0 { + // avoid nil for the tasks field in json output. + payload["tasks"] = make([]*aggregatingTask, 0) + } else { + payload["tasks"] = toAggregatingTasks(tasks, pf) + } + payload["stats"] = toQueueStateSnapshot(qinfo) + if err := json.NewEncoder(w).Encode(payload); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r)