diff --git a/CHANGELOG.md b/CHANGELOG.md index 99fdf3f..5250c75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. + ### Added -- `MemoryUsage` field is added to `QueueStats`. +- `MaxRetry`, `Retried`, `LastError` fields were added to all task types returned from `Inspector`. +- `MemoryUsage` field was added to `QueueStats`. - `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector` - `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`. - asynq CLI now supports deleting/archiving pending tasks. diff --git a/inspector.go b/inspector.go index 3d6ba67..1ff029b 100644 --- a/inspector.go +++ b/inspector.go @@ -169,15 +169,21 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { // PendingTask is a task in a queue and is ready to be processed. type PendingTask struct { *Task - ID string - Queue string + ID string + Queue string + MaxRetry int + Retried int + LastError string } // ActiveTask is a task that's currently being processed. type ActiveTask struct { *Task - ID string - Queue string + ID string + Queue string + MaxRetry int + Retried int + LastError string } // ScheduledTask is a task scheduled to be processed in the future. @@ -185,6 +191,9 @@ type ScheduledTask struct { *Task ID string Queue string + MaxRetry int + Retried int + LastError string NextProcessAt time.Time score int64 @@ -198,7 +207,7 @@ type RetryTask struct { NextProcessAt time.Time MaxRetry int Retried int - ErrorMsg string + LastError string // TODO: LastFailedAt time.Time score int64 @@ -215,7 +224,7 @@ type ArchivedTask struct { MaxRetry int Retried int LastFailedAt time.Time - ErrorMsg string + LastError string score int64 } @@ -355,9 +364,12 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi var tasks []*PendingTask for _, m := range msgs { tasks = append(tasks, &PendingTask{ - Task: NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, + Task: NewTask(m.Type, m.Payload), + ID: m.ID.String(), + Queue: m.Queue, + MaxRetry: m.Retry, + Retried: m.Retried, + LastError: m.ErrorMsg, }) } return tasks, err @@ -378,10 +390,14 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active } var tasks []*ActiveTask for _, m := range msgs { + tasks = append(tasks, &ActiveTask{ - Task: NewTask(m.Type, m.Payload), - ID: m.ID.String(), - Queue: m.Queue, + Task: NewTask(m.Type, m.Payload), + ID: m.ID.String(), + Queue: m.Queue, + MaxRetry: m.Retry, + Retried: m.Retried, + LastError: m.ErrorMsg, }) } return tasks, err @@ -409,6 +425,9 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch Task: t, ID: z.Message.ID.String(), Queue: z.Message.Queue, + MaxRetry: z.Message.Retry, + Retried: z.Message.Retried, + LastError: z.Message.ErrorMsg, NextProcessAt: processAt, score: z.Score, }) @@ -442,8 +461,8 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa MaxRetry: z.Message.Retry, Retried: z.Message.Retried, // TODO: LastFailedAt: z.Message.LastFailedAt - ErrorMsg: z.Message.ErrorMsg, - score: z.Score, + LastError: z.Message.ErrorMsg, + score: z.Score, }) } return tasks, nil @@ -474,7 +493,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch MaxRetry: z.Message.Retry, Retried: z.Message.Retried, LastFailedAt: failedAt, - ErrorMsg: z.Message.ErrorMsg, + LastError: z.Message.ErrorMsg, score: z.Score, }) } diff --git a/inspector_test.go b/inspector_test.go index cd6267d..fce1212 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -423,9 +423,12 @@ func TestInspectorHistory(t *testing.T) { func createPendingTask(msg *base.TaskMessage) *PendingTask { return &PendingTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, + MaxRetry: msg.Retry, + Retried: msg.Retried, + LastError: msg.ErrorMsg, } } @@ -510,9 +513,12 @@ func TestInspectorListActiveTasks(t *testing.T) { createActiveTask := func(msg *base.TaskMessage) *ActiveTask { return &ActiveTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, + MaxRetry: msg.Retry, + Retried: msg.Retried, + LastError: msg.ErrorMsg, } } @@ -559,6 +565,9 @@ func createScheduledTask(z base.Z) *ScheduledTask { Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, + MaxRetry: msg.Retry, + Retried: msg.Retried, + LastError: msg.ErrorMsg, NextProcessAt: time.Unix(z.Score, 0), score: z.Score, } @@ -635,7 +644,7 @@ func createRetryTask(z base.Z) *RetryTask { NextProcessAt: time.Unix(z.Score, 0), MaxRetry: msg.Retry, Retried: msg.Retried, - ErrorMsg: msg.ErrorMsg, + LastError: msg.ErrorMsg, score: z.Score, } } @@ -712,7 +721,7 @@ func createArchivedTask(z base.Z) *ArchivedTask { MaxRetry: msg.Retry, Retried: msg.Retried, LastFailedAt: time.Unix(z.Score, 0), - ErrorMsg: msg.ErrorMsg, + LastError: msg.ErrorMsg, score: z.Score, } }