mirror of
				https://github.com/hibiken/asynqmon.git
				synced 2025-10-26 16:26:12 +08:00 
			
		
		
		
	Add REST endpoint for listing aggregating tasks
This commit is contained in:
		| @@ -313,6 +313,35 @@ func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*pendingTask { | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| type aggregatingTask struct { | ||||
| 	*baseTask | ||||
| 	Group string `json:"group"` | ||||
| } | ||||
|  | ||||
| func toAggregatingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *aggregatingTask { | ||||
| 	base := &baseTask{ | ||||
| 		ID:        ti.ID, | ||||
| 		Type:      ti.Type, | ||||
| 		Payload:   pf.FormatPayload(ti.Type, ti.Payload), | ||||
| 		Queue:     ti.Queue, | ||||
| 		MaxRetry:  ti.MaxRetry, | ||||
| 		Retried:   ti.Retried, | ||||
| 		LastError: ti.LastErr, | ||||
| 	} | ||||
| 	return &aggregatingTask{ | ||||
| 		baseTask: base, | ||||
| 		Group:    ti.Group, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func toAggregatingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*aggregatingTask { | ||||
| 	out := make([]*aggregatingTask, len(in)) | ||||
| 	for i, ti := range in { | ||||
| 		out[i] = toAggregatingTask(ti, pf) | ||||
| 	} | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| type scheduledTask struct { | ||||
| 	*baseTask | ||||
| 	NextProcessAt time.Time `json:"next_process_at"` | ||||
|   | ||||
| @@ -174,6 +174,8 @@ func muxRouter(opts Options, rc redis.UniversalClient, inspector *asynq.Inspecto | ||||
| 	api.HandleFunc("/queues/{qname}/completed_tasks:delete_all", newDeleteAllCompletedTasksHandlerFunc(inspector)).Methods("DELETE") | ||||
| 	api.HandleFunc("/queues/{qname}/completed_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") | ||||
|  | ||||
| 	api.HandleFunc("/queues/{qname}/groups/{gname}/aggregating_tasks", newListAggregatingTasksHandlerFunc(inspector, payloadFmt)).Methods("GET") | ||||
|  | ||||
| 	api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, payloadFmt, resultFmt)).Methods("GET") | ||||
|  | ||||
| 	// Groups endponts | ||||
|   | ||||
| @@ -310,6 +310,38 @@ func newListCompletedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newListAggregatingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		vars := mux.Vars(r) | ||||
| 		qname := vars["qname"] | ||||
| 		gname := vars["gname"] | ||||
| 		pageSize, pageNum := getPageOptions(r) | ||||
| 		tasks, err := inspector.ListAggregatingTasks( | ||||
| 			qname, gname, asynq.PageSize(pageSize), asynq.Page(pageNum)) | ||||
| 		if err != nil { | ||||
| 			http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 		qinfo, err := inspector.GetQueueInfo(qname) | ||||
| 		if err != nil { | ||||
| 			http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 		payload := make(map[string]interface{}) | ||||
| 		if len(tasks) == 0 { | ||||
| 			// avoid nil for the tasks field in json output. | ||||
| 			payload["tasks"] = make([]*aggregatingTask, 0) | ||||
| 		} else { | ||||
| 			payload["tasks"] = toAggregatingTasks(tasks, pf) | ||||
| 		} | ||||
| 		payload["stats"] = toQueueStateSnapshot(qinfo) | ||||
| 		if err := json.NewEncoder(w).Encode(payload); err != nil { | ||||
| 			http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		vars := mux.Vars(r) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user