remove type transformer

This commit is contained in:
ajatprabha 2021-10-01 00:19:41 +05:30 committed by Ken Hibino
parent 796d3c089c
commit e635b73e6c
6 changed files with 89 additions and 92 deletions

View File

@ -49,10 +49,6 @@ func isPrintable(data []byte) bool {
return !isAllSpace return !isAllSpace
} }
type transformer struct {
bs BytesStringer
}
type QueueStateSnapshot struct { type QueueStateSnapshot struct {
// Name of the queue. // Name of the queue.
Queue string `json:"queue"` Queue string `json:"queue"`
@ -80,7 +76,7 @@ type QueueStateSnapshot struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
} }
func (t *transformer) toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
return &QueueStateSnapshot{ return &QueueStateSnapshot{
Queue: s.Queue, Queue: s.Queue,
MemoryUsage: s.MemoryUsage, MemoryUsage: s.MemoryUsage,
@ -106,7 +102,7 @@ type DailyStats struct {
Date string `json:"date"` Date string `json:"date"`
} }
func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats { func toDailyStats(s *asynq.DailyStats) *DailyStats {
return &DailyStats{ return &DailyStats{
Queue: s.Queue, Queue: s.Queue,
Processed: s.Processed, Processed: s.Processed,
@ -116,10 +112,10 @@ func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats {
} }
} }
func (t *transformer) toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
out := make([]*DailyStats, len(in)) out := make([]*DailyStats, len(in))
for i, s := range in { for i, s := range in {
out[i] = t.toDailyStats(s) out[i] = toDailyStats(s)
} }
return out return out
} }
@ -162,12 +158,12 @@ func formatTimeInRFC3339(t time.Time) string {
return t.Format(time.RFC3339) return t.Format(time.RFC3339)
} }
func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo { func toTaskInfo(info *asynq.TaskInfo, bs BytesStringer) *TaskInfo {
return &TaskInfo{ return &TaskInfo{
ID: info.ID, ID: info.ID,
Queue: info.Queue, Queue: info.Queue,
Type: info.Type, Type: info.Type,
Payload: t.bs.String(info.Payload), Payload: bs.String(info.Payload),
State: info.State.String(), State: info.State.String(),
MaxRetry: info.MaxRetry, MaxRetry: info.MaxRetry,
Retried: info.Retried, Retried: info.Retried,
@ -206,11 +202,11 @@ type ActiveTask struct {
Deadline string `json:"deadline"` Deadline string `json:"deadline"`
} }
func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask { func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask {
base := &BaseTask{ base := &BaseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: t.bs.String(ti.Payload), Payload: bs.String(ti.Payload),
Queue: ti.Queue, Queue: ti.Queue,
MaxRetry: ti.MaxRetry, MaxRetry: ti.MaxRetry,
Retried: ti.Retried, Retried: ti.Retried,
@ -219,10 +215,10 @@ func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask {
return &ActiveTask{BaseTask: base} return &ActiveTask{BaseTask: base}
} }
func (t *transformer) toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask { func toActiveTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ActiveTask {
out := make([]*ActiveTask, len(in)) out := make([]*ActiveTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = t.toActiveTask(ti) out[i] = toActiveTask(ti, bs)
} }
return out return out
} }
@ -232,11 +228,11 @@ type PendingTask struct {
*BaseTask *BaseTask
} }
func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask { func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask {
base := &BaseTask{ base := &BaseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: t.bs.String(ti.Payload), Payload: bs.String(ti.Payload),
Queue: ti.Queue, Queue: ti.Queue,
MaxRetry: ti.MaxRetry, MaxRetry: ti.MaxRetry,
Retried: ti.Retried, Retried: ti.Retried,
@ -247,10 +243,10 @@ func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask {
} }
} }
func (t *transformer) toPendingTasks(in []*asynq.TaskInfo) []*PendingTask { func toPendingTasks(in []*asynq.TaskInfo, bs BytesStringer) []*PendingTask {
out := make([]*PendingTask, len(in)) out := make([]*PendingTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = t.toPendingTask(ti) out[i] = toPendingTask(ti, bs)
} }
return out return out
} }
@ -260,11 +256,11 @@ type ScheduledTask struct {
NextProcessAt time.Time `json:"next_process_at"` NextProcessAt time.Time `json:"next_process_at"`
} }
func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask { func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask {
base := &BaseTask{ base := &BaseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: t.bs.String(ti.Payload), Payload: bs.String(ti.Payload),
Queue: ti.Queue, Queue: ti.Queue,
MaxRetry: ti.MaxRetry, MaxRetry: ti.MaxRetry,
Retried: ti.Retried, Retried: ti.Retried,
@ -276,10 +272,10 @@ func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask {
} }
} }
func (t *transformer) toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask { func toScheduledTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ScheduledTask {
out := make([]*ScheduledTask, len(in)) out := make([]*ScheduledTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = t.toScheduledTask(ti) out[i] = toScheduledTask(ti, bs)
} }
return out return out
} }
@ -289,11 +285,11 @@ type RetryTask struct {
NextProcessAt time.Time `json:"next_process_at"` NextProcessAt time.Time `json:"next_process_at"`
} }
func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask { func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask {
base := &BaseTask{ base := &BaseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: t.bs.String(ti.Payload), Payload: bs.String(ti.Payload),
Queue: ti.Queue, Queue: ti.Queue,
MaxRetry: ti.MaxRetry, MaxRetry: ti.MaxRetry,
Retried: ti.Retried, Retried: ti.Retried,
@ -305,10 +301,10 @@ func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask {
} }
} }
func (t *transformer) toRetryTasks(in []*asynq.TaskInfo) []*RetryTask { func toRetryTasks(in []*asynq.TaskInfo, bs BytesStringer) []*RetryTask {
out := make([]*RetryTask, len(in)) out := make([]*RetryTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = t.toRetryTask(ti) out[i] = toRetryTask(ti, bs)
} }
return out return out
} }
@ -318,11 +314,11 @@ type ArchivedTask struct {
LastFailedAt time.Time `json:"last_failed_at"` LastFailedAt time.Time `json:"last_failed_at"`
} }
func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask { func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask {
base := &BaseTask{ base := &BaseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: t.bs.String(ti.Payload), Payload: bs.String(ti.Payload),
Queue: ti.Queue, Queue: ti.Queue,
MaxRetry: ti.MaxRetry, MaxRetry: ti.MaxRetry,
Retried: ti.Retried, Retried: ti.Retried,
@ -334,10 +330,10 @@ func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask {
} }
} }
func (t *transformer) toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask { func toArchivedTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ArchivedTask {
out := make([]*ArchivedTask, len(in)) out := make([]*ArchivedTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = t.toArchivedTask(ti) out[i] = toArchivedTask(ti, bs)
} }
return out return out
} }
@ -353,7 +349,7 @@ type SchedulerEntry struct {
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"` PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
} }
func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry { func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts { for _, o := range e.Opts {
opts = append(opts, o.String()) opts = append(opts, o.String())
@ -366,17 +362,17 @@ func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry
ID: e.ID, ID: e.ID,
Spec: e.Spec, Spec: e.Spec,
TaskType: e.Task.Type(), TaskType: e.Task.Type(),
TaskPayload: t.bs.String(e.Task.Payload()), TaskPayload: bs.String(e.Task.Payload()),
Opts: opts, Opts: opts,
NextEnqueueAt: e.Next.Format(time.RFC3339), NextEnqueueAt: e.Next.Format(time.RFC3339),
PrevEnqueueAt: prev, PrevEnqueueAt: prev,
} }
} }
func (t *transformer) toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry { func toSchedulerEntries(in []*asynq.SchedulerEntry, bs BytesStringer) []*SchedulerEntry {
out := make([]*SchedulerEntry, len(in)) out := make([]*SchedulerEntry, len(in))
for i, e := range in { for i, e := range in {
out[i] = t.toSchedulerEntry(e) out[i] = toSchedulerEntry(e, bs)
} }
return out return out
} }
@ -386,17 +382,17 @@ type SchedulerEnqueueEvent struct {
EnqueuedAt string `json:"enqueued_at"` EnqueuedAt string `json:"enqueued_at"`
} }
func (t *transformer) toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
return &SchedulerEnqueueEvent{ return &SchedulerEnqueueEvent{
TaskID: e.TaskID, TaskID: e.TaskID,
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339), EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
} }
} }
func (t *transformer) toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
out := make([]*SchedulerEnqueueEvent, len(in)) out := make([]*SchedulerEnqueueEvent, len(in))
for i, e := range in { for i, e := range in {
out[i] = t.toSchedulerEnqueueEvent(e) out[i] = toSchedulerEnqueueEvent(e)
} }
return out return out
} }
@ -413,7 +409,7 @@ type ServerInfo struct {
ActiveWorkers []*WorkerInfo `json:"active_workers"` ActiveWorkers []*WorkerInfo `json:"active_workers"`
} }
func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo { func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo {
return &ServerInfo{ return &ServerInfo{
ID: info.ID, ID: info.ID,
Host: info.Host, Host: info.Host,
@ -423,14 +419,14 @@ func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo {
StrictPriority: info.StrictPriority, StrictPriority: info.StrictPriority,
Started: info.Started.Format(time.RFC3339), Started: info.Started.Format(time.RFC3339),
Status: info.Status, Status: info.Status,
ActiveWorkers: t.toWorkerInfoList(info.ActiveWorkers), ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, bs),
} }
} }
func (t *transformer) toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo { func toServerInfoList(in []*asynq.ServerInfo, bs BytesStringer) []*ServerInfo {
out := make([]*ServerInfo, len(in)) out := make([]*ServerInfo, len(in))
for i, s := range in { for i, s := range in {
out[i] = t.toServerInfo(s) out[i] = toServerInfo(s, bs)
} }
return out return out
} }
@ -443,20 +439,20 @@ type WorkerInfo struct {
Started string `json:"start_time"` Started string `json:"start_time"`
} }
func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo { func toWorkerInfo(info *asynq.WorkerInfo, bs BytesStringer) *WorkerInfo {
return &WorkerInfo{ return &WorkerInfo{
TaskID: info.TaskID, TaskID: info.TaskID,
Queue: info.Queue, Queue: info.Queue,
TaskType: info.TaskType, TaskType: info.TaskType,
TakPayload: t.bs.String(info.TaskPayload), TakPayload: bs.String(info.TaskPayload),
Started: info.Started.Format(time.RFC3339), Started: info.Started.Format(time.RFC3339),
} }
} }
func (t *transformer) toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo { func toWorkerInfoList(in []*asynq.WorkerInfo, bs BytesStringer) []*WorkerInfo {
out := make([]*WorkerInfo, len(in)) out := make([]*WorkerInfo, len(in))
for i, w := range in { for i, w := range in {
out[i] = t.toWorkerInfo(w) out[i] = toWorkerInfo(w, bs)
} }
return out return out
} }

View File

@ -20,9 +20,10 @@ type HandlerOptions struct {
func NewHandler(opts HandlerOptions) http.Handler { func NewHandler(opts HandlerOptions) http.Handler {
router := mux.NewRouter() router := mux.NewRouter()
inspector := opts.Inspector inspector := opts.Inspector
t := &transformer{bs: defaultBytesStringer}
var bs BytesStringer = defaultBytesStringer
if opts.BytesStringer != nil { if opts.BytesStringer != nil {
t = &transformer{bs: opts.BytesStringer} bs = opts.BytesStringer
} }
for _, mf := range opts.Middlewares { for _, mf := range opts.Middlewares {
@ -31,22 +32,22 @@ func NewHandler(opts HandlerOptions) http.Handler {
api := router.PathPrefix("/api").Subrouter() api := router.PathPrefix("/api").Subrouter()
// Queue endpoints. // Queue endpoints.
api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET")
api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector, t)).Methods("DELETE") api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")
// Queue Historical Stats endpoint. // Queue Historical Stats endpoint.
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")
// Task endpoints. // Task endpoints.
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
@ -54,7 +55,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
@ -65,7 +66,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
@ -76,7 +77,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE") api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
@ -84,14 +85,14 @@ func NewHandler(opts HandlerOptions) http.Handler {
api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST") api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, bs)).Methods("GET")
// Servers endpoints. // Servers endpoints.
api.HandleFunc("/servers", newListServersHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/servers", newListServersHandlerFunc(inspector, bs)).Methods("GET")
// Scheduler Entry endpoints. // Scheduler Entry endpoints.
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, bs)).Methods("GET")
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector, t)).Methods("GET") api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")
// Redis info endpoint. // Redis info endpoint.
switch c := opts.RedisClient.(type) { switch c := opts.RedisClient.(type) {
@ -101,7 +102,7 @@ func NewHandler(opts HandlerOptions) http.Handler {
api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(c)).Methods("GET") api.HandleFunc("/redis_info", newRedisInfoHandlerFunc(c)).Methods("GET")
} }
api.Handle("/", opts.StaticContentHandler) router.PathPrefix("/").Handler(opts.StaticContentHandler)
return router return router
} }

View File

@ -15,7 +15,7 @@ import (
// - http.Handler(s) for queue related endpoints // - http.Handler(s) for queue related endpoints
// **************************************************************************** // ****************************************************************************
func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListQueuesHandlerFunc(inspector *asynq.Inspector, t BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qnames, err := inspector.Queues() qnames, err := inspector.Queues()
if err != nil { if err != nil {
@ -29,14 +29,14 @@ func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.H
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
snapshots[i] = t.toQueueStateSnapshot(qinfo) snapshots[i] = toQueueStateSnapshot(qinfo)
} }
payload := map[string]interface{}{"queues": snapshots} payload := map[string]interface{}{"queues": snapshots}
json.NewEncoder(w).Encode(payload) json.NewEncoder(w).Encode(payload)
} }
} }
func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -48,7 +48,7 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Han
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
payload["current"] = t.toQueueStateSnapshot(qinfo) payload["current"] = toQueueStateSnapshot(qinfo)
// TODO: make this n a variable // TODO: make this n a variable
data, err := inspector.History(qname, 10) data, err := inspector.History(qname, 10)
@ -58,14 +58,14 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Han
} }
var dailyStats []*DailyStats var dailyStats []*DailyStats
for _, s := range data { for _, s := range data {
dailyStats = append(dailyStats, t.toDailyStats(s)) dailyStats = append(dailyStats, toDailyStats(s))
} }
payload["history"] = dailyStats payload["history"] = dailyStats
json.NewEncoder(w).Encode(payload) json.NewEncoder(w).Encode(payload)
} }
} }
func newDeleteQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newDeleteQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -113,7 +113,7 @@ type ListQueueStatsResponse struct {
Stats map[string][]*DailyStats `json:"stats"` Stats map[string][]*DailyStats `json:"stats"`
} }
func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
qnames, err := inspector.Queues() qnames, err := inspector.Queues()
if err != nil { if err != nil {
@ -128,7 +128,7 @@ func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) ht
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp.Stats[qname] = t.toDailyStatsList(stats) resp.Stats[qname] = toDailyStatsList(stats)
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -14,7 +14,7 @@ import (
// - http.Handler(s) for scheduler entry related endpoints // - http.Handler(s) for scheduler entry related endpoints
// **************************************************************************** // ****************************************************************************
func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
entries, err := inspector.SchedulerEntries() entries, err := inspector.SchedulerEntries()
if err != nil { if err != nil {
@ -26,7 +26,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transform
// avoid nil for the entries field in json output. // avoid nil for the entries field in json output.
payload["entries"] = make([]*SchedulerEntry, 0) payload["entries"] = make([]*SchedulerEntry, 0)
} else { } else {
payload["entries"] = t.toSchedulerEntries(entries) payload["entries"] = toSchedulerEntries(entries, bs)
} }
if err := json.NewEncoder(w).Encode(payload); err != nil { if err := json.NewEncoder(w).Encode(payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -39,7 +39,7 @@ type ListSchedulerEnqueueEventsResponse struct {
Events []*SchedulerEnqueueEvent `json:"events"` Events []*SchedulerEnqueueEvent `json:"events"`
} }
func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
entryID := mux.Vars(r)["entry_id"] entryID := mux.Vars(r)["entry_id"]
pageSize, pageNum := getPageOptions(r) pageSize, pageNum := getPageOptions(r)
@ -50,7 +50,7 @@ func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *tra
return return
} }
resp := ListSchedulerEnqueueEventsResponse{ resp := ListSchedulerEnqueueEventsResponse{
Events: t.toSchedulerEnqueueEvents(events), Events: toSchedulerEnqueueEvents(events),
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -16,7 +16,7 @@ type ListServersResponse struct {
Servers []*ServerInfo `json:"servers"` Servers []*ServerInfo `json:"servers"`
} }
func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListServersHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
srvs, err := inspector.Servers() srvs, err := inspector.Servers()
if err != nil { if err != nil {
@ -24,7 +24,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http.
return return
} }
resp := ListServersResponse{ resp := ListServersResponse{
Servers: t.toServerInfoList(srvs), Servers: toServerInfoList(srvs, bs),
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -24,7 +24,7 @@ type ListActiveTasksResponse struct {
Stats *QueueStateSnapshot `json:"stats"` Stats *QueueStateSnapshot `json:"stats"`
} }
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -55,7 +55,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) h
} }
} }
} }
activeTasks := t.toActiveTasks(tasks) activeTasks := toActiveTasks(tasks, bs)
for _, t := range activeTasks { for _, t := range activeTasks {
workerInfo, ok := m[t.ID] workerInfo, ok := m[t.ID]
if ok { if ok {
@ -69,7 +69,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) h
resp := ListActiveTasksResponse{ resp := ListActiveTasksResponse{
Tasks: activeTasks, Tasks: activeTasks,
Stats: t.toQueueStateSnapshot(qinfo), Stats: toQueueStateSnapshot(qinfo),
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -156,7 +156,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
} }
} }
func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -177,9 +177,9 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer)
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*PendingTask, 0) payload["tasks"] = make([]*PendingTask, 0)
} else { } else {
payload["tasks"] = t.toPendingTasks(tasks) payload["tasks"] = toPendingTasks(tasks, bs)
} }
payload["stats"] = t.toQueueStateSnapshot(qinfo) payload["stats"] = toQueueStateSnapshot(qinfo)
if err := json.NewEncoder(w).Encode(payload); err != nil { if err := json.NewEncoder(w).Encode(payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -187,7 +187,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer)
} }
} }
func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -208,9 +208,9 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*ScheduledTask, 0) payload["tasks"] = make([]*ScheduledTask, 0)
} else { } else {
payload["tasks"] = t.toScheduledTasks(tasks) payload["tasks"] = toScheduledTasks(tasks, bs)
} }
payload["stats"] = t.toQueueStateSnapshot(qinfo) payload["stats"] = toQueueStateSnapshot(qinfo)
if err := json.NewEncoder(w).Encode(payload); err != nil { if err := json.NewEncoder(w).Encode(payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -218,7 +218,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer
} }
} }
func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -239,9 +239,9 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) ht
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*RetryTask, 0) payload["tasks"] = make([]*RetryTask, 0)
} else { } else {
payload["tasks"] = t.toRetryTasks(tasks) payload["tasks"] = toRetryTasks(tasks, bs)
} }
payload["stats"] = t.toQueueStateSnapshot(qinfo) payload["stats"] = toQueueStateSnapshot(qinfo)
if err := json.NewEncoder(w).Encode(payload); err != nil { if err := json.NewEncoder(w).Encode(payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -249,7 +249,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) ht
} }
} }
func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname := vars["qname"] qname := vars["qname"]
@ -270,9 +270,9 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer)
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*ArchivedTask, 0) payload["tasks"] = make([]*ArchivedTask, 0)
} else { } else {
payload["tasks"] = t.toArchivedTasks(tasks) payload["tasks"] = toArchivedTasks(tasks, bs)
} }
payload["stats"] = t.toQueueStateSnapshot(qinfo) payload["stats"] = toQueueStateSnapshot(qinfo)
if err := json.NewEncoder(w).Encode(payload); err != nil { if err := json.NewEncoder(w).Encode(payload); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -627,7 +627,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) {
return pageSize, pageNum return pageSize, pageNum
} }
func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc { func newGetTaskHandlerFunc(inspector *asynq.Inspector, bs BytesStringer) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)
qname, taskid := vars["qname"], vars["task_id"] qname, taskid := vars["qname"], vars["task_id"]
@ -650,7 +650,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.Hand
return return
} }
if err := json.NewEncoder(w).Encode(t.toTaskInfo(info)); err != nil { if err := json.NewEncoder(w).Encode(toTaskInfo(info, bs)); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }