Update to use new Task API

This commit is contained in:
Ken Hibino
2021-04-04 15:15:54 -07:00
parent 1149ad737d
commit d068f274f7
3 changed files with 97 additions and 26 deletions

View File

@@ -3,7 +3,6 @@ package main
import (
"time"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/inspeq"
)
@@ -85,13 +84,13 @@ func toDailyStatsList(in []*inspeq.DailyStats) []*DailyStats {
}
type BaseTask struct {
ID string `json:"id"`
Type string `json:"type"`
Payload asynq.Payload `json:"payload"`
Queue string `json:"queue"`
MaxRetry int `json:"max_retry"`
Retried int `json:"retried"`
LastError string `json:"error_message"`
ID string `json:"id"`
Type string `json:"type"`
Payload []byte `json:"payload"`
Queue string `json:"queue"`
MaxRetry int `json:"max_retry"`
Retried int `json:"retried"`
LastError string `json:"error_message"`
}
type ActiveTask struct {
@@ -114,8 +113,8 @@ type ActiveTask struct {
func toActiveTask(t *inspeq.ActiveTask) *ActiveTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
@@ -140,8 +139,8 @@ type PendingTask struct {
func toPendingTask(t *inspeq.PendingTask) *PendingTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
@@ -170,8 +169,8 @@ type ScheduledTask struct {
func toScheduledTask(t *inspeq.ScheduledTask) *ScheduledTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
@@ -201,8 +200,8 @@ type RetryTask struct {
func toRetryTask(t *inspeq.RetryTask) *RetryTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
@@ -232,8 +231,8 @@ type ArchivedTask struct {
func toArchivedTask(t *inspeq.ArchivedTask) *ArchivedTask {
base := &BaseTask{
ID: t.ID,
Type: t.Type,
Payload: t.Payload,
Type: t.Type(),
Payload: t.Payload(),
Queue: t.Queue,
MaxRetry: t.MaxRetry,
Retried: t.Retried,
@@ -255,12 +254,12 @@ func toArchivedTasks(in []*inspeq.ArchivedTask) []*ArchivedTask {
}
type SchedulerEntry struct {
ID string `json:"id"`
Spec string `json:"spec"`
TaskType string `json:"task_type"`
TaskPayload asynq.Payload `json:"task_payload"`
Opts []string `json:"options"`
NextEnqueueAt string `json:"next_enqueue_at"`
ID string `json:"id"`
Spec string `json:"spec"`
TaskType string `json:"task_type"`
TaskPayload []byte `json:"task_payload"`
Opts []string `json:"options"`
NextEnqueueAt string `json:"next_enqueue_at"`
// This field is omitted if there were no previous enqueue events.
PrevEnqueueAt string `json:"prev_enqueue_at,omitempty"`
}
@@ -277,8 +276,8 @@ func toSchedulerEntry(e *inspeq.SchedulerEntry) *SchedulerEntry {
return &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
TaskType: e.Task.Type,
TaskPayload: e.Task.Payload,
TaskType: e.Task.Type(),
TaskPayload: e.Task.Payload(),
Opts: opts,
NextEnqueueAt: e.Next.Format(time.RFC3339),
PrevEnqueueAt: prev,