update BytesStringer => PayloadFormatter to pass taskType

This commit is contained in:
ajatprabha
2021-10-02 12:57:41 +05:30
committed by Ken Hibino
parent e635b73e6c
commit e569ad9186
6 changed files with 80 additions and 80 deletions

View File

@@ -14,18 +14,18 @@ import (
// - conversion function from an external type to an internal type
// ****************************************************************************
// BytesStringer can be used to convert payload bytes to string to show in web ui.
type BytesStringer interface {
String([]byte) string
// PayloadFormatter can be used to convert payload bytes to string to show in web ui.
type PayloadFormatter interface {
FormatPayload(taskType string, payload []byte) string
}
type BytesStringerFunc func([]byte) string
type PayloadFormatterFunc func(string, []byte) string
func (f BytesStringerFunc) String(b []byte) string {
return f(b)
func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string {
return f(taskType, payload)
}
var defaultBytesStringer = BytesStringerFunc(func(payload []byte) string {
var defaultPayloadFormatter = PayloadFormatterFunc(func(_ string, payload []byte) string {
if !isPrintable(payload) {
return "non-printable bytes"
}
@@ -158,12 +158,12 @@ func formatTimeInRFC3339(t time.Time) string {
return t.Format(time.RFC3339)
}
func toTaskInfo(info *asynq.TaskInfo, bs BytesStringer) *TaskInfo {
func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *TaskInfo {
return &TaskInfo{
ID: info.ID,
Queue: info.Queue,
Type: info.Type,
Payload: bs.String(info.Payload),
Payload: pf.FormatPayload(info.Type, info.Payload),
State: info.State.String(),
MaxRetry: info.MaxRetry,
Retried: info.Retried,
@@ -202,11 +202,11 @@ type ActiveTask struct {
Deadline string `json:"deadline"`
}
func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask {
func toActiveTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ActiveTask {
base := &BaseTask{
ID: ti.ID,
Type: ti.Type,
Payload: bs.String(ti.Payload),
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
@@ -215,10 +215,10 @@ func toActiveTask(ti *asynq.TaskInfo, bs BytesStringer) *ActiveTask {
return &ActiveTask{BaseTask: base}
}
func toActiveTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ActiveTask {
func toActiveTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ActiveTask {
out := make([]*ActiveTask, len(in))
for i, ti := range in {
out[i] = toActiveTask(ti, bs)
out[i] = toActiveTask(ti, pf)
}
return out
}
@@ -228,11 +228,11 @@ type PendingTask struct {
*BaseTask
}
func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask {
func toPendingTask(ti *asynq.TaskInfo, pf PayloadFormatter) *PendingTask {
base := &BaseTask{
ID: ti.ID,
Type: ti.Type,
Payload: bs.String(ti.Payload),
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
@@ -243,10 +243,10 @@ func toPendingTask(ti *asynq.TaskInfo, bs BytesStringer) *PendingTask {
}
}
func toPendingTasks(in []*asynq.TaskInfo, bs BytesStringer) []*PendingTask {
func toPendingTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*PendingTask {
out := make([]*PendingTask, len(in))
for i, ti := range in {
out[i] = toPendingTask(ti, bs)
out[i] = toPendingTask(ti, pf)
}
return out
}
@@ -256,11 +256,11 @@ type ScheduledTask struct {
NextProcessAt time.Time `json:"next_process_at"`
}
func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask {
func toScheduledTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ScheduledTask {
base := &BaseTask{
ID: ti.ID,
Type: ti.Type,
Payload: bs.String(ti.Payload),
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
@@ -272,10 +272,10 @@ func toScheduledTask(ti *asynq.TaskInfo, bs BytesStringer) *ScheduledTask {
}
}
func toScheduledTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ScheduledTask {
func toScheduledTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ScheduledTask {
out := make([]*ScheduledTask, len(in))
for i, ti := range in {
out[i] = toScheduledTask(ti, bs)
out[i] = toScheduledTask(ti, pf)
}
return out
}
@@ -285,11 +285,11 @@ type RetryTask struct {
NextProcessAt time.Time `json:"next_process_at"`
}
func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask {
func toRetryTask(ti *asynq.TaskInfo, pf PayloadFormatter) *RetryTask {
base := &BaseTask{
ID: ti.ID,
Type: ti.Type,
Payload: bs.String(ti.Payload),
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
@@ -301,10 +301,10 @@ func toRetryTask(ti *asynq.TaskInfo, bs BytesStringer) *RetryTask {
}
}
func toRetryTasks(in []*asynq.TaskInfo, bs BytesStringer) []*RetryTask {
func toRetryTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*RetryTask {
out := make([]*RetryTask, len(in))
for i, ti := range in {
out[i] = toRetryTask(ti, bs)
out[i] = toRetryTask(ti, pf)
}
return out
}
@@ -314,11 +314,11 @@ type ArchivedTask struct {
LastFailedAt time.Time `json:"last_failed_at"`
}
func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask {
func toArchivedTask(ti *asynq.TaskInfo, pf PayloadFormatter) *ArchivedTask {
base := &BaseTask{
ID: ti.ID,
Type: ti.Type,
Payload: bs.String(ti.Payload),
Payload: pf.FormatPayload(ti.Type, ti.Payload),
Queue: ti.Queue,
MaxRetry: ti.MaxRetry,
Retried: ti.Retried,
@@ -330,10 +330,10 @@ func toArchivedTask(ti *asynq.TaskInfo, bs BytesStringer) *ArchivedTask {
}
}
func toArchivedTasks(in []*asynq.TaskInfo, bs BytesStringer) []*ArchivedTask {
func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*ArchivedTask {
out := make([]*ArchivedTask, len(in))
for i, ti := range in {
out[i] = toArchivedTask(ti, bs)
out[i] = toArchivedTask(ti, pf)
}
return out
}
@@ -349,7 +349,7 @@ type SchedulerEntry struct {
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
}
func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry {
func toSchedulerEntry(e *asynq.SchedulerEntry, pf PayloadFormatter) *SchedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts {
opts = append(opts, o.String())
@@ -362,17 +362,17 @@ func toSchedulerEntry(e *asynq.SchedulerEntry, bs BytesStringer) *SchedulerEntry
ID: e.ID,
Spec: e.Spec,
TaskType: e.Task.Type(),
TaskPayload: bs.String(e.Task.Payload()),
TaskPayload: pf.FormatPayload(e.Task.Type(), e.Task.Payload()),
Opts: opts,
NextEnqueueAt: e.Next.Format(time.RFC3339),
PrevEnqueueAt: prev,
}
}
func toSchedulerEntries(in []*asynq.SchedulerEntry, bs BytesStringer) []*SchedulerEntry {
func toSchedulerEntries(in []*asynq.SchedulerEntry, pf PayloadFormatter) []*SchedulerEntry {
out := make([]*SchedulerEntry, len(in))
for i, e := range in {
out[i] = toSchedulerEntry(e, bs)
out[i] = toSchedulerEntry(e, pf)
}
return out
}
@@ -409,7 +409,7 @@ type ServerInfo struct {
ActiveWorkers []*WorkerInfo `json:"active_workers"`
}
func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo {
func toServerInfo(info *asynq.ServerInfo, pf PayloadFormatter) *ServerInfo {
return &ServerInfo{
ID: info.ID,
Host: info.Host,
@@ -419,40 +419,40 @@ func toServerInfo(info *asynq.ServerInfo, bs BytesStringer) *ServerInfo {
StrictPriority: info.StrictPriority,
Started: info.Started.Format(time.RFC3339),
Status: info.Status,
ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, bs),
ActiveWorkers: toWorkerInfoList(info.ActiveWorkers, pf),
}
}
func toServerInfoList(in []*asynq.ServerInfo, bs BytesStringer) []*ServerInfo {
func toServerInfoList(in []*asynq.ServerInfo, pf PayloadFormatter) []*ServerInfo {
out := make([]*ServerInfo, len(in))
for i, s := range in {
out[i] = toServerInfo(s, bs)
out[i] = toServerInfo(s, pf)
}
return out
}
type WorkerInfo struct {
TaskID string `json:"task_id"`
Queue string `json:"queue"`
TaskType string `json:"task_type"`
TakPayload string `json:"task_payload"`
Started string `json:"start_time"`
TaskID string `json:"task_id"`
Queue string `json:"queue"`
TaskType string `json:"task_type"`
TaskPayload string `json:"task_payload"`
Started string `json:"start_time"`
}
func toWorkerInfo(info *asynq.WorkerInfo, bs BytesStringer) *WorkerInfo {
func toWorkerInfo(info *asynq.WorkerInfo, pf PayloadFormatter) *WorkerInfo {
return &WorkerInfo{
TaskID: info.TaskID,
Queue: info.Queue,
TaskType: info.TaskType,
TakPayload: bs.String(info.TaskPayload),
Started: info.Started.Format(time.RFC3339),
TaskID: info.TaskID,
Queue: info.Queue,
TaskType: info.TaskType,
TaskPayload: pf.FormatPayload(info.TaskType, info.TaskPayload),
Started: info.Started.Format(time.RFC3339),
}
}
func toWorkerInfoList(in []*asynq.WorkerInfo, bs BytesStringer) []*WorkerInfo {
func toWorkerInfoList(in []*asynq.WorkerInfo, pf PayloadFormatter) []*WorkerInfo {
out := make([]*WorkerInfo, len(in))
for i, w := range in {
out[i] = toWorkerInfo(w, bs)
out[i] = toWorkerInfo(w, pf)
}
return out
}