Update to new asynq API

This commit is contained in:
Ken Hibino
2021-05-28 14:40:09 -07:00
parent d068f274f7
commit fa313ce180
7 changed files with 193 additions and 239 deletions

View File

@@ -3,7 +3,7 @@ package main
import (
"time"
"github.com/hibiken/asynq/inspeq"
"github.com/hibiken/asynq"
)
// ****************************************************************************
@@ -39,7 +39,7 @@ type QueueStateSnapshot struct {
Timestamp time.Time `json:"timestamp"`
}
func toQueueStateSnapshot(s *inspeq.QueueStats) *QueueStateSnapshot {
func toQueueStateSnapshot(s *asynq.QueueInfo) *QueueStateSnapshot {
return &QueueStateSnapshot{
Queue: s.Queue,
MemoryUsage: s.MemoryUsage,
@@ -65,7 +65,7 @@ type DailyStats struct {
Date string `json:"date"`
}
func toDailyStats(s *inspeq.DailyStats) *DailyStats {
func toDailyStats(s *asynq.DailyStats) *DailyStats {
return &DailyStats{
Queue: s.Queue,
Processed: s.Processed,
@@ -75,7 +75,7 @@ func toDailyStats(s *inspeq.DailyStats) *DailyStats {
}
}
func toDailyStatsList(in []*inspeq.DailyStats) []*DailyStats {
func toDailyStatsList(in []*asynq.DailyStats) []*DailyStats {
out := make([]*DailyStats, len(in))
for i, s := range in {
out[i] = toDailyStats(s)
@@ -110,20 +110,20 @@ type ActiveTask struct {
Deadline string `json:"deadline"`
}
func toActiveTask(t *inspeq.ActiveTask) *ActiveTask {
func toActiveTask(t *asynq.TaskInfo) *ActiveTask {
base := &BaseTask{
ID: t.ID,
ID: t.ID(),
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
Queue: t.Queue(),
MaxRetry: t.MaxRetry(),
Retried: t.Retried(),
LastError: t.LastErr(),
}
return &ActiveTask{BaseTask: base}
}
func toActiveTasks(in []*inspeq.ActiveTask) []*ActiveTask {
func toActiveTasks(in []*asynq.TaskInfo) []*ActiveTask {
out := make([]*ActiveTask, len(in))
for i, t := range in {
out[i] = toActiveTask(t)
@@ -131,28 +131,27 @@ func toActiveTasks(in []*inspeq.ActiveTask) []*ActiveTask {
return out
}
// TODO: Maybe we don't need state specific type, just use TaskInfo
type PendingTask struct {
*BaseTask
Key string `json:"key"`
}
func toPendingTask(t *inspeq.PendingTask) *PendingTask {
func toPendingTask(t *asynq.TaskInfo) *PendingTask {
base := &BaseTask{
ID: t.ID,
ID: t.ID(),
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
Queue: t.Queue(),
MaxRetry: t.MaxRetry(),
Retried: t.Retried(),
LastError: t.LastErr(),
}
return &PendingTask{
BaseTask: base,
Key: t.Key(),
}
}
func toPendingTasks(in []*inspeq.PendingTask) []*PendingTask {
func toPendingTasks(in []*asynq.TaskInfo) []*PendingTask {
out := make([]*PendingTask, len(in))
for i, t := range in {
out[i] = toPendingTask(t)
@@ -162,28 +161,26 @@ func toPendingTasks(in []*inspeq.PendingTask) []*PendingTask {
type ScheduledTask struct {
*BaseTask
Key string `json:"key"`
NextProcessAt time.Time `json:"next_process_at"`
}
func toScheduledTask(t *inspeq.ScheduledTask) *ScheduledTask {
func toScheduledTask(t *asynq.TaskInfo) *ScheduledTask {
base := &BaseTask{
ID: t.ID,
ID: t.ID(),
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
Queue: t.Queue(),
MaxRetry: t.MaxRetry(),
Retried: t.Retried(),
LastError: t.LastErr(),
}
return &ScheduledTask{
BaseTask: base,
Key: t.Key(),
NextProcessAt: t.NextProcessAt,
NextProcessAt: t.NextProcessAt(),
}
}
func toScheduledTasks(in []*inspeq.ScheduledTask) []*ScheduledTask {
func toScheduledTasks(in []*asynq.TaskInfo) []*ScheduledTask {
out := make([]*ScheduledTask, len(in))
for i, t := range in {
out[i] = toScheduledTask(t)
@@ -193,28 +190,26 @@ func toScheduledTasks(in []*inspeq.ScheduledTask) []*ScheduledTask {
type RetryTask struct {
*BaseTask
Key string `json:"key"`
NextProcessAt time.Time `json:"next_process_at"`
}
func toRetryTask(t *inspeq.RetryTask) *RetryTask {
func toRetryTask(t *asynq.TaskInfo) *RetryTask {
base := &BaseTask{
ID: t.ID,
ID: t.ID(),
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
Queue: t.Queue(),
MaxRetry: t.MaxRetry(),
Retried: t.Retried(),
LastError: t.LastErr(),
}
return &RetryTask{
BaseTask: base,
Key: t.Key(),
NextProcessAt: t.NextProcessAt,
NextProcessAt: t.NextProcessAt(),
}
}
func toRetryTasks(in []*inspeq.RetryTask) []*RetryTask {
func toRetryTasks(in []*asynq.TaskInfo) []*RetryTask {
out := make([]*RetryTask, len(in))
for i, t := range in {
out[i] = toRetryTask(t)
@@ -224,28 +219,26 @@ func toRetryTasks(in []*inspeq.RetryTask) []*RetryTask {
type ArchivedTask struct {
*BaseTask
Key string `json:"key"`
LastFailedAt time.Time `json:"last_failed_at"`
}
func toArchivedTask(t *inspeq.ArchivedTask) *ArchivedTask {
func toArchivedTask(t *asynq.TaskInfo) *ArchivedTask {
base := &BaseTask{
ID: t.ID,
ID: t.ID(),
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
LastError: t.LastError,
Queue: t.Queue(),
MaxRetry: t.MaxRetry(),
Retried: t.Retried(),
LastError: t.LastErr(),
}
return &ArchivedTask{
BaseTask: base,
Key: t.Key(),
LastFailedAt: t.LastFailedAt,
LastFailedAt: t.LastFailedAt(),
}
}
func toArchivedTasks(in []*inspeq.ArchivedTask) []*ArchivedTask {
func toArchivedTasks(in []*asynq.TaskInfo) []*ArchivedTask {
out := make([]*ArchivedTask, len(in))
for i, t := range in {
out[i] = toArchivedTask(t)
@@ -264,7 +257,7 @@ type SchedulerEntry struct {
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
}
func toSchedulerEntry(e *inspeq.SchedulerEntry) *SchedulerEntry {
func toSchedulerEntry(e *asynq.SchedulerEntry) *SchedulerEntry {
opts := make([]string, 0) // create a non-nil, empty slice to avoid null in json output
for _, o := range e.Opts {
opts = append(opts, o.String())
@@ -284,7 +277,7 @@ func toSchedulerEntry(e *inspeq.SchedulerEntry) *SchedulerEntry {
}
}
func toSchedulerEntries(in []*inspeq.SchedulerEntry) []*SchedulerEntry {
func toSchedulerEntries(in []*asynq.SchedulerEntry) []*SchedulerEntry {
out := make([]*SchedulerEntry, len(in))
for i, e := range in {
out[i] = toSchedulerEntry(e)
@@ -297,14 +290,14 @@ type SchedulerEnqueueEvent struct {
EnqueuedAt string `json:"enqueued_at"`
}
func toSchedulerEnqueueEvent(e *inspeq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
func toSchedulerEnqueueEvent(e *asynq.SchedulerEnqueueEvent) *SchedulerEnqueueEvent {
return &SchedulerEnqueueEvent{
TaskID: e.TaskID,
EnqueuedAt: e.EnqueuedAt.Format(time.RFC3339),
}
}
func toSchedulerEnqueueEvents(in []*inspeq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
func toSchedulerEnqueueEvents(in []*asynq.SchedulerEnqueueEvent) []*SchedulerEnqueueEvent {
out := make([]*SchedulerEnqueueEvent, len(in))
for i, e := range in {
out[i] = toSchedulerEnqueueEvent(e)
@@ -324,7 +317,7 @@ type ServerInfo struct {
ActiveWorkers []*WorkerInfo `json:"active_workers"`
}
func toServerInfo(info *inspeq.ServerInfo) *ServerInfo {
func toServerInfo(info *asynq.ServerInfo) *ServerInfo {
return &ServerInfo{
ID: info.ID,
Host: info.Host,
@@ -338,7 +331,7 @@ func toServerInfo(info *inspeq.ServerInfo) *ServerInfo {
}
}
func toServerInfoList(in []*inspeq.ServerInfo) []*ServerInfo {
func toServerInfoList(in []*asynq.ServerInfo) []*ServerInfo {
out := make([]*ServerInfo, len(in))
for i, s := range in {
out[i] = toServerInfo(s)
@@ -347,18 +340,24 @@ func toServerInfoList(in []*inspeq.ServerInfo) []*ServerInfo {
}
type WorkerInfo struct {
Task *ActiveTask `json:"task"`
Started string `json:"start_time"`
TaskID string `json:"task_id"`
Queue string `json:"queue"`
TaskType string `json:"task_type"`
TakPayload []byte `json:"task_payload"`
Started string `json:"start_time"`
}
func toWorkerInfo(info *inspeq.WorkerInfo) *WorkerInfo {
func toWorkerInfo(info *asynq.WorkerInfo) *WorkerInfo {
return &WorkerInfo{
Task: toActiveTask(info.Task),
Started: info.Started.Format(time.RFC3339),
TaskID: info.TaskID,
Queue: info.Queue,
TaskType: info.TaskType,
TakPayload: info.TaskPayload,
Started: info.Started.Format(time.RFC3339),
}
}
func toWorkerInfoList(in []*inspeq.WorkerInfo) []*WorkerInfo {
func toWorkerInfoList(in []*asynq.WorkerInfo) []*WorkerInfo {
out := make([]*WorkerInfo, len(in))
for i, w := range in {
out[i] = toWorkerInfo(w)