asynqmon/conversion_helpers.go

369 lines
9.2 KiB
Go
Raw Normal View History

2020-11-24 22:54:00 +08:00
package main
import (
"time"
"github.com/hibiken/asynq"
2021-01-29 11:47:58 +08:00
"github.com/hibiken/asynq/inspeq"
2020-11-24 22:54:00 +08:00
)
2020-12-02 23:19:06 +08:00
// ****************************************************************************
// This file defines:
// - internal types with JSON struct tags
// - conversion functions from an external type to an internal type
// ****************************************************************************
2020-11-24 22:54:00 +08:00
// QueueStateSnapshot is the JSON-tagged representation of a queue's state
// at the time recorded in Timestamp.
type QueueStateSnapshot struct {
	// Name of the queue.
	Queue string `json:"queue"`
	// Total number of bytes the queue and its tasks require to be stored in redis.
	MemoryUsage int64 `json:"memory_usage_bytes"`
	// Total number of tasks in the queue.
	Size int `json:"size"`

	// Number of tasks in each state.
	Active    int `json:"active"`
	Pending   int `json:"pending"`
	Scheduled int `json:"scheduled"`
	Retry     int `json:"retry"`
	Archived  int `json:"archived"`

	// Total number of tasks processed during the given date.
	// The number includes both succeeded and failed tasks.
	Processed int `json:"processed"`

	// Breakdown of processed tasks.
	Succeeded int `json:"succeeded"`
	Failed    int `json:"failed"`

	// Paused indicates whether the queue is paused.
	// If true, tasks in the queue will not be processed.
	Paused bool `json:"paused"`

	// Time when this snapshot was taken.
	Timestamp time.Time `json:"timestamp"`
}
2021-01-29 11:47:58 +08:00
func toQueueStateSnapshot(s *inspeq.QueueStats) *QueueStateSnapshot {
2020-11-24 22:54:00 +08:00
return &QueueStateSnapshot{
Queue: s.Queue,
MemoryUsage: s.MemoryUsage,
Size: s.Size,
Active: s.Active,
Pending: s.Pending,
Scheduled: s.Scheduled,
Retry: s.Retry,
Archived: s.Archived,
Processed: s.Processed,
Succeeded: s.Processed - s.Failed,
Failed: s.Failed,
Paused: s.Paused,
Timestamp: s.Timestamp,
2020-11-24 22:54:00 +08:00
}
}
// DailyStats is the JSON-tagged representation of a queue's processing
// counters for a single date.
type DailyStats struct {
	Queue     string `json:"queue"`
	Processed int    `json:"processed"`
	Succeeded int    `json:"succeeded"`
	Failed    int    `json:"failed"`
	// Date is a string; toDailyStats fills it using the "2006-01-02" layout.
	Date string `json:"date"`
}
2021-01-29 11:47:58 +08:00
func toDailyStats(s *inspeq.DailyStats) *DailyStats {
2020-11-24 22:54:00 +08:00
return &DailyStats{
Queue: s.Queue,
Processed: s.Processed,
Succeeded: s.Processed - s.Failed,
Failed: s.Failed,
2020-12-28 08:45:28 +08:00
Date: s.Date.Format("2006-01-02"),
2020-11-24 22:54:00 +08:00
}
}
2021-01-29 11:47:58 +08:00
// toDailyStatsList converts every element of in with toDailyStats,
// preserving order. The returned slice is always non-nil and has the same
// length as in.
func toDailyStatsList(in []*inspeq.DailyStats) []*DailyStats {
	stats := make([]*DailyStats, 0, len(in))
	for _, d := range in {
		stats = append(stats, toDailyStats(d))
	}
	return stats
}
2020-11-24 22:54:00 +08:00
type BaseTask struct {
ID string `json:"id"`
Type string `json:"type"`
Payload asynq.Payload `json:"payload"`
Queue string `json:"queue"`
MaxRetry int `json:"max_retry"`
Retried int `json:"retried"`
LastError string `json:"error_message"`
2020-11-24 22:54:00 +08:00
}
type ActiveTask struct {
*BaseTask
2021-01-24 04:06:50 +08:00
// Started time indicates when a worker started working on ths task.
//
// Value is either time formatted in RFC3339 format, or "-" which indicates
// a worker started working on the task only a few moments ago, and started time
// data is not available.
Started string `json:"start_time"`
2021-01-28 08:16:38 +08:00
// Deadline indicates the time by which the worker needs to finish its task.
//
// Value is either time formatted in RFC3339 format, or "-" which indicates that
// the data is not available yet.
Deadline string `json:"deadline"`
2020-11-24 22:54:00 +08:00
}
2021-01-29 11:47:58 +08:00
func toActiveTask(t *inspeq.ActiveTask) *ActiveTask {
2020-11-24 22:54:00 +08:00
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
2020-11-24 22:54:00 +08:00
}
2021-01-24 04:06:50 +08:00
return &ActiveTask{BaseTask: base}
2020-11-24 22:54:00 +08:00
}
2021-01-29 11:47:58 +08:00
func toActiveTasks(in []*inspeq.ActiveTask) []*ActiveTask {
2020-11-24 22:54:00 +08:00
out := make([]*ActiveTask, len(in))
for i, t := range in {
out[i] = toActiveTask(t)
}
return out
}
type PendingTask struct {
*BaseTask
2021-01-21 22:47:56 +08:00
Key string `json:"key"`
2020-11-24 22:54:00 +08:00
}
2021-01-29 11:47:58 +08:00
func toPendingTask(t *inspeq.PendingTask) *PendingTask {
2020-11-24 22:54:00 +08:00
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
2020-11-24 22:54:00 +08:00
}
2021-01-21 22:47:56 +08:00
return &PendingTask{
BaseTask: base,
Key: t.Key(),
}
2020-11-24 22:54:00 +08:00
}
2021-01-29 11:47:58 +08:00
func toPendingTasks(in []*inspeq.PendingTask) []*PendingTask {
2020-11-24 22:54:00 +08:00
out := make([]*PendingTask, len(in))
for i, t := range in {
out[i] = toPendingTask(t)
}
return out
}
type ScheduledTask struct {
*BaseTask
Key string `json:"key"`
2020-11-24 22:54:00 +08:00
NextProcessAt time.Time `json:"next_process_at"`
}
2021-01-29 11:47:58 +08:00
func toScheduledTask(t *inspeq.ScheduledTask) *ScheduledTask {
2020-11-24 22:54:00 +08:00
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
2020-11-24 22:54:00 +08:00
}
return &ScheduledTask{
BaseTask: base,
Key: t.Key(),
2020-11-24 22:54:00 +08:00
NextProcessAt: t.NextProcessAt,
}
}
2021-01-29 11:47:58 +08:00
func toScheduledTasks(in []*inspeq.ScheduledTask) []*ScheduledTask {
2020-11-24 22:54:00 +08:00
out := make([]*ScheduledTask, len(in))
for i, t := range in {
out[i] = toScheduledTask(t)
}
return out
}
type RetryTask struct {
*BaseTask
Key string `json:"key"`
2020-11-24 22:54:00 +08:00
NextProcessAt time.Time `json:"next_process_at"`
}
2021-01-29 11:47:58 +08:00
func toRetryTask(t *inspeq.RetryTask) *RetryTask {
2020-11-24 22:54:00 +08:00
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
2020-11-24 22:54:00 +08:00
}
return &RetryTask{
BaseTask: base,
Key: t.Key(),
2020-11-24 22:54:00 +08:00
NextProcessAt: t.NextProcessAt,
}
}
2021-01-29 11:47:58 +08:00
func toRetryTasks(in []*inspeq.RetryTask) []*RetryTask {
2020-11-24 22:54:00 +08:00
out := make([]*RetryTask, len(in))
for i, t := range in {
out[i] = toRetryTask(t)
}
return out
}
type ArchivedTask struct {
2020-11-24 22:54:00 +08:00
*BaseTask
Key string `json:"key"`
2020-11-24 22:54:00 +08:00
LastFailedAt time.Time `json:"last_failed_at"`
}
2021-01-29 11:47:58 +08:00
func toArchivedTask(t *inspeq.ArchivedTask) *ArchivedTask {
2020-11-24 22:54:00 +08:00
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
2020-11-24 22:54:00 +08:00
}
return &ArchivedTask{
2020-11-24 22:54:00 +08:00
BaseTask: base,
Key: t.Key(),
2020-11-24 22:54:00 +08:00
LastFailedAt: t.LastFailedAt,
}
}
2021-01-29 11:47:58 +08:00
func toArchivedTasks(in []*inspeq.ArchivedTask) []*ArchivedTask {
out := make([]*ArchivedTask, len(in))
2020-11-24 22:54:00 +08:00
for i, t := range in {
out[i] = toArchivedTask(t)
2020-11-24 22:54:00 +08:00
}
return out
}
2020-12-02 23:19:06 +08:00
type SchedulerEntry struct {
ID string `json:"id"`
Spec string `json:"spec"`
TaskType string `json:"task_type"`
TaskPayload asynq.Payload `json:"task_payload"`
Opts []string `json:"options"`
NextEnqueueAt string `json:"next_enqueue_at"`
// This field is omitted if there were no previous enqueue events.
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
2020-12-02 23:19:06 +08:00
}
2021-01-29 11:47:58 +08:00
func toSchedulerEntry(e *inspeq.SchedulerEntry) *SchedulerEntry {
2020-12-02 23:19:06 +08:00
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts {
opts = append(opts, o.String())
}
prev := ""
if !e.Prev.IsZero() {
prev = e.Prev.Format(time.RFC3339)
}
2020-12-02 23:19:06 +08:00
return &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
TaskType: e.Task.Type,
TaskPayload: e.Task.Payload,
Opts: opts,
NextEnqueueAt: e.Next.Format(time.RFC3339),
PrevEnqueueAt: prev,
2020-12-02 23:19:06 +08:00
}
}
2021-01-29 11:47:58 +08:00
func toSchedulerEntries(in []*inspeq.SchedulerEntry) []*SchedulerEntry {
2020-12-02 23:19:06 +08:00
out := make([]*SchedulerEntry, len(in))
for i, e := range in {
out[i] = toSchedulerEntry(e)
}
return out
}
// SchedulerEnqueueEvent is the JSON-tagged representation of a single
// enqueue event recorded for a scheduler entry.
type SchedulerEnqueueEvent struct {
	// TaskID is serialized under the "task_id" key.
	TaskID string `json:"task_id"`
	// EnqueuedAt is a string; toSchedulerEnqueueEvent fills it with an
	// RFC3339-formatted time.
	EnqueuedAt string `json:"enqueued_at"`
}
2021-01-29 11:47:58 +08:00
// toSchedulerEnqueueEvent converts an enqueue event from the inspeq package
// into the JSON-tagged SchedulerEnqueueEvent type, formatting the enqueue
// time as RFC3339.
func toSchedulerEnqueueEvent(e *inspeq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
	event := new(SchedulerEnqueueEvent)
	event.TaskID = e.TaskID
	event.EnqueuedAt = e.EnqueuedAt.Format(time.RFC3339)
	return event
}
2021-01-29 11:47:58 +08:00
// toSchedulerEnqueueEvents converts every element of in with
// toSchedulerEnqueueEvent, preserving order. The returned slice is always
// non-nil and has the same length as in.
func toSchedulerEnqueueEvents(in []*inspeq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
	events := make([]*SchedulerEnqueueEvent, 0, len(in))
	for _, ev := range in {
		events = append(events, toSchedulerEnqueueEvent(ev))
	}
	return events
}
// ServerInfo is the JSON-tagged representation of a server, including the
// workers that are currently active on it.
type ServerInfo struct {
	ID          string `json:"id"`
	Host        string `json:"host"`
	PID         int    `json:"pid"`
	Concurrency int    `json:"concurrency"`
	// Queues is serialized under the "queue_priorities" key.
	Queues         map[string]int `json:"queue_priorities"`
	StrictPriority bool           `json:"strict_priority_enabled"`
	// Started is a string; toServerInfo fills it with an RFC3339-formatted time.
	Started       string        `json:"start_time"`
	Status        string        `json:"status"`
	ActiveWorkers []*WorkerInfo `json:"active_workers"`
}
2021-01-29 11:47:58 +08:00
// toServerInfo converts server info from the inspeq package into the
// JSON-tagged ServerInfo type. The start time is formatted as RFC3339 and
// the active workers are converted with toWorkerInfoList; the Queues map is
// passed through without copying.
func toServerInfo(info *inspeq.ServerInfo) *ServerInfo {
	server := &ServerInfo{
		ID:             info.ID,
		Host:           info.Host,
		PID:            info.PID,
		Concurrency:    info.Concurrency,
		Queues:         info.Queues,
		StrictPriority: info.StrictPriority,
		Status:         info.Status,
	}
	server.Started = info.Started.Format(time.RFC3339)
	server.ActiveWorkers = toWorkerInfoList(info.ActiveWorkers)
	return server
}
2021-01-29 11:47:58 +08:00
// toServerInfoList converts every element of in with toServerInfo,
// preserving order. The returned slice is always non-nil and has the same
// length as in.
func toServerInfoList(in []*inspeq.ServerInfo) []*ServerInfo {
	servers := make([]*ServerInfo, 0, len(in))
	for _, info := range in {
		servers = append(servers, toServerInfo(info))
	}
	return servers
}
// WorkerInfo is the JSON-tagged representation of a worker: the task it
// holds and a start time string.
type WorkerInfo struct {
	// Task is serialized under the "task" key.
	Task *ActiveTask `json:"task"`
	// Started is a string; toWorkerInfo fills it with an RFC3339-formatted time.
	Started string `json:"start_time"`
}
2021-01-29 11:47:58 +08:00
// toWorkerInfo converts worker info from the inspeq package into the
// JSON-tagged WorkerInfo type. The task is converted with toActiveTask and
// the start time is formatted as RFC3339.
func toWorkerInfo(info *inspeq.WorkerInfo) *WorkerInfo {
	task := toActiveTask(info.Task)
	started := info.Started.Format(time.RFC3339)
	return &WorkerInfo{Task: task, Started: started}
}
2021-01-29 11:47:58 +08:00
// toWorkerInfoList converts every element of in with toWorkerInfo,
// preserving order. The returned slice is always non-nil and has the same
// length as in.
func toWorkerInfoList(in []*inspeq.WorkerInfo) []*WorkerInfo {
	workers := make([]*WorkerInfo, 0, len(in))
	for _, w := range in {
		workers = append(workers, toWorkerInfo(w))
	}
	return workers
}