mirror of
https://github.com/hibiken/asynqmon.git
synced 2025-01-19 03:05:53 +08:00
add PayloadStringer interface
This commit is contained in:
parent
c98d65dcdb
commit
0c7eb94fb9
@ -15,6 +15,27 @@ import (
|
|||||||
// - conversion function from an external type to an internal type
|
// - conversion function from an external type to an internal type
|
||||||
// ****************************************************************************
|
// ****************************************************************************
|
||||||
|
|
||||||
|
type PayloadStringer interface {
|
||||||
|
String([]byte) string
|
||||||
|
}
|
||||||
|
|
||||||
|
type PayloadStringerFunc func([]byte) string
|
||||||
|
|
||||||
|
func (f PayloadStringerFunc) String(b []byte) string {
|
||||||
|
return f(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
var payloadStringer PayloadStringer = PayloadStringerFunc(func(payload []byte) string {
|
||||||
|
if !isPrintable(payload) {
|
||||||
|
return "non-printable bytes"
|
||||||
|
}
|
||||||
|
return string(payload)
|
||||||
|
})
|
||||||
|
|
||||||
|
func SetPayloadStringer(stringer PayloadStringer) {
|
||||||
|
payloadStringer = stringer
|
||||||
|
}
|
||||||
|
|
||||||
type QueueStateSnapshot struct {
|
type QueueStateSnapshot struct {
|
||||||
// Name of the queue.
|
// Name of the queue.
|
||||||
Queue string `json:"queue"`
|
Queue string `json:"queue"`
|
||||||
@ -129,7 +150,7 @@ func toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
|
|||||||
ID: info.ID,
|
ID: info.ID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
Type: info.Type,
|
Type: info.Type,
|
||||||
Payload: toPrintablePayload(info.Payload),
|
Payload: payloadStringer.String(info.Payload),
|
||||||
State: info.State.String(),
|
State: info.State.String(),
|
||||||
MaxRetry: info.MaxRetry,
|
MaxRetry: info.MaxRetry,
|
||||||
Retried: info.Retried,
|
Retried: info.Retried,
|
||||||
@ -172,7 +193,7 @@ func toActiveTask(t *asynq.TaskInfo) *ActiveTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: t.ID,
|
||||||
Type: t.Type,
|
Type: t.Type,
|
||||||
Payload: toPrintablePayload(t.Payload),
|
Payload: payloadStringer.String(t.Payload),
|
||||||
Queue: t.Queue,
|
Queue: t.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: t.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: t.Retried,
|
||||||
@ -198,7 +219,7 @@ func toPendingTask(t *asynq.TaskInfo) *PendingTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: t.ID,
|
||||||
Type: t.Type,
|
Type: t.Type,
|
||||||
Payload: toPrintablePayload(t.Payload),
|
Payload: payloadStringer.String(t.Payload),
|
||||||
Queue: t.Queue,
|
Queue: t.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: t.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: t.Retried,
|
||||||
@ -242,18 +263,11 @@ func isPrintable(data []byte) bool {
|
|||||||
return !isAllSpace
|
return !isAllSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
func toPrintablePayload(payload []byte) string {
|
|
||||||
if !isPrintable(payload) {
|
|
||||||
return "non-printable bytes"
|
|
||||||
}
|
|
||||||
return string(payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask {
|
func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask {
|
||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: t.ID,
|
||||||
Type: t.Type,
|
Type: t.Type,
|
||||||
Payload: toPrintablePayload(t.Payload),
|
Payload: payloadStringer.String(t.Payload),
|
||||||
Queue: t.Queue,
|
Queue: t.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: t.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: t.Retried,
|
||||||
@ -282,7 +296,7 @@ func toRetryTask(t *asynq.TaskInfo) *RetryTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: t.ID,
|
||||||
Type: t.Type,
|
Type: t.Type,
|
||||||
Payload: toPrintablePayload(t.Payload),
|
Payload: payloadStringer.String(t.Payload),
|
||||||
Queue: t.Queue,
|
Queue: t.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: t.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: t.Retried,
|
||||||
@ -311,7 +325,7 @@ func toArchivedTask(t *asynq.TaskInfo) *ArchivedTask {
|
|||||||
base := &BaseTask{
|
base := &BaseTask{
|
||||||
ID: t.ID,
|
ID: t.ID,
|
||||||
Type: t.Type,
|
Type: t.Type,
|
||||||
Payload: toPrintablePayload(t.Payload),
|
Payload: payloadStringer.String(t.Payload),
|
||||||
Queue: t.Queue,
|
Queue: t.Queue,
|
||||||
MaxRetry: t.MaxRetry,
|
MaxRetry: t.MaxRetry,
|
||||||
Retried: t.Retried,
|
Retried: t.Retried,
|
||||||
@ -355,7 +369,7 @@ func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
|
|||||||
ID: e.ID,
|
ID: e.ID,
|
||||||
Spec: e.Spec,
|
Spec: e.Spec,
|
||||||
TaskType: e.Task.Type(),
|
TaskType: e.Task.Type(),
|
||||||
TaskPayload: toPrintablePayload(e.Task.Payload()),
|
TaskPayload: payloadStringer.String(e.Task.Payload()),
|
||||||
Opts: opts,
|
Opts: opts,
|
||||||
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
NextEnqueueAt: e.Next.Format(time.RFC3339),
|
||||||
PrevEnqueueAt: prev,
|
PrevEnqueueAt: prev,
|
||||||
@ -437,7 +451,7 @@ func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
|
|||||||
TaskID: info.TaskID,
|
TaskID: info.TaskID,
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
TaskType: info.TaskType,
|
TaskType: info.TaskType,
|
||||||
TakPayload: toPrintablePayload(info.TaskPayload),
|
TakPayload: payloadStringer.String(info.TaskPayload),
|
||||||
Started: info.Started.Format(time.RFC3339),
|
Started: info.Started.Format(time.RFC3339),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user