mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-19 03:05:53 +08:00
rename PayloadStringer => BytesStringer
This commit is contained in:
parent
522dfe2ce5
commit
008c1b1b4a
@ -14,18 +14,18 @@ import (
|
|||||||
// - conversion function from an external type to an internal type
|
// - conversion function from an external type to an internal type
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
|
|
||||||
// PayloadStringer can be used to convert payload bytes to string to show in web ui.
|
// BytesStringer can be used to convert payload bytes to string to show in web ui.
|
||||||
type PayloadStringer interface {
|
type BytesStringer interface {
|
||||||
String([]byte) string
|
String([]byte) string
|
||||||
}
|
}
|
||||||
|
|
||||||
type PayloadStringerFunc func([]byte) string
|
type BytesStringerFunc func([]byte) string
|
||||||
|
|
||||||
func (f PayloadStringerFunc) String(b []byte) string {
|
func (f BytesStringerFunc) String(b []byte) string {
|
||||||
return f(b)
|
return f(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
var defaultPayloadStringer = PayloadStringerFunc(func(payload []byte) string {
|
var defaultBytesStringer = BytesStringerFunc(func(payload []byte) string {
|
||||||
if !isPrintable(payload) {
|
if !isPrintable(payload) {
|
||||||
return "non-printable bytes"
|
return "non-printable bytes"
|
||||||
}
|
}
|
||||||
@ -50,7 +50,7 @@ func isPrintable(data []byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type transformer struct {
|
type transformer struct {
|
||||||
ps PayloadStringer
|
bs BytesStringer
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueueStateSnapshot struct {
|
type QueueStateSnapshot struct {
|
||||||
@ -167,7 +167,7 @@ func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
|
|||||||
ID: info.ID,
|
ID: info.ID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
Type: info.Type,
|
Type: info.Type,
|
||||||
Payload: t.ps.String(info.Payload),
|
Payload: t.bs.String(info.Payload),
|
||||||
State: info.State.String(),
|
State: info.State.String(),
|
||||||
MaxRetry: info.MaxRetry,
|
MaxRetry: info.MaxRetry,
|
||||||
Retried: info.Retried,
|
Retried: info.Retried,
|
||||||
@ -210,7 +210,7 @@ func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: ti.ID,
|
ID: ti.ID,
|
||||||
Type: ti.Type,
|
Type: ti.Type,
|
||||||
Payload: t.ps.String(ti.Payload),
|
Payload: t.bs.String(ti.Payload),
|
||||||
Queue: ti.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: ti.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: ti.Retried,
|
Retried: ti.Retried,
|
||||||
@ -236,7 +236,7 @@ func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: ti.ID,
|
ID: ti.ID,
|
||||||
Type: ti.Type,
|
Type: ti.Type,
|
||||||
Payload: t.ps.String(ti.Payload),
|
Payload: t.bs.String(ti.Payload),
|
||||||
Queue: ti.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: ti.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: ti.Retried,
|
Retried: ti.Retried,
|
||||||
@ -264,7 +264,7 @@ func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: ti.ID,
|
ID: ti.ID,
|
||||||
Type: ti.Type,
|
Type: ti.Type,
|
||||||
Payload: t.ps.String(ti.Payload),
|
Payload: t.bs.String(ti.Payload),
|
||||||
Queue: ti.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: ti.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: ti.Retried,
|
Retried: ti.Retried,
|
||||||
@ -293,7 +293,7 @@ func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: ti.ID,
|
ID: ti.ID,
|
||||||
Type: ti.Type,
|
Type: ti.Type,
|
||||||
Payload: t.ps.String(ti.Payload),
|
Payload: t.bs.String(ti.Payload),
|
||||||
Queue: ti.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: ti.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: ti.Retried,
|
Retried: ti.Retried,
|
||||||
@ -322,7 +322,7 @@ func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: ti.ID,
|
ID: ti.ID,
|
||||||
Type: ti.Type,
|
Type: ti.Type,
|
||||||
Payload: t.ps.String(ti.Payload),
|
Payload: t.bs.String(ti.Payload),
|
||||||
Queue: ti.Queue,
|
Queue: ti.Queue,
|
||||||
MaxRetry: ti.MaxRetry,
|
MaxRetry: ti.MaxRetry,
|
||||||
Retried: ti.Retried,
|
Retried: ti.Retried,
|
||||||
@ -366,7 +366,7 @@ func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry
|
|||||||
ID: e.ID,
|
ID: e.ID,
|
||||||
Spec: e.Spec,
|
Spec: e.Spec,
|
||||||
TaskType: e.Task.Type(),
|
TaskType: e.Task.Type(),
|
||||||
TaskPayload: t.ps.String(e.Task.Payload()),
|
TaskPayload: t.bs.String(e.Task.Payload()),
|
||||||
Opts: opts,
|
Opts: opts,
|
||||||
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
||||||
PrevEnqueueAt: prev,
|
PrevEnqueueAt: prev,
|
||||||
@ -448,7 +448,7 @@ func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
|
|||||||
TaskID: info.TaskID,
|
TaskID: info.TaskID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
TaskType: info.TaskType,
|
TaskType: info.TaskType,
|
||||||
TakPayload: t.ps.String(info.TaskPayload),
|
TakPayload: t.bs.String(info.TaskPayload),
|
||||||
Started: info.Started.Format(time.RFC3339),
|
Started: info.Started.Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
14
router.go
14
router.go
@ -8,18 +8,18 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type RouterOptions struct {
|
type RouterOptions struct {
|
||||||
RedisClient redis.UniversalClient
|
RedisClient redis.UniversalClient
|
||||||
Inspector *asynq.Inspector
|
Inspector *asynq.Inspector
|
||||||
Middlewares []mux.MiddlewareFunc
|
Middlewares []mux.MiddlewareFunc
|
||||||
PayloadStringer PayloadStringer
|
BytesStringer BytesStringer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRouter(opts RouterOptions) *mux.Router {
|
func NewRouter(opts RouterOptions) *mux.Router {
|
||||||
router := mux.NewRouter()
|
router := mux.NewRouter()
|
||||||
inspector := opts.Inspector
|
inspector := opts.Inspector
|
||||||
t := &transformer{ps: defaultPayloadStringer}
|
t := &transformer{bs: defaultBytesStringer}
|
||||||
if opts.PayloadStringer != nil {
|
if opts.BytesStringer != nil {
|
||||||
t = &transformer{ps: opts.PayloadStringer}
|
t = &transformer{bs: opts.BytesStringer}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, mf := range opts.Middlewares {
|
for _, mf := range opts.Middlewares {
|
||||||
|
Loading…
Reference in New Issue
Block a user