mirror of
				https://github.com/hibiken/asynqmon.git
				synced 2025-10-26 16:26:12 +08:00 
			
		
		
		
	Add API endpoints to batch cancel active tasks
This commit is contained in:
		
							
								
								
									
										3
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								main.go
									
									
									
									
									
								
							| @@ -78,6 +78,9 @@ func main() { | ||||
| 	// Task endpoints. | ||||
| 	api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector)).Methods("GET") | ||||
| 	api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") | ||||
| 	api.HandleFunc("/queues/{qname}/active_tasks/:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") | ||||
| 	api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") | ||||
| 	api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") | ||||
| 	api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET") | ||||
| 	api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET") | ||||
| 	api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_key}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") | ||||
|   | ||||
| @@ -57,6 +57,73 @@ func newCancelActiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newCancelAllActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		const batchSize = 100 | ||||
| 		page := 1 | ||||
| 		qname := mux.Vars(r)["qname"] | ||||
| 		for { | ||||
| 			tasks, err := inspector.ListActiveTasks(qname, asynq.Page(page), asynq.PageSize(batchSize)) | ||||
| 			if err != nil { | ||||
| 				http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 				return | ||||
| 			} | ||||
| 			for _, t := range tasks { | ||||
| 				if err := inspector.CancelActiveTask(t.ID); err != nil { | ||||
| 					http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 					return | ||||
| 				} | ||||
| 			} | ||||
| 			if len(tasks) < batchSize { | ||||
| 				break | ||||
| 			} | ||||
| 			page++ | ||||
| 		} | ||||
| 		w.WriteHeader(http.StatusNoContent) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type batchCancelTasksRequest struct { | ||||
| 	TaskIDs []string `json:"task_ids"` | ||||
| } | ||||
|  | ||||
| type batchCancelTasksResponse struct { | ||||
| 	CanceledIDs []string `json:"canceled_ids"` | ||||
| 	ErrorIDs    []string `json:"error_ids"` | ||||
| } | ||||
|  | ||||
| func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize) | ||||
| 		dec := json.NewDecoder(r.Body) | ||||
| 		dec.DisallowUnknownFields() | ||||
|  | ||||
| 		var req batchCancelTasksRequest | ||||
| 		if err := dec.Decode(&req); err != nil { | ||||
| 			http.Error(w, err.Error(), http.StatusBadRequest) | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		resp := batchCancelTasksResponse{ | ||||
| 			// avoid null in the json response | ||||
| 			CanceledIDs: make([]string, 0), | ||||
| 			ErrorIDs:    make([]string, 0), | ||||
| 		} | ||||
| 		for _, id := range req.TaskIDs { | ||||
| 			if err := inspector.CancelActiveTask(id); err != nil { | ||||
| 				log.Printf("error: could not send cancelation signal to task %s", id) | ||||
| 				resp.ErrorIDs = append(resp.ErrorIDs, id) | ||||
| 			} else { | ||||
| 				resp.CanceledIDs = append(resp.CanceledIDs, id) | ||||
| 			} | ||||
| 		} | ||||
| 		if err := json.NewEncoder(w).Encode(resp); err != nil { | ||||
| 			http.Error(w, err.Error(), http.StatusInternalServerError) | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { | ||||
| 	return func(w http.ResponseWriter, r *http.Request) { | ||||
| 		vars := mux.Vars(r) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user