Update code to use inspeq package

This commit is contained in:
Ken Hibino
2021-01-28 19:47:58 -08:00
parent 4179d00b7b
commit 0626daab0b
7 changed files with 74 additions and 70 deletions

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/gorilla/mux"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
)
// ****************************************************************************
@@ -21,14 +21,14 @@ type ListActiveTasksResponse struct {
Stats *QueueStateSnapshot `json:"stats"`
}
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newListActiveTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname := vars["qname"]
pageSize, pageNum := getPageOptions(r)
tasks, err := inspector.ListActiveTasks(
qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -44,7 +44,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
return
}
// m maps taskID to WorkerInfo.
m := make(map[string]*asynq.WorkerInfo)
m := make(map[string]*inspeq.WorkerInfo)
for _, srv := range servers {
for _, w := range srv.ActiveWorkers {
if w.Task.Queue == qname {
@@ -75,7 +75,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
}
}
func newCancelActiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newCancelActiveTaskHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["task_id"]
if err := inspector.CancelActiveTask(id); err != nil {
@@ -86,13 +86,13 @@ func newCancelActiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
}
}
func newCancelAllActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newCancelAllActiveTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
const batchSize = 100
page := 1
qname := mux.Vars(r)["qname"]
for {
tasks, err := inspector.ListActiveTasks(qname, asynq.Page(page), asynq.PageSize(batchSize))
tasks, err := inspector.ListActiveTasks(qname, inspeq.Page(page), inspeq.PageSize(batchSize))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -121,7 +121,7 @@ type batchCancelTasksResponse struct {
ErrorIDs []string `json:"error_ids"`
}
func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newBatchCancelActiveTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
dec := json.NewDecoder(r.Body)
@@ -153,13 +153,13 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
}
}
func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newListPendingTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname := vars["qname"]
pageSize, pageNum := getPageOptions(r)
tasks, err := inspector.ListPendingTasks(
qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -184,13 +184,13 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
}
}
func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newListScheduledTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname := vars["qname"]
pageSize, pageNum := getPageOptions(r)
tasks, err := inspector.ListScheduledTasks(
qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -215,13 +215,13 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu
}
}
func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newListRetryTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname := vars["qname"]
pageSize, pageNum := getPageOptions(r)
tasks, err := inspector.ListRetryTasks(
qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -246,13 +246,13 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newListArchivedTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname := vars["qname"]
pageSize, pageNum := getPageOptions(r)
tasks, err := inspector.ListArchivedTasks(
qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -277,7 +277,7 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFun
}
}
func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newDeleteTaskHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"]
@@ -294,7 +294,7 @@ func newDeleteTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newRunTaskHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"]
@@ -311,7 +311,7 @@ func newRunTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
}
}
func newArchiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newArchiveTaskHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
qname, key := vars["qname"], vars["task_key"]
@@ -333,7 +333,7 @@ type DeleteAllTasksResponse struct {
Deleted int `json:"deleted"`
}
func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newDeleteAllPendingTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
n, err := inspector.DeleteAllPendingTasks(qname)
@@ -349,7 +349,7 @@ func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handle
}
}
func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newDeleteAllScheduledTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
n, err := inspector.DeleteAllScheduledTasks(qname)
@@ -365,7 +365,7 @@ func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.Hand
}
}
func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newDeleteAllRetryTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
n, err := inspector.DeleteAllRetryTasks(qname)
@@ -381,7 +381,7 @@ func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF
}
}
func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newDeleteAllArchivedTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
n, err := inspector.DeleteAllArchivedTasks(qname)
@@ -397,7 +397,7 @@ func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
}
}
func newRunAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newRunAllScheduledTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.RunAllScheduledTasks(qname); err != nil {
@@ -408,7 +408,7 @@ func newRunAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.Handler
}
}
func newRunAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newRunAllRetryTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.RunAllRetryTasks(qname); err != nil {
@@ -419,7 +419,7 @@ func newRunAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
}
}
func newRunAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newRunAllArchivedTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.RunAllArchivedTasks(qname); err != nil {
@@ -430,7 +430,7 @@ func newRunAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF
}
}
func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newArchiveAllPendingTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.ArchiveAllPendingTasks(qname); err != nil {
@@ -441,7 +441,7 @@ func newArchiveAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
}
}
func newArchiveAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newArchiveAllScheduledTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.ArchiveAllScheduledTasks(qname); err != nil {
@@ -452,7 +452,7 @@ func newArchiveAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.Han
}
}
func newArchiveAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newArchiveAllRetryTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
qname := mux.Vars(r)["qname"]
if _, err := inspector.ArchiveAllRetryTasks(qname); err != nil {
@@ -484,7 +484,7 @@ type batchDeleteTasksResponse struct {
// Allow up to 1MB in size.
const maxRequestBodySize = 1000000
func newBatchDeleteTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newBatchDeleteTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
dec := json.NewDecoder(r.Body)
@@ -528,7 +528,7 @@ type batchRunTasksResponse struct {
ErrorKeys []string `json:"error_keys"`
}
func newBatchRunTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newBatchRunTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
dec := json.NewDecoder(r.Body)
@@ -572,7 +572,7 @@ type batchArchiveTasksResponse struct {
ErrorKeys []string `json:"error_keys"`
}
func newBatchArchiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
func newBatchArchiveTasksHandlerFunc(inspector *inspeq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodySize)
dec := json.NewDecoder(r.Body)