unexport types

This commit is contained in:
ajatprabha 2021-10-04 20:48:00 +05:30 committed by Ken Hibino
parent 4b54ec1548
commit d0b72f135c
10 changed files with 120 additions and 118 deletions

View File

@ -72,7 +72,7 @@ import (
var staticContents embed.FS var staticContents embed.FS
func main() { func main() {
api := asynqmon.NewHTTPHandler(asynqmon.Options{ h := asynqmon.New(asynqmon.Options{
RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"}, RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"},
StaticContentHandler: asynqmon.NewStaticContentHandler( StaticContentHandler: asynqmon.NewStaticContentHandler(
staticContents, staticContents,
@ -80,10 +80,10 @@ func main() {
"index.html", "index.html",
), ),
}) })
defer api.Close() defer h.Close()
srv := &http.Server{ srv := &http.Server{
Handler: api, Handler: h,
Addr: ":8080", Addr: ":8080",
} }

View File

@ -105,7 +105,7 @@ func main() {
} }
} }
api := asynqmon.NewHTTPHandler(asynqmon.Options{ h := asynqmon.New(asynqmon.Options{
RedisConnOpt: redisConnOpt, RedisConnOpt: redisConnOpt,
Middlewares: []asynqmon.MiddlewareFunc{loggingMiddleware}, Middlewares: []asynqmon.MiddlewareFunc{loggingMiddleware},
StaticContentHandler: asynqmon.NewStaticContentHandler( StaticContentHandler: asynqmon.NewStaticContentHandler(
@ -114,14 +114,14 @@ func main() {
"index.html", "index.html",
), ),
}) })
defer api.Close() defer h.Close()
c := cors.New(cors.Options{ c := cors.New(cors.Options{
AllowedMethods: []string{"GET", "POST", "DELETE"}, AllowedMethods: []string{"GET", "POST", "DELETE"},
}) })
srv := &http.Server{ srv := &http.Server{
Handler: c.Handler(api), Handler: c.Handler(h),
Addr: fmt.Sprintf(":%d", flagPort), Addr: fmt.Sprintf(":%d", flagPort),
WriteTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second,

View File

@ -19,8 +19,10 @@ type PayloadFormatter interface {
FormatPayload(taskType string, payload []byte) string FormatPayload(taskType string, payload []byte) string
} }
// PayloadFormatterFunc can be used to create a PayloadFormatter.
type PayloadFormatterFunc func(string, []byte) string type PayloadFormatterFunc func(string, []byte) string
// FormatPayload returns the string representation of the payload of a type.
func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string { func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string {
return f(taskType, payload) return f(taskType, payload)
} }
@ -49,7 +51,7 @@ func isPrintable(data []byte) bool {
return !isAllSpace return !isAllSpace
} }
type QueueStateSnapshot struct { type queueStateSnapshot struct {
// Name of the queue. // Name of the queue.
Queue string `json:"queue"` Queue string `json:"queue"`
// Total number of bytes the queue and its tasks require to be stored in redis. // Total number of bytes the queue and its tasks require to be stored in redis.
@ -76,8 +78,8 @@ type QueueStateSnapshot struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
} }
func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot { func toQueueStateSnapshot(s *asynq.QueueInfo) *queueStateSnapshot {
return &QueueStateSnapshot{ return &queueStateSnapshot{
Queue: s.Queue, Queue: s.Queue,
MemoryUsage: s.MemoryUsage, MemoryUsage: s.MemoryUsage,
Size: s.Size, Size: s.Size,
@ -94,7 +96,7 @@ func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
} }
} }
type DailyStats struct { type dailyStats struct {
Queue string `json:"queue"` Queue string `json:"queue"`
Processed int `json:"processed"` Processed int `json:"processed"`
Succeeded int `json:"succeeded"` Succeeded int `json:"succeeded"`
@ -102,8 +104,8 @@ type DailyStats struct {
Date string `json:"date"` Date string `json:"date"`
} }
func toDailyStats(s *asynq.DailyStats) *DailyStats { func toDailyStats(s *asynq.DailyStats) *dailyStats {
return &DailyStats{ return &dailyStats{
Queue: s.Queue, Queue: s.Queue,
Processed: s.Processed, Processed: s.Processed,
Succeeded: s.Processed - s.Failed, Succeeded: s.Processed - s.Failed,
@ -112,15 +114,15 @@ func toDailyStats(s *asynq.DailyStats) *DailyStats {
} }
} }
func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats { func toDailyStatsList(in []*asynq.DailyStats) []*dailyStats {
out := make([]*DailyStats, len(in)) out := make([]*dailyStats, len(in))
for i, s := range in { for i, s := range in {
out[i] = toDailyStats(s) out[i] = toDailyStats(s)
} }
return out return out
} }
type TaskInfo struct { type taskInfo struct {
// ID is the identifier of the task. // ID is the identifier of the task.
ID string `json:"id"` ID string `json:"id"`
// Queue is the name of the queue in which the task belongs. // Queue is the name of the queue in which the task belongs.
@ -158,8 +160,8 @@ func formatTimeInRFC3339(t time.Time) string {
return t.Format(time.RFC3339) return t.Format(time.RFC3339)
} }
func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *TaskInfo { func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo {
return &TaskInfo{ return &taskInfo{
ID: info.ID, ID: info.ID,
Queue: info.Queue, Queue: info.Queue,
Type: info.Type, Type: info.Type,
@ -175,7 +177,7 @@ func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *TaskInfo {
} }
} }
type BaseTask struct { type baseTask struct {
ID string `json:"id"` ID string `json:"id"`
Type string `json:"type"` Type string `json:"type"`
Payload string `json:"payload"` Payload string `json:"payload"`
@ -185,8 +187,8 @@ type BaseTask struct {
LastError string `json:"error_message"` LastError string `json:"error_message"`
} }
type ActiveTask struct { type activeTask struct {
*BaseTask *baseTask
// Started time indicates when a worker started working on ths task. // Started time indicates when a worker started working on ths task.
// //
@ -202,8 +204,8 @@ type ActiveTask struct {
Deadline string `json:"deadline"` Deadline string `json:"deadline"`
} }
func toActiveTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ActiveTask { func toActiveTask(ti *asynq.TaskInfo, pf PayloadFormatter) *activeTask {
base := &BaseTask{ base := &baseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload), Payload: pf.FormatPayload(ti.Type, ti.Payload),
@ -212,24 +214,24 @@ func toActiveTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ActiveTask {
Retried: ti.Retried, Retried: ti.Retried,
LastError: ti.LastErr, LastError: ti.LastErr,
} }
return &ActiveTask{BaseTask: base} return &activeTask{baseTask: base}
} }
func toActiveTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ActiveTask { func toActiveTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*activeTask {
out := make([]*ActiveTask, len(in)) out := make([]*activeTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = toActiveTask(ti, pf) out[i] = toActiveTask(ti, pf)
} }
return out return out
} }
// TODO: Maybe we don't need state specific type, just use TaskInfo // TODO: Maybe we don't need state specific type, just use taskInfo
type PendingTask struct { type pendingTask struct {
*BaseTask *baseTask
} }
func toPendingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *PendingTask { func toPendingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *pendingTask {
base := &BaseTask{ base := &baseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload), Payload: pf.FormatPayload(ti.Type, ti.Payload),
@ -238,26 +240,26 @@ func toPendingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *PendingTask {
Retried: ti.Retried, Retried: ti.Retried,
LastError: ti.LastErr, LastError: ti.LastErr,
} }
return &PendingTask{ return &pendingTask{
BaseTask: base, baseTask: base,
} }
} }
func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*PendingTask { func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*pendingTask {
out := make([]*PendingTask, len(in)) out := make([]*pendingTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = toPendingTask(ti, pf) out[i] = toPendingTask(ti, pf)
} }
return out return out
} }
type ScheduledTask struct { type scheduledTask struct {
*BaseTask *baseTask
NextProcessAt time.Time `json:"next_process_at"` NextProcessAt time.Time `json:"next_process_at"`
} }
func toScheduledTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ScheduledTask { func toScheduledTask(ti *asynq.TaskInfo, pf PayloadFormatter) *scheduledTask {
base := &BaseTask{ base := &baseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload), Payload: pf.FormatPayload(ti.Type, ti.Payload),
@ -266,27 +268,27 @@ func toScheduledTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ScheduledTask {
Retried: ti.Retried, Retried: ti.Retried,
LastError: ti.LastErr, LastError: ti.LastErr,
} }
return &ScheduledTask{ return &scheduledTask{
BaseTask: base, baseTask: base,
NextProcessAt: ti.NextProcessAt, NextProcessAt: ti.NextProcessAt,
} }
} }
func toScheduledTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ScheduledTask { func toScheduledTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*scheduledTask {
out := make([]*ScheduledTask, len(in)) out := make([]*scheduledTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = toScheduledTask(ti, pf) out[i] = toScheduledTask(ti, pf)
} }
return out return out
} }
type RetryTask struct { type retryTask struct {
*BaseTask *baseTask
NextProcessAt time.Time `json:"next_process_at"` NextProcessAt time.Time `json:"next_process_at"`
} }
func toRetryTask(ti *asynq.TaskInfo, pf PayloadFormatter) *RetryTask { func toRetryTask(ti *asynq.TaskInfo, pf PayloadFormatter) *retryTask {
base := &BaseTask{ base := &baseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload), Payload: pf.FormatPayload(ti.Type, ti.Payload),
@ -295,27 +297,27 @@ func toRetryTask(ti *asynq.TaskInfo, pf PayloadFormatter) *RetryTask {
Retried: ti.Retried, Retried: ti.Retried,
LastError: ti.LastErr, LastError: ti.LastErr,
} }
return &RetryTask{ return &retryTask{
BaseTask: base, baseTask: base,
NextProcessAt: ti.NextProcessAt, NextProcessAt: ti.NextProcessAt,
} }
} }
func toRetryTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*RetryTask { func toRetryTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*retryTask {
out := make([]*RetryTask, len(in)) out := make([]*retryTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = toRetryTask(ti, pf) out[i] = toRetryTask(ti, pf)
} }
return out return out
} }
type ArchivedTask struct { type archivedTask struct {
*BaseTask *baseTask
LastFailedAt time.Time `json:"last_failed_at"` LastFailedAt time.Time `json:"last_failed_at"`
} }
func toArchivedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ArchivedTask { func toArchivedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *archivedTask {
base := &BaseTask{ base := &baseTask{
ID: ti.ID, ID: ti.ID,
Type: ti.Type, Type: ti.Type,
Payload: pf.FormatPayload(ti.Type, ti.Payload), Payload: pf.FormatPayload(ti.Type, ti.Payload),
@ -324,21 +326,21 @@ func toArchivedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ArchivedTask {
Retried: ti.Retried, Retried: ti.Retried,
LastError: ti.LastErr, LastError: ti.LastErr,
} }
return &ArchivedTask{ return &archivedTask{
BaseTask: base, baseTask: base,
LastFailedAt: ti.LastFailedAt, LastFailedAt: ti.LastFailedAt,
} }
} }
func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ArchivedTask { func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*archivedTask {
out := make([]*ArchivedTask, len(in)) out := make([]*archivedTask, len(in))
for i, ti := range in { for i, ti := range in {
out[i] = toArchivedTask(ti, pf) out[i] = toArchivedTask(ti, pf)
} }
return out return out
} }
type SchedulerEntry struct { type schedulerEntry struct {
ID string `json:"id"` ID string `json:"id"`
Spec string `json:"spec"` Spec string `json:"spec"`
TaskType string `json:"task_type"` TaskType string `json:"task_type"`
@ -349,7 +351,7 @@ type SchedulerEntry struct {
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"` PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
} }
func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *SchedulerEntry { func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *schedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts { for _, o := range e.Opts {
opts = append(opts, o.String()) opts = append(opts, o.String())
@ -358,7 +360,7 @@ func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *SchedulerEn
if !e.Prev.IsZero() { if !e.Prev.IsZero() {
prev = e.Prev.Format(time.RFC3339) prev = e.Prev.Format(time.RFC3339)
} }
return &SchedulerEntry{ return &schedulerEntry{
ID: e.ID, ID: e.ID,
Spec: e.Spec, Spec: e.Spec,
TaskType: e.Task.Type(), TaskType: e.Task.Type(),
@ -369,35 +371,35 @@ func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *SchedulerEn
} }
} }
func toSchedulerEntries(in []*asynq.SchedulerEntry, pf PayloadFormatter) []*SchedulerEntry { func toSchedulerEntries(in []*asynq.SchedulerEntry, pf PayloadFormatter) []*schedulerEntry {
out := make([]*SchedulerEntry, len(in)) out := make([]*schedulerEntry, len(in))
for i, e := range in { for i, e := range in {
out[i] = toSchedulerEntry(e, pf) out[i] = toSchedulerEntry(e, pf)
} }
return out return out
} }
type SchedulerEnqueueEvent struct { type schedulerEnqueueEvent struct {
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
EnqueuedAt string `json:"enqueued_at"` EnqueuedAt string `json:"enqueued_at"`
} }
func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent { func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *schedulerEnqueueEvent {
return &SchedulerEnqueueEvent{ return &schedulerEnqueueEvent{
TaskID: e.TaskID, TaskID: e.TaskID,
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339), EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
} }
} }
func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent { func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*schedulerEnqueueEvent {
out := make([]*SchedulerEnqueueEvent, len(in)) out := make([]*schedulerEnqueueEvent, len(in))
for i, e := range in { for i, e := range in {
out[i] = toSchedulerEnqueueEvent(e) out[i] = toSchedulerEnqueueEvent(e)
} }
return out return out
} }
type ServerInfo struct { type serverInfo struct {
ID string `json:"id"` ID string `json:"id"`
Host string `json:"host"` Host string `json:"host"`
PID int `json:"pid"` PID int `json:"pid"`
@ -406,11 +408,11 @@ type ServerInfo struct {
StrictPriority bool `json:"strict_priority_enabled"` StrictPriority bool `json:"strict_priority_enabled"`
Started string `json:"start_time"` Started string `json:"start_time"`
Status string `json:"status"` Status string `json:"status"`
ActiveWorkers []*WorkerInfo `json:"active_workers"` ActiveWorkers []*workerInfo `json:"active_workers"`
} }
func toServerInfo(info *asynq.ServerInfo, pf PayloadFormatter) *ServerInfo { func toServerInfo(info *asynq.ServerInfo, pf PayloadFormatter) *serverInfo {
return &ServerInfo{ return &serverInfo{
ID: info.ID, ID: info.ID,
Host: info.Host, Host: info.Host,
PID: info.PID, PID: info.PID,
@ -423,15 +425,15 @@ func toServerInfo(info *asynq.ServerInfo, pf PayloadFormatter) *ServerInfo {
} }
} }
func toServerInfoList(in []*asynq.ServerInfo, pf PayloadFormatter) []*ServerInfo { func toServerInfoList(in []*asynq.ServerInfo, pf PayloadFormatter) []*serverInfo {
out := make([]*ServerInfo, len(in)) out := make([]*serverInfo, len(in))
for i, s := range in { for i, s := range in {
out[i] = toServerInfo(s, pf) out[i] = toServerInfo(s, pf)
} }
return out return out
} }
type WorkerInfo struct { type workerInfo struct {
TaskID string `json:"task_id"` TaskID string `json:"task_id"`
Queue string `json:"queue"` Queue string `json:"queue"`
TaskType string `json:"task_type"` TaskType string `json:"task_type"`
@ -439,8 +441,8 @@ type WorkerInfo struct {
Started string `json:"start_time"` Started string `json:"start_time"`
} }
func toWorkerInfo(info *asynq.WorkerInfo, pf PayloadFormatter) *WorkerInfo { func toWorkerInfo(info *asynq.WorkerInfo, pf PayloadFormatter) *workerInfo {
return &WorkerInfo{ return &workerInfo{
TaskID: info.TaskID, TaskID: info.TaskID,
Queue: info.Queue, Queue: info.Queue,
TaskType: info.TaskType, TaskType: info.TaskType,
@ -449,8 +451,8 @@ func toWorkerInfo(info *asynq.WorkerInfo, pf PayloadFormatter) *WorkerInfo {
} }
} }
func toWorkerInfoList(in []*asynq.WorkerInfo, pf PayloadFormatter) []*WorkerInfo { func toWorkerInfoList(in []*asynq.WorkerInfo, pf PayloadFormatter) []*workerInfo {
out := make([]*WorkerInfo, len(in)) out := make([]*workerInfo, len(in))
for i, w := range in { for i, w := range in {
out[i] = toWorkerInfo(w, pf) out[i] = toWorkerInfo(w, pf)
} }

View File

@ -12,8 +12,8 @@ import (
//go:embed ui-assets/* //go:embed ui-assets/*
var staticContents embed.FS var staticContents embed.FS
func ExampleNewHTTPHandler() { func ExampleNew() {
api := asynqmon.NewHTTPHandler(asynqmon.Options{ h := asynqmon.New(asynqmon.Options{
RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"}, RedisConnOpt: asynq.RedisClientOpt{Addr: ":6379"},
StaticContentHandler: asynqmon.NewStaticContentHandler( StaticContentHandler: asynqmon.NewStaticContentHandler(
staticContents, staticContents,
@ -21,10 +21,10 @@ func ExampleNewHTTPHandler() {
"index.html", "index.html",
), ),
}) })
defer api.Close() defer h.Close()
srv := &http.Server{ srv := &http.Server{
Handler: api, Handler: h,
Addr: ":8080", Addr: ":8080",
} }

View File

@ -29,7 +29,7 @@ func (a *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.router.ServeHTTP(w, r) a.router.ServeHTTP(w, r)
} }
func NewHTTPHandler(opts Options) *HTTPHandler { func New(opts Options) *HTTPHandler {
rc, ok := opts.RedisConnOpt.MakeRedisClient().(redis.UniversalClient) rc, ok := opts.RedisConnOpt.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
panic(fmt.Sprintf("asnyqmon.HTTPHandler: unsupported RedisConnOpt type %T", opts.RedisConnOpt)) panic(fmt.Sprintf("asnyqmon.HTTPHandler: unsupported RedisConnOpt type %T", opts.RedisConnOpt))

View File

@ -22,7 +22,7 @@ func newListQueuesHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
snapshots := make([]*QueueStateSnapshot, len(qnames)) snapshots := make([]*queueStateSnapshot, len(qnames))
for i, qname := range qnames { for i, qname := range qnames {
qinfo, err := inspector.GetQueueInfo(qname) qinfo, err := inspector.GetQueueInfo(qname)
if err != nil { if err != nil {
@ -56,7 +56,7 @@ func newGetQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
var dailyStats []*DailyStats var dailyStats []*dailyStats
for _, s := range data { for _, s := range data {
dailyStats = append(dailyStats, toDailyStats(s)) dailyStats = append(dailyStats, toDailyStats(s))
} }
@ -109,8 +109,8 @@ func newResumeQueueHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
} }
} }
type ListQueueStatsResponse struct { type listQueueStatsResponse struct {
Stats map[string][]*DailyStats `json:"stats"` Stats map[string][]*dailyStats `json:"stats"`
} }
func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
@ -120,7 +120,7 @@ func newListQueueStatsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := ListQueueStatsResponse{Stats: make(map[string][]*DailyStats)} resp := listQueueStatsResponse{Stats: make(map[string][]*dailyStats)}
const numdays = 90 // Get stats for the last 90 days. const numdays = 90 // Get stats for the last 90 days.
for _, qname := range qnames { for _, qname := range qnames {
stats, err := inspector.History(qname, numdays) stats, err := inspector.History(qname, numdays)

View File

@ -16,7 +16,7 @@ import (
// - http.Handler(s) for redis info related endpoints // - http.Handler(s) for redis info related endpoints
// **************************************************************************** // ****************************************************************************
type RedisInfoResponse struct { type redisInfoResponse struct {
Addr string `json:"address"` Addr string `json:"address"`
Info map[string]string `json:"info"` Info map[string]string `json:"info"`
RawInfo string `json:"raw_info"` RawInfo string `json:"raw_info"`
@ -24,10 +24,10 @@ type RedisInfoResponse struct {
// Following fields are only set when connected to redis cluster. // Following fields are only set when connected to redis cluster.
RawClusterNodes string `json:"raw_cluster_nodes"` RawClusterNodes string `json:"raw_cluster_nodes"`
QueueLocations []*QueueLocationInfo `json:"queue_locations"` QueueLocations []*queueLocationInfo `json:"queue_locations"`
} }
type QueueLocationInfo struct { type queueLocationInfo struct {
Queue string `json:"queue"` // queue name Queue string `json:"queue"` // queue name
KeySlot int64 `json:"keyslot"` // cluster key slot for the queue KeySlot int64 `json:"keyslot"` // cluster key slot for the queue
Nodes []string `json:"nodes"` // list of cluster node addresses Nodes []string `json:"nodes"` // list of cluster node addresses
@ -41,7 +41,7 @@ func newRedisInfoHandlerFunc(client *redis.Client) http.HandlerFunc {
return return
} }
info := parseRedisInfo(res) info := parseRedisInfo(res)
resp := RedisInfoResponse{ resp := redisInfoResponse{
Addr: client.Options().Addr, Addr: client.Options().Addr,
Info: info, Info: info,
RawInfo: res, RawInfo: res,
@ -73,9 +73,9 @@ func newRedisClusterInfoHandlerFunc(client *redis.ClusterClient, inspector *asyn
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
var queueLocations []*QueueLocationInfo var queueLocations []*queueLocationInfo
for _, qname := range queues { for _, qname := range queues {
q := QueueLocationInfo{Queue: qname} q := queueLocationInfo{Queue: qname}
q.KeySlot, err = inspector.ClusterKeySlot(qname) q.KeySlot, err = inspector.ClusterKeySlot(qname)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
@ -92,7 +92,7 @@ func newRedisClusterInfoHandlerFunc(client *redis.ClusterClient, inspector *asyn
queueLocations = append(queueLocations, &q) queueLocations = append(queueLocations, &q)
} }
resp := RedisInfoResponse{ resp := redisInfoResponse{
Addr: strings.Join(client.Options().Addrs, ","), Addr: strings.Join(client.Options().Addrs, ","),
Info: info, Info: info,
RawInfo: rawClusterInfo, RawInfo: rawClusterInfo,

View File

@ -24,7 +24,7 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, pf PayloadFo
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(entries) == 0 { if len(entries) == 0 {
// avoid nil for the entries field in json output. // avoid nil for the entries field in json output.
payload["entries"] = make([]*SchedulerEntry, 0) payload["entries"] = make([]*schedulerEntry, 0)
} else { } else {
payload["entries"] = toSchedulerEntries(entries, pf) payload["entries"] = toSchedulerEntries(entries, pf)
} }
@ -35,8 +35,8 @@ func newListSchedulerEntriesHandlerFunc(inspector *asynq.Inspector, pf PayloadFo
} }
} }
type ListSchedulerEnqueueEventsResponse struct { type listSchedulerEnqueueEventsResponse struct {
Events []*SchedulerEnqueueEvent `json:"events"` Events []*schedulerEnqueueEvent `json:"events"`
} }
func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc { func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
@ -49,7 +49,7 @@ func newListSchedulerEnqueueEventsHandlerFunc(inspector *asynq.Inspector) http.H
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := ListSchedulerEnqueueEventsResponse{ resp := listSchedulerEnqueueEventsResponse{
Events: toSchedulerEnqueueEvents(events), Events: toSchedulerEnqueueEvents(events),
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {

View File

@ -12,8 +12,8 @@ import (
// - http.Handler(s) for server related endpoints // - http.Handler(s) for server related endpoints
// **************************************************************************** // ****************************************************************************
type ListServersResponse struct { type listServersResponse struct {
Servers []*ServerInfo `json:"servers"` Servers []*serverInfo `json:"servers"`
} }
func newListServersHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { func newListServersHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc {
@ -23,7 +23,7 @@ func newListServersHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter)
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := ListServersResponse{ resp := listServersResponse{
Servers: toServerInfoList(srvs, pf), Servers: toServerInfoList(srvs, pf),
} }
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {

View File

@ -19,9 +19,9 @@ import (
// - http.Handler(s) for task related endpoints // - http.Handler(s) for task related endpoints
// **************************************************************************** // ****************************************************************************
type ListActiveTasksResponse struct { type listActiveTasksResponse struct {
Tasks []*ActiveTask `json:"tasks"` Tasks []*activeTask `json:"tasks"`
Stats *QueueStateSnapshot `json:"stats"` Stats *queueStateSnapshot `json:"stats"`
} }
func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc { func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatter) http.HandlerFunc {
@ -46,7 +46,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatt
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
// m maps taskID to WorkerInfo. // m maps taskID to workerInfo.
m := make(map[string]*asynq.WorkerInfo) m := make(map[string]*asynq.WorkerInfo)
for _, srv := range servers { for _, srv := range servers {
for _, w := range srv.ActiveWorkers { for _, w := range srv.ActiveWorkers {
@ -67,7 +67,7 @@ func newListActiveTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatt
} }
} }
resp := ListActiveTasksResponse{ resp := listActiveTasksResponse{
Tasks: activeTasks, Tasks: activeTasks,
Stats: toQueueStateSnapshot(qinfo), Stats: toQueueStateSnapshot(qinfo),
} }
@ -175,7 +175,7 @@ func newListPendingTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormat
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*PendingTask, 0) payload["tasks"] = make([]*pendingTask, 0)
} else { } else {
payload["tasks"] = toPendingTasks(tasks, pf) payload["tasks"] = toPendingTasks(tasks, pf)
} }
@ -206,7 +206,7 @@ func newListScheduledTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForm
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*ScheduledTask, 0) payload["tasks"] = make([]*scheduledTask, 0)
} else { } else {
payload["tasks"] = toScheduledTasks(tasks, pf) payload["tasks"] = toScheduledTasks(tasks, pf)
} }
@ -237,7 +237,7 @@ func newListRetryTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadFormatte
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*RetryTask, 0) payload["tasks"] = make([]*retryTask, 0)
} else { } else {
payload["tasks"] = toRetryTasks(tasks, pf) payload["tasks"] = toRetryTasks(tasks, pf)
} }
@ -268,7 +268,7 @@ func newListArchivedTasksHandlerFunc(inspector *asynq.Inspector, pf PayloadForma
payload := make(map[string]interface{}) payload := make(map[string]interface{})
if len(tasks) == 0 { if len(tasks) == 0 {
// avoid nil for the tasks field in json output. // avoid nil for the tasks field in json output.
payload["tasks"] = make([]*ArchivedTask, 0) payload["tasks"] = make([]*archivedTask, 0)
} else { } else {
payload["tasks"] = toArchivedTasks(tasks, pf) payload["tasks"] = toArchivedTasks(tasks, pf)
} }
@ -331,7 +331,7 @@ func newArchiveTaskHandlerFunc(inspector *asynq.Inspector) http.HandlerFunc {
} }
} }
type DeleteAllTasksResponse struct { type deleteAllTasksResponse struct {
// Number of tasks deleted. // Number of tasks deleted.
Deleted int `json:"deleted"` Deleted int `json:"deleted"`
} }
@ -344,7 +344,7 @@ func newDeleteAllPendingTasksHandlerFunc(inspector *asynq.Inspector) http.Handle
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := DeleteAllTasksResponse{n} resp := deleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -360,7 +360,7 @@ func newDeleteAllScheduledTasksHandlerFunc(inspector *asynq.Inspector) http.Hand
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := DeleteAllTasksResponse{n} resp := deleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -376,7 +376,7 @@ func newDeleteAllRetryTasksHandlerFunc(inspector *asynq.Inspector) http.HandlerF
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := DeleteAllTasksResponse{n} resp := deleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
@ -392,7 +392,7 @@ func newDeleteAllArchivedTasksHandlerFunc(inspector *asynq.Inspector) http.Handl
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
resp := DeleteAllTasksResponse{n} resp := deleteAllTasksResponse{n}
if err := json.NewEncoder(w).Encode(resp); err != nil { if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return