add injectable PayloadStringer

This commit is contained in:
ajatprabha
2021-09-18 17:56:00 +05:30
parent e85cf39de0
commit 56e3b1a6d2
6 changed files with 153 additions and 152 deletions

View File

@@ -1,12 +1,10 @@
package asynqmon
import (
"encoding/json"
"github.com/hibiken/asynq"
"time"
"unicode"
"unicode/utf8"
"github.com/hibiken/asynq"
)
// ****************************************************************************
@@ -15,6 +13,7 @@ import (
// - conversion function from an external type to an internal type
// ****************************************************************************
// PayloadStringer can be used to convert payload bytes to string to show in web ui.
type PayloadStringer interface {
String([]byte) string
}
@@ -25,15 +24,32 @@ func (f PayloadStringerFunc) String(b []byte) string {
return f(b)
}
var payloadStringer PayloadStringer = PayloadStringerFunc(func(payload []byte) string {
var defaultPayloadStringer = PayloadStringerFunc(func(payload []byte) string {
if !isPrintable(payload) {
return "non-printable bytes"
}
return string(payload)
})
func SetPayloadStringer(stringer PayloadStringer) {
payloadStringer = stringer
// isPrintable reports whether the given data is comprised of all printable runes.
func isPrintable(data []byte) bool {
if !utf8.Valid(data) {
return false
}
isAllSpace := true
for _, r := range string(data) {
if !unicode.IsPrint(r) {
return false
}
if !unicode.IsSpace(r) {
isAllSpace = false
}
}
return !isAllSpace
}
type transformer struct {
ps PayloadStringer
}
type QueueStateSnapshot struct {
@@ -63,7 +79,7 @@ type QueueStateSnapshot struct {
Timestamp time.Time `json:"timestamp"`
}
func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
func (t *transformer) toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
return &QueueStateSnapshot{
Queue: s.Queue,
MemoryUsage: s.MemoryUsage,
@@ -89,7 +105,7 @@ type DailyStats struct {
Date string `json:"date"`
}
func toDailyStats(s *asynq.DailyStats) *DailyStats {
func (t *transformer) toDailyStats(s *asynq.DailyStats) *DailyStats {
return &DailyStats{
Queue: s.Queue,
Processed: s.Processed,
@@ -99,10 +115,10 @@ func toDailyStats(s *asynq.DailyStats) *DailyStats {
}
}
func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
func (t *transformer) toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
out := make([]*DailyStats, len(in))
for i, s := range in {
out[i] = toDailyStats(s)
out[i] = t.toDailyStats(s)
}
return out
}
@@ -145,12 +161,12 @@ func formatTimeInRFC3339(t time.Time) string {
return t.Format(time.RFC3339)
}
func toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
func (t *transformer) toTaskInfo(info *asynq.TaskInfo) *TaskInfo {
return &TaskInfo{
ID: info.ID,
Queue: info.Queue,
Type: info.Type,
Payload: payloadStringer.String(info.Payload),
Payload: t.ps.String(info.Payload),
State: info.State.String(),
MaxRetry: info.MaxRetry,
Retried: info.Retried,
@@ -189,23 +205,23 @@ type ActiveTask struct {
Deadline string `json:"deadline"`
}
func toActiveTask(t *asynq.TaskInfo) *ActiveTask {
func (t *transformer) toActiveTask(ti *asynq.TaskInfo) *ActiveTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: payloadStringer.String(t.Payload),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastErr,
ID: ti.ID,
Type: ti.Type,
Payload: t.ps.String(ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &ActiveTask{BaseTask: base}
}
func toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask {
func (t *transformer) toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask {
out := make([]*ActiveTask, len(in))
for i, t := range in {
out[i] = toActiveTask(t)
for i, ti := range in {
out[i] = t.toActiveTask(ti)
}
return out
}
@@ -215,25 +231,25 @@ type PendingTask struct {
*BaseTask
}
func toPendingTask(t *asynq.TaskInfo) *PendingTask {
func (t *transformer) toPendingTask(ti *asynq.TaskInfo) *PendingTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: payloadStringer.String(t.Payload),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastErr,
ID: ti.ID,
Type: ti.Type,
Payload: t.ps.String(ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &PendingTask{
BaseTask: base,
}
}
func toPendingTasks(in []*asynq.TaskInfo) []*PendingTask {
func (t *transformer) toPendingTasks(in []*asynq.TaskInfo) []*PendingTask {
out := make([]*PendingTask, len(in))
for i, t := range in {
out[i] = toPendingTask(t)
for i, ti := range in {
out[i] = t.toPendingTask(ti)
}
return out
}
@@ -243,46 +259,26 @@ type ScheduledTask struct {
NextProcessAt time.Time `json:"next_process_at"`
}
// isPrintable reports whether the given data is comprised of all printable runes.
func isPrintable(data []byte) bool {
if !utf8.Valid(data) {
return false
}
if json.Valid(data) {
return true
}
isAllSpace := true
for _, r := range string(data) {
if !unicode.IsPrint(r) {
return false
}
if !unicode.IsSpace(r) {
isAllSpace = false
}
}
return !isAllSpace
}
func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask {
func (t *transformer) toScheduledTask(ti *asynq.TaskInfo) *ScheduledTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: payloadStringer.String(t.Payload),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastErr,
ID: ti.ID,
Type: ti.Type,
Payload: t.ps.String(ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &ScheduledTask{
BaseTask: base,
NextProcessAt: t.NextProcessAt,
NextProcessAt: ti.NextProcessAt,
}
}
func toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask {
func (t *transformer) toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask {
out := make([]*ScheduledTask, len(in))
for i, t := range in {
out[i] = toScheduledTask(t)
for i, ti := range in {
out[i] = t.toScheduledTask(ti)
}
return out
}
@@ -292,26 +288,26 @@ type RetryTask struct {
NextProcessAt time.Time `json:"next_process_at"`
}
func toRetryTask(t *asynq.TaskInfo) *RetryTask {
func (t *transformer) toRetryTask(ti *asynq.TaskInfo) *RetryTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: payloadStringer.String(t.Payload),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastErr,
ID: ti.ID,
Type: ti.Type,
Payload: t.ps.String(ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &RetryTask{
BaseTask: base,
NextProcessAt: t.NextProcessAt,
NextProcessAt: ti.NextProcessAt,
}
}
func toRetryTasks(in []*asynq.TaskInfo) []*RetryTask {
func (t *transformer) toRetryTasks(in []*asynq.TaskInfo) []*RetryTask {
out := make([]*RetryTask, len(in))
for i, t := range in {
out[i] = toRetryTask(t)
for i, ti := range in {
out[i] = t.toRetryTask(ti)
}
return out
}
@@ -321,26 +317,26 @@ type ArchivedTask struct {
LastFailedAt time.Time `json:"last_failed_at"`
}
func toArchivedTask(t *asynq.TaskInfo) *ArchivedTask {
func (t *transformer) toArchivedTask(ti *asynq.TaskInfo) *ArchivedTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: payloadStringer.String(t.Payload),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastErr,
ID: ti.ID,
Type: ti.Type,
Payload: t.ps.String(ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
LastError: ti.LastErr,
}
return &ArchivedTask{
BaseTask: base,
LastFailedAt: t.LastFailedAt,
LastFailedAt: ti.LastFailedAt,
}
}
func toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask {
func (t *transformer) toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask {
out := make([]*ArchivedTask, len(in))
for i, t := range in {
out[i] = toArchivedTask(t)
for i, ti := range in {
out[i] = t.toArchivedTask(ti)
}
return out
}
@@ -356,7 +352,7 @@ type SchedulerEntry struct {
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
}
func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
func (t *transformer) toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts {
opts = append(opts, o.String())
@@ -369,17 +365,17 @@ func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
ID: e.ID,
Spec: e.Spec,
TaskType: e.Task.Type(),
TaskPayload: payloadStringer.String(e.Task.Payload()),
TaskPayload: t.ps.String(e.Task.Payload()),
Opts: opts,
NextEnqueueAt: e.Next.Format(time.RFC3339),
PrevEnqueueAt: prev,
}
}
func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
func (t *transformer) toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
out := make([]*SchedulerEntry, len(in))
for i, e := range in {
out[i] = toSchedulerEntry(e)
out[i] = t.toSchedulerEntry(e)
}
return out
}
@@ -389,17 +385,17 @@ type SchedulerEnqueueEvent struct {
EnqueuedAt string `json:"enqueued_at"`
}
func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
func (t *transformer) toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
return &SchedulerEnqueueEvent{
TaskID: e.TaskID,
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
}
}
func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
func (t *transformer) toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
out := make([]*SchedulerEnqueueEvent, len(in))
for i, e := range in {
out[i] = toSchedulerEnqueueEvent(e)
out[i] = t.toSchedulerEnqueueEvent(e)
}
return out
}
@@ -416,7 +412,7 @@ type ServerInfo struct {
ActiveWorkers []*WorkerInfo `json:"active_workers"`
}
func toServerInfo(info *asynq.ServerInfo) *ServerInfo {
func (t *transformer) toServerInfo(info *asynq.ServerInfo) *ServerInfo {
return &ServerInfo{
ID: info.ID,
Host: info.Host,
@@ -426,14 +422,14 @@ func toServerInfo(info *asynq.ServerInfo) *ServerInfo {
StrictPriority: info.StrictPriority,
Started: info.Started.Format(time.RFC3339),
Status: info.Status,
ActiveWorkers: toWorkerInfoList(info.ActiveWorkers),
ActiveWorkers: t.toWorkerInfoList(info.ActiveWorkers),
}
}
func toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo {
func (t *transformer) toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo {
out := make([]*ServerInfo, len(in))
for i, s := range in {
out[i] = toServerInfo(s)
out[i] = t.toServerInfo(s)
}
return out
}
@@ -446,20 +442,20 @@ type WorkerInfo struct {
Started string `json:"start_time"`
}
func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
func (t *transformer) toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
return &WorkerInfo{
TaskID: info.TaskID,
Queue: info.Queue,
TaskType: info.TaskType,
TakPayload: payloadStringer.String(info.TaskPayload),
TakPayload: t.ps.String(info.TaskPayload),
Started: info.Started.Format(time.RFC3339),
}
}
func toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo {
func (t *transformer) toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo {
out := make([]*WorkerInfo, len(in))
for i, w := range in {
out[i] = toWorkerInfo(w)
out[i] = t.toWorkerInfo(w)
}
return out
}