mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-19 03:05:53 +08:00
add injectable PayloadStringer
This commit is contained in:
parent
3ec75cad17
commit
a76670956b
@ -1,12 +1,10 @@
|
|||||||
package asynqmon
|
package asynqmon
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"github.com/hibiken/asynq"
|
||||||
"time"
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"github.com/hibiken/asynq"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
@ -15,6 +13,7 @@ import (
|
|||||||
// - conversion function from an external type to an internal type
|
// - conversion function from an external type to an internal type
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
|
|
||||||
|
// PayloadStringer can be used to convert payload bytes to string to show in web ui.
|
||||||
type PayloadStringer interface {
|
type PayloadStringer interface {
|
||||||
String([]byte) string
|
String([]byte) string
|
||||||
}
|
}
|
||||||
@ -25,15 +24,32 @@ func (f PayloadStringerFunc) String(b []byte) string {
|
|||||||
return f(b)
|
return f(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
var payloadStringer PayloadStringer = PayloadStringerFunc(func(payload []byte) string {
|
var defaultPayloadStringer = PayloadStringerFunc(func(payload []byte) string {
|
||||||
if !isPrintable(payload) {
|
if !isPrintable(payload) {
|
||||||
return "non-printable bytes"
|
return "non-printable bytes"
|
||||||
}
|
}
|
||||||
return string(payload)
|
return string(payload)
|
||||||
})
|
})
|
||||||
|
|
||||||
func SetPayloadStringer(stringer PayloadStringer) {
|
// isPrintable reports whether the given data is comprised of all printable runes.
|
||||||
payloadStringer = stringer
|
func isPrintable(data []byte) bool {
|
||||||
|
if !utf8.Valid(data) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
isAllSpace := true
|
||||||
|
for _, r := range string(data) {
|
||||||
|
if !unicode.IsPrint(r) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !unicode.IsSpace(r) {
|
||||||
|
isAllSpace = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return !isAllSpace
|
||||||
|
}
|
||||||
|
|
||||||
|
type transformer struct {
|
||||||
|
ps PayloadStringer
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueueStateSnapshot struct {
|
type QueueStateSnapshot struct {
|
||||||
@ -63,7 +79,7 @@ type QueueStateSnapshot struct {
|
|||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
|
func (t *transformer) toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
|
||||||
return &QueueStateSnapshot{
|
return &QueueStateSnapshot{
|
||||||
Queue: s.Queue,
|
Queue: s.Queue,
|
||||||
MemoryUsage: s.MemoryUsage,
|
MemoryUsage: s.MemoryUsage,
|
||||||
@ -89,7 +105,7 @@ type DailyStats struct {
|
|||||||
Date string `json:"date"`
|
Date string `json:"date"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toDailyStats(s *asynq.DailyStats) *DailyStats {
|
func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats {
|
||||||
return &DailyStats{
|
return &DailyStats{
|
||||||
Queue: s.Queue,
|
Queue: s.Queue,
|
||||||
Processed: s.Processed,
|
Processed: s.Processed,
|
||||||
@ -99,10 +115,10 @@ func toDailyStats(s *asynq.DailyStats) *DailyStats {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
|
func (t *transformer) toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
|
||||||
out := make([]*DailyStats, len(in))
|
out := make([]*DailyStats, len(in))
|
||||||
for i, s := range in {
|
for i, s := range in {
|
||||||
out[i] = toDailyStats(s)
|
out[i] = t.toDailyStats(s)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -145,12 +161,12 @@ func formatTimeInRFC3339(t time.Time) string {
|
|||||||
return t.Format(time.RFC3339)
|
return t.Format(time.RFC3339)
|
||||||
}
|
}
|
||||||
|
|
||||||
func toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
|
func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
|
||||||
return &TaskInfo{
|
return &TaskInfo{
|
||||||
ID: info.ID,
|
ID: info.ID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
Type: info.Type,
|
Type: info.Type,
|
||||||
Payload: payloadStringer.String(info.Payload),
|
Payload: t.ps.String(info.Payload),
|
||||||
State: info.State.String(),
|
State: info.State.String(),
|
||||||
MaxRetry: info.MaxRetry,
|
MaxRetry: info.MaxRetry,
|
||||||
Retried: info.Retried,
|
Retried: info.Retried,
|
||||||
@ -189,23 +205,23 @@ type ActiveTask struct {
|
|||||||
Deadline string `json:"deadline"`
|
Deadline string `json:"deadline"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toActiveTask(t *asynq.TaskInfo) *ActiveTask {
|
func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask {
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: ti.ID,
|
||||||
Type: t.Type,
|
Type: ti.Type,
|
||||||
Payload: payloadStringer.String(t.Payload),
|
Payload: t.ps.String(ti.Payload),
|
||||||
Queue: t.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: ti.Retried,
|
||||||
LastError: t.LastErr,
|
LastError: ti.LastErr,
|
||||||
}
|
}
|
||||||
return &ActiveTask{BaseTask: base}
|
return &ActiveTask{BaseTask: base}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask {
|
func (t *transformer) toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask {
|
||||||
out := make([]*ActiveTask, len(in))
|
out := make([]*ActiveTask, len(in))
|
||||||
for i, t := range in {
|
for i, ti := range in {
|
||||||
out[i] = toActiveTask(t)
|
out[i] = t.toActiveTask(ti)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -215,25 +231,25 @@ type PendingTask struct {
|
|||||||
*BaseTask
|
*BaseTask
|
||||||
}
|
}
|
||||||
|
|
||||||
func toPendingTask(t *asynq.TaskInfo) *PendingTask {
|
func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask {
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: ti.ID,
|
||||||
Type: t.Type,
|
Type: ti.Type,
|
||||||
Payload: payloadStringer.String(t.Payload),
|
Payload: t.ps.String(ti.Payload),
|
||||||
Queue: t.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: ti.Retried,
|
||||||
LastError: t.LastErr,
|
LastError: ti.LastErr,
|
||||||
}
|
}
|
||||||
return &PendingTask{
|
return &PendingTask{
|
||||||
BaseTask: base,
|
BaseTask: base,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toPendingTasks(in []*asynq.TaskInfo) []*PendingTask {
|
func (t *transformer) toPendingTasks(in []*asynq.TaskInfo) []*PendingTask {
|
||||||
out := make([]*PendingTask, len(in))
|
out := make([]*PendingTask, len(in))
|
||||||
for i, t := range in {
|
for i, ti := range in {
|
||||||
out[i] = toPendingTask(t)
|
out[i] = t.toPendingTask(ti)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -243,46 +259,26 @@ type ScheduledTask struct {
|
|||||||
NextProcessAt time.Time `json:"next_process_at"`
|
NextProcessAt time.Time `json:"next_process_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// isPrintable reports whether the given data is comprised of all printable runes.
|
func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask {
|
||||||
func isPrintable(data []byte) bool {
|
|
||||||
if !utf8.Valid(data) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if json.Valid(data) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
isAllSpace := true
|
|
||||||
for _, r := range string(data) {
|
|
||||||
if !unicode.IsPrint(r) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if !unicode.IsSpace(r) {
|
|
||||||
isAllSpace = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return !isAllSpace
|
|
||||||
}
|
|
||||||
|
|
||||||
func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask {
|
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: ti.ID,
|
||||||
Type: t.Type,
|
Type: ti.Type,
|
||||||
Payload: payloadStringer.String(t.Payload),
|
Payload: t.ps.String(ti.Payload),
|
||||||
Queue: t.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: ti.Retried,
|
||||||
LastError: t.LastErr,
|
LastError: ti.LastErr,
|
||||||
}
|
}
|
||||||
return &ScheduledTask{
|
return &ScheduledTask{
|
||||||
BaseTask: base,
|
BaseTask: base,
|
||||||
NextProcessAt: t.NextProcessAt,
|
NextProcessAt: ti.NextProcessAt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask {
|
func (t *transformer) toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask {
|
||||||
out := make([]*ScheduledTask, len(in))
|
out := make([]*ScheduledTask, len(in))
|
||||||
for i, t := range in {
|
for i, ti := range in {
|
||||||
out[i] = toScheduledTask(t)
|
out[i] = t.toScheduledTask(ti)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -292,26 +288,26 @@ type RetryTask struct {
|
|||||||
NextProcessAt time.Time `json:"next_process_at"`
|
NextProcessAt time.Time `json:"next_process_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toRetryTask(t *asynq.TaskInfo) *RetryTask {
|
func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask {
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: ti.ID,
|
||||||
Type: t.Type,
|
Type: ti.Type,
|
||||||
Payload: payloadStringer.String(t.Payload),
|
Payload: t.ps.String(ti.Payload),
|
||||||
Queue: t.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: ti.Retried,
|
||||||
LastError: t.LastErr,
|
LastError: ti.LastErr,
|
||||||
}
|
}
|
||||||
return &RetryTask{
|
return &RetryTask{
|
||||||
BaseTask: base,
|
BaseTask: base,
|
||||||
NextProcessAt: t.NextProcessAt,
|
NextProcessAt: ti.NextProcessAt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toRetryTasks(in []*asynq.TaskInfo) []*RetryTask {
|
func (t *transformer) toRetryTasks(in []*asynq.TaskInfo) []*RetryTask {
|
||||||
out := make([]*RetryTask, len(in))
|
out := make([]*RetryTask, len(in))
|
||||||
for i, t := range in {
|
for i, ti := range in {
|
||||||
out[i] = toRetryTask(t)
|
out[i] = t.toRetryTask(ti)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -321,26 +317,26 @@ type ArchivedTask struct {
|
|||||||
LastFailedAt time.Time `json:"last_failed_at"`
|
LastFailedAt time.Time `json:"last_failed_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toArchivedTask(t *asynq.TaskInfo) *ArchivedTask {
|
func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask {
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: ti.ID,
|
||||||
Type: t.Type,
|
Type: ti.Type,
|
||||||
Payload: payloadStringer.String(t.Payload),
|
Payload: t.ps.String(ti.Payload),
|
||||||
Queue: t.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: ti.Retried,
|
||||||
LastError: t.LastErr,
|
LastError: ti.LastErr,
|
||||||
}
|
}
|
||||||
return &ArchivedTask{
|
return &ArchivedTask{
|
||||||
BaseTask: base,
|
BaseTask: base,
|
||||||
LastFailedAt: t.LastFailedAt,
|
LastFailedAt: ti.LastFailedAt,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask {
|
func (t *transformer) toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask {
|
||||||
out := make([]*ArchivedTask, len(in))
|
out := make([]*ArchivedTask, len(in))
|
||||||
for i, t := range in {
|
for i, ti := range in {
|
||||||
out[i] = toArchivedTask(t)
|
out[i] = t.toArchivedTask(ti)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -356,7 +352,7 @@ type SchedulerEntry struct {
|
|||||||
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
|
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
|
func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
|
||||||
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
|
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
|
||||||
for _, o := range e.Opts {
|
for _, o := range e.Opts {
|
||||||
opts = append(opts, o.String())
|
opts = append(opts, o.String())
|
||||||
@ -369,17 +365,17 @@ func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
|
|||||||
ID: e.ID,
|
ID: e.ID,
|
||||||
Spec: e.Spec,
|
Spec: e.Spec,
|
||||||
TaskType: e.Task.Type(),
|
TaskType: e.Task.Type(),
|
||||||
TaskPayload: payloadStringer.String(e.Task.Payload()),
|
TaskPayload: t.ps.String(e.Task.Payload()),
|
||||||
Opts: opts,
|
Opts: opts,
|
||||||
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
||||||
PrevEnqueueAt: prev,
|
PrevEnqueueAt: prev,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
|
func (t *transformer) toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
|
||||||
out := make([]*SchedulerEntry, len(in))
|
out := make([]*SchedulerEntry, len(in))
|
||||||
for i, e := range in {
|
for i, e := range in {
|
||||||
out[i] = toSchedulerEntry(e)
|
out[i] = t.toSchedulerEntry(e)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -389,17 +385,17 @@ type SchedulerEnqueueEvent struct {
|
|||||||
EnqueuedAt string `json:"enqueued_at"`
|
EnqueuedAt string `json:"enqueued_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
|
func (t *transformer) toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
|
||||||
return &SchedulerEnqueueEvent{
|
return &SchedulerEnqueueEvent{
|
||||||
TaskID: e.TaskID,
|
TaskID: e.TaskID,
|
||||||
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
|
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
|
func (t *transformer) toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
|
||||||
out := make([]*SchedulerEnqueueEvent, len(in))
|
out := make([]*SchedulerEnqueueEvent, len(in))
|
||||||
for i, e := range in {
|
for i, e := range in {
|
||||||
out[i] = toSchedulerEnqueueEvent(e)
|
out[i] = t.toSchedulerEnqueueEvent(e)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -416,7 +412,7 @@ type ServerInfo struct {
|
|||||||
ActiveWorkers []*WorkerInfo `json:"active_workers"`
|
ActiveWorkers []*WorkerInfo `json:"active_workers"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toServerInfo(info *asynq.ServerInfo) *ServerInfo {
|
func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo {
|
||||||
return &ServerInfo{
|
return &ServerInfo{
|
||||||
ID: info.ID,
|
ID: info.ID,
|
||||||
Host: info.Host,
|
Host: info.Host,
|
||||||
@ -426,14 +422,14 @@ func toServerInfo(info *asynq.ServerInfo) *ServerInfo {
|
|||||||
StrictPriority: info.StrictPriority,
|
StrictPriority: info.StrictPriority,
|
||||||
Started: info.Started.Format(time.RFC3339),
|
Started: info.Started.Format(time.RFC3339),
|
||||||
Status: info.Status,
|
Status: info.Status,
|
||||||
ActiveWorkers: toWorkerInfoList(info.ActiveWorkers),
|
ActiveWorkers: t.toWorkerInfoList(info.ActiveWorkers),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo {
|
func (t *transformer) toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo {
|
||||||
out := make([]*ServerInfo, len(in))
|
out := make([]*ServerInfo, len(in))
|
||||||
for i, s := range in {
|
for i, s := range in {
|
||||||
out[i] = toServerInfo(s)
|
out[i] = t.toServerInfo(s)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
@ -446,20 +442,20 @@ type WorkerInfo struct {
|
|||||||
Started string `json:"start_time"`
|
Started string `json:"start_time"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
|
func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
|
||||||
return &WorkerInfo{
|
return &WorkerInfo{
|
||||||
TaskID: info.TaskID,
|
TaskID: info.TaskID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
TaskType: info.TaskType,
|
TaskType: info.TaskType,
|
||||||
TakPayload: payloadStringer.String(info.TaskPayload),
|
TakPayload: t.ps.String(info.TaskPayload),
|
||||||
Started: info.Started.Format(time.RFC3339),
|
Started: info.Started.Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo {
|
func (t *transformer) toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo {
|
||||||
out := make([]*WorkerInfo, len(in))
|
out := make([]*WorkerInfo, len(in))
|
||||||
for i, w := range in {
|
for i, w := range in {
|
||||||
out[i] = toWorkerInfo(w)
|
out[i] = t.toWorkerInfo(w)
|
||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
// - http.Handler(s) for queue related endpoints
|
// - http.Handler(s) for queue related endpoints
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
|
|
||||||
func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListQueuesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
qnames, err := inspector.Queues()
|
qnames, err := inspector.Queues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -28,14 +28,14 @@ func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
snapshots[i] = toQueueStateSnapshot(qinfo)
|
snapshots[i] = t.toQueueStateSnapshot(qinfo)
|
||||||
}
|
}
|
||||||
payload := map[string]interface{}{"queues": snapshots}
|
payload := map[string]interface{}{"queues": snapshots}
|
||||||
json.NewEncoder(w).Encode(payload)
|
json.NewEncoder(w).Encode(payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newGetQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -47,7 +47,7 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
payload["current"] = toQueueStateSnapshot(qinfo)
|
payload["current"] = t.toQueueStateSnapshot(qinfo)
|
||||||
|
|
||||||
// TODO: make this n a variable
|
// TODO: make this n a variable
|
||||||
data, err := inspector.History(qname, 10)
|
data, err := inspector.History(qname, 10)
|
||||||
@ -57,14 +57,14 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
var dailyStats []*DailyStats
|
var dailyStats []*DailyStats
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
dailyStats = append(dailyStats, toDailyStats(s))
|
dailyStats = append(dailyStats, t.toDailyStats(s))
|
||||||
}
|
}
|
||||||
payload["history"] = dailyStats
|
payload["history"] = dailyStats
|
||||||
json.NewEncoder(w).Encode(payload)
|
json.NewEncoder(w).Encode(payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDeleteQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newDeleteQueueHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -112,7 +112,7 @@ type ListQueueStatsResponse struct {
|
|||||||
Stats map[string][]*DailyStats `json:"stats"`
|
Stats map[string][]*DailyStats `json:"stats"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListQueueStatsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
qnames, err := inspector.Queues()
|
qnames, err := inspector.Queues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -127,7 +127,7 @@ func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp.Stats[qname] = toDailyStatsList(stats)
|
resp.Stats[qname] = t.toDailyStatsList(stats)
|
||||||
}
|
}
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
37
router.go
37
router.go
@ -7,14 +7,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RouterOptions struct {
|
type RouterOptions struct {
|
||||||
Inspector *asynq.Inspector
|
RedisClient redis.UniversalClient
|
||||||
Middlewares []mux.MiddlewareFunc
|
Inspector *asynq.Inspector
|
||||||
RedisClient redis.UniversalClient
|
Middlewares []mux.MiddlewareFunc
|
||||||
|
PayloadStringer PayloadStringer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRouter(opts RouterOptions) *mux.Router {
|
func NewRouter(opts RouterOptions) *mux.Router {
|
||||||
router := mux.NewRouter()
|
router := mux.NewRouter()
|
||||||
inspector := opts.Inspector
|
inspector := opts.Inspector
|
||||||
|
t := &transformer{ps: defaultPayloadStringer}
|
||||||
|
if opts.PayloadStringer != nil {
|
||||||
|
t = &transformer{ps: opts.PayloadStringer}
|
||||||
|
}
|
||||||
|
|
||||||
for _, mf := range opts.Middlewares {
|
for _, mf := range opts.Middlewares {
|
||||||
router.Use(mf)
|
router.Use(mf)
|
||||||
@ -22,22 +27,22 @@ func NewRouter(opts RouterOptions) *mux.Router {
|
|||||||
|
|
||||||
api := router.PathPrefix("/api").Subrouter()
|
api := router.PathPrefix("/api").Subrouter()
|
||||||
// Queue endpoints.
|
// Queue endpoints.
|
||||||
api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues", newListQueuesHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}", newGetQueueHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}", newDeleteQueueHandlerFunc(inspector, t)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}:pause", newPauseQueueHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}:resume", newResumeQueueHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
// Queue Historical Stats endpoint.
|
// Queue Historical Stats endpoint.
|
||||||
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queue_stats", newListQueueStatsHandlerFunc(inspector, t)).Methods("GET")
|
||||||
|
|
||||||
// Task endpoints.
|
// Task endpoints.
|
||||||
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/active_tasks", newListActiveTasksHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/active_tasks/{task_id}:cancel", newCancelActiveTaskHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/active_tasks:cancel_all", newCancelAllActiveTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/active_tasks:batch_cancel", newBatchCancelActiveTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/pending_tasks", newListPendingTasksHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/pending_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/pending_tasks:delete_all", newDeleteAllPendingTasksHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/pending_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
@ -45,7 +50,7 @@ func NewRouter(opts RouterOptions) *mux.Router {
|
|||||||
api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/pending_tasks:archive_all", newArchiveAllPendingTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/pending_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks", newListScheduledTasksHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:delete_all", newDeleteAllScheduledTasksHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
@ -56,7 +61,7 @@ func NewRouter(opts RouterOptions) *mux.Router {
|
|||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:archive_all", newArchiveAllScheduledTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/scheduled_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/retry_tasks", newListRetryTasksHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/retry_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/retry_tasks:delete_all", newDeleteAllRetryTasksHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/retry_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
@ -67,7 +72,7 @@ func NewRouter(opts RouterOptions) *mux.Router {
|
|||||||
api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/retry_tasks:archive_all", newArchiveAllRetryTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/retry_tasks:batch_archive", newBatchArchiveTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/archived_tasks", newListArchivedTasksHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/archived_tasks/{task_id}", newDeleteTaskHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
|
api.HandleFunc("/queues/{qname}/archived_tasks:delete_all", newDeleteAllArchivedTasksHandlerFunc(inspector)).Methods("DELETE")
|
||||||
api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/archived_tasks:batch_delete", newBatchDeleteTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
@ -75,14 +80,14 @@ func NewRouter(opts RouterOptions) *mux.Router {
|
|||||||
api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/archived_tasks:run_all", newRunAllArchivedTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
api.HandleFunc("/queues/{qname}/archived_tasks:batch_run", newBatchRunTasksHandlerFunc(inspector)).Methods("POST")
|
||||||
|
|
||||||
api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/queues/{qname}/tasks/{task_id}", newGetTaskHandlerFunc(inspector, t)).Methods("GET")
|
||||||
|
|
||||||
// Servers endpoints.
|
// Servers endpoints.
|
||||||
api.HandleFunc("/servers", newListServersHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/servers", newListServersHandlerFunc(inspector, t)).Methods("GET")
|
||||||
|
|
||||||
// Scheduler Entry endpoints.
|
// Scheduler Entry endpoints.
|
||||||
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/scheduler_entries", newListSchedulerEntriesHandlerFunc(inspector, t)).Methods("GET")
|
||||||
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector)).Methods("GET")
|
api.HandleFunc("/scheduler_entries/{entry_id}/enqueue_events", newListSchedulerEnqueueEventsHandlerFunc(inspector, t)).Methods("GET")
|
||||||
|
|
||||||
// Redis info endpoint.
|
// Redis info endpoint.
|
||||||
switch c := opts.RedisClient.(type) {
|
switch c := opts.RedisClient.(type) {
|
||||||
|
@ -13,7 +13,7 @@ import (
|
|||||||
// - http.Handler(s) for scheduler entry related endpoints
|
// - http.Handler(s) for scheduler entry related endpoints
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
|
|
||||||
func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
entries, err := inspector.SchedulerEntries()
|
entries, err := inspector.SchedulerEntries()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -25,7 +25,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector) http.Handler
|
|||||||
// avoid nil for the entries field in json output.
|
// avoid nil for the entries field in json output.
|
||||||
payload["entries"] = make([]*SchedulerEntry, 0)
|
payload["entries"] = make([]*SchedulerEntry, 0)
|
||||||
} else {
|
} else {
|
||||||
payload["entries"] = toSchedulerEntries(entries)
|
payload["entries"] = t.toSchedulerEntries(entries)
|
||||||
}
|
}
|
||||||
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
@ -38,7 +38,7 @@ type ListSchedulerEnqueueEventsResponse struct {
|
|||||||
Events []*SchedulerEnqueueEvent `json:"events"`
|
Events []*SchedulerEnqueueEvent `json:"events"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
entryID := mux.Vars(r)["entry_id"]
|
entryID := mux.Vars(r)["entry_id"]
|
||||||
pageSize, pageNum := getPageOptions(r)
|
pageSize, pageNum := getPageOptions(r)
|
||||||
@ -49,7 +49,7 @@ func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.H
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp := ListSchedulerEnqueueEventsResponse{
|
resp := ListSchedulerEnqueueEventsResponse{
|
||||||
Events: toSchedulerEnqueueEvents(events),
|
Events: t.toSchedulerEnqueueEvents(events),
|
||||||
}
|
}
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
@ -16,7 +16,7 @@ type ListServersResponse struct {
|
|||||||
Servers []*ServerInfo `json:"servers"`
|
Servers []*ServerInfo `json:"servers"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListServersHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListServersHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
srvs, err := inspector.Servers()
|
srvs, err := inspector.Servers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -24,7 +24,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp := ListServersResponse{
|
resp := ListServersResponse{
|
||||||
Servers: toServerInfoList(srvs),
|
Servers: t.toServerInfoList(srvs),
|
||||||
}
|
}
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
@ -23,7 +23,7 @@ type ListActiveTasksResponse struct {
|
|||||||
Stats *QueueStateSnapshot `json:"stats"`
|
Stats *QueueStateSnapshot `json:"stats"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -54,7 +54,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
activeTasks := toActiveTasks(tasks)
|
activeTasks := t.toActiveTasks(tasks)
|
||||||
for _, t := range activeTasks {
|
for _, t := range activeTasks {
|
||||||
workerInfo, ok := m[t.ID]
|
workerInfo, ok := m[t.ID]
|
||||||
if ok {
|
if ok {
|
||||||
@ -68,7 +68,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
|
|||||||
|
|
||||||
resp := ListActiveTasksResponse{
|
resp := ListActiveTasksResponse{
|
||||||
Tasks: activeTasks,
|
Tasks: activeTasks,
|
||||||
Stats: toQueueStateSnapshot(qinfo),
|
Stats: t.toQueueStateSnapshot(qinfo),
|
||||||
}
|
}
|
||||||
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
if err := json.NewEncoder(w).Encode(resp); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
@ -155,7 +155,7 @@ func newBatchCancelActiveTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -176,9 +176,9 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
|
|||||||
// avoid nil for the tasks field in json output.
|
// avoid nil for the tasks field in json output.
|
||||||
payload["tasks"] = make([]*PendingTask, 0)
|
payload["tasks"] = make([]*PendingTask, 0)
|
||||||
} else {
|
} else {
|
||||||
payload["tasks"] = toPendingTasks(tasks)
|
payload["tasks"] = t.toPendingTasks(tasks)
|
||||||
}
|
}
|
||||||
payload["stats"] = toQueueStateSnapshot(qinfo)
|
payload["stats"] = t.toQueueStateSnapshot(qinfo)
|
||||||
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -186,7 +186,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -207,9 +207,9 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu
|
|||||||
// avoid nil for the tasks field in json output.
|
// avoid nil for the tasks field in json output.
|
||||||
payload["tasks"] = make([]*ScheduledTask, 0)
|
payload["tasks"] = make([]*ScheduledTask, 0)
|
||||||
} else {
|
} else {
|
||||||
payload["tasks"] = toScheduledTasks(tasks)
|
payload["tasks"] = t.toScheduledTasks(tasks)
|
||||||
}
|
}
|
||||||
payload["stats"] = toQueueStateSnapshot(qinfo)
|
payload["stats"] = t.toQueueStateSnapshot(qinfo)
|
||||||
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -217,7 +217,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -238,9 +238,9 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
// avoid nil for the tasks field in json output.
|
// avoid nil for the tasks field in json output.
|
||||||
payload["tasks"] = make([]*RetryTask, 0)
|
payload["tasks"] = make([]*RetryTask, 0)
|
||||||
} else {
|
} else {
|
||||||
payload["tasks"] = toRetryTasks(tasks)
|
payload["tasks"] = t.toRetryTasks(tasks)
|
||||||
}
|
}
|
||||||
payload["stats"] = toQueueStateSnapshot(qinfo)
|
payload["stats"] = t.toQueueStateSnapshot(qinfo)
|
||||||
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -248,7 +248,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname := vars["qname"]
|
qname := vars["qname"]
|
||||||
@ -269,9 +269,9 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerFun
|
|||||||
// avoid nil for the tasks field in json output.
|
// avoid nil for the tasks field in json output.
|
||||||
payload["tasks"] = make([]*ArchivedTask, 0)
|
payload["tasks"] = make([]*ArchivedTask, 0)
|
||||||
} else {
|
} else {
|
||||||
payload["tasks"] = toArchivedTasks(tasks)
|
payload["tasks"] = t.toArchivedTasks(tasks)
|
||||||
}
|
}
|
||||||
payload["stats"] = toQueueStateSnapshot(qinfo)
|
payload["stats"] = t.toQueueStateSnapshot(qinfo)
|
||||||
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
if err := json.NewEncoder(w).Encode(payload); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
@ -626,7 +626,7 @@ func getPageOptions(r *http.Request) (pageSize, pageNum int) {
|
|||||||
return pageSize, pageNum
|
return pageSize, pageNum
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGetTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
func newGetTaskHandlerFunc(inspector *asynq.Inspector, t *transformer) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
qname, taskid := vars["qname"], vars["task_id"]
|
qname, taskid := vars["qname"], vars["task_id"]
|
||||||
@ -649,7 +649,7 @@ func newGetTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := json.NewEncoder(w).Encode(toTaskInfo(info)); err != nil {
|
if err := json.NewEncoder(w).Encode(t.toTaskInfo(info)); err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user