2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Add Retry and LastError fields to inspector tasks

This commit is contained in:
Ken Hibino 2021-01-27 06:59:32 -08:00
parent afde6a7266
commit bfde0b6283
3 changed files with 57 additions and 24 deletions

View File

@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added ### Added
- `MemoryUsage` field is added to `QueueStats`. - `MaxRetry`, `Retried`, `LastError` fields were added to all task types returned from `Inspector`.
- `MemoryUsage` field was added to `QueueStats`.
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector` - `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`. - `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
- asynq CLI now supports deleting/archiving pending tasks. - asynq CLI now supports deleting/archiving pending tasks.

View File

@ -169,15 +169,21 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// PendingTask is a task in a queue and is ready to be processed. // PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct { type PendingTask struct {
*Task *Task
ID string ID string
Queue string Queue string
MaxRetry int
Retried int
LastError string
} }
// ActiveTask is a task that's currently being processed. // ActiveTask is a task that's currently being processed.
type ActiveTask struct { type ActiveTask struct {
*Task *Task
ID string ID string
Queue string Queue string
MaxRetry int
Retried int
LastError string
} }
// ScheduledTask is a task scheduled to be processed in the future. // ScheduledTask is a task scheduled to be processed in the future.
@ -185,6 +191,9 @@ type ScheduledTask struct {
*Task *Task
ID string ID string
Queue string Queue string
MaxRetry int
Retried int
LastError string
NextProcessAt time.Time NextProcessAt time.Time
score int64 score int64
@ -198,7 +207,7 @@ type RetryTask struct {
NextProcessAt time.Time NextProcessAt time.Time
MaxRetry int MaxRetry int
Retried int Retried int
ErrorMsg string LastError string
// TODO: LastFailedAt time.Time // TODO: LastFailedAt time.Time
score int64 score int64
@ -215,7 +224,7 @@ type ArchivedTask struct {
MaxRetry int MaxRetry int
Retried int Retried int
LastFailedAt time.Time LastFailedAt time.Time
ErrorMsg string LastError string
score int64 score int64
} }
@ -355,9 +364,12 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask var tasks []*PendingTask
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &PendingTask{ tasks = append(tasks, &PendingTask{
Task: NewTask(m.Type, m.Payload), Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(), ID: m.ID.String(),
Queue: m.Queue, Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
}) })
} }
return tasks, err return tasks, err
@ -378,10 +390,14 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
} }
var tasks []*ActiveTask var tasks []*ActiveTask
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &ActiveTask{ tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload), Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(), ID: m.ID.String(),
Queue: m.Queue, Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
}) })
} }
return tasks, err return tasks, err
@ -409,6 +425,9 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
Task: t, Task: t,
ID: z.Message.ID.String(), ID: z.Message.ID.String(),
Queue: z.Message.Queue, Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastError: z.Message.ErrorMsg,
NextProcessAt: processAt, NextProcessAt: processAt,
score: z.Score, score: z.Score,
}) })
@ -442,8 +461,8 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
MaxRetry: z.Message.Retry, MaxRetry: z.Message.Retry,
Retried: z.Message.Retried, Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt // TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg, LastError: z.Message.ErrorMsg,
score: z.Score, score: z.Score,
}) })
} }
return tasks, nil return tasks, nil
@ -474,7 +493,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
MaxRetry: z.Message.Retry, MaxRetry: z.Message.Retry,
Retried: z.Message.Retried, Retried: z.Message.Retried,
LastFailedAt: failedAt, LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg, LastError: z.Message.ErrorMsg,
score: z.Score, score: z.Score,
}) })
} }

View File

@ -423,9 +423,12 @@ func TestInspectorHistory(t *testing.T) {
func createPendingTask(msg *base.TaskMessage) *PendingTask { func createPendingTask(msg *base.TaskMessage) *PendingTask {
return &PendingTask{ return &PendingTask{
Task: NewTask(msg.Type, msg.Payload), Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(), ID: msg.ID.String(),
Queue: msg.Queue, Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
} }
} }
@ -510,9 +513,12 @@ func TestInspectorListActiveTasks(t *testing.T) {
createActiveTask := func(msg *base.TaskMessage) *ActiveTask { createActiveTask := func(msg *base.TaskMessage) *ActiveTask {
return &ActiveTask{ return &ActiveTask{
Task: NewTask(msg.Type, msg.Payload), Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(), ID: msg.ID.String(),
Queue: msg.Queue, Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
} }
} }
@ -559,6 +565,9 @@ func createScheduledTask(z base.Z) *ScheduledTask {
Task: NewTask(msg.Type, msg.Payload), Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(), ID: msg.ID.String(),
Queue: msg.Queue, Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
NextProcessAt: time.Unix(z.Score, 0), NextProcessAt: time.Unix(z.Score, 0),
score: z.Score, score: z.Score,
} }
@ -635,7 +644,7 @@ func createRetryTask(z base.Z) *RetryTask {
NextProcessAt: time.Unix(z.Score, 0), NextProcessAt: time.Unix(z.Score, 0),
MaxRetry: msg.Retry, MaxRetry: msg.Retry,
Retried: msg.Retried, Retried: msg.Retried,
ErrorMsg: msg.ErrorMsg, LastError: msg.ErrorMsg,
score: z.Score, score: z.Score,
} }
} }
@ -712,7 +721,7 @@ func createArchivedTask(z base.Z) *ArchivedTask {
MaxRetry: msg.Retry, MaxRetry: msg.Retry,
Retried: msg.Retried, Retried: msg.Retried,
LastFailedAt: time.Unix(z.Score, 0), LastFailedAt: time.Unix(z.Score, 0),
ErrorMsg: msg.ErrorMsg, LastError: msg.ErrorMsg,
score: z.Score, score: z.Score,
} }
} }