2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Update List methods (expect for ListArchived)

This commit is contained in:
Ken Hibino 2021-05-18 19:13:52 -07:00
parent 12f4c7cf6e
commit 840f7245b1
2 changed files with 99 additions and 137 deletions

View File

@ -352,25 +352,24 @@ func Page(n int) ListOption {
// ListPendingTasks retrieves pending tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) {
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListPending(qname, pgn)
if err != nil {
return nil, err
// TODO: Handle ErrQueueNotFound
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*PendingTask
now := time.Now()
var tasks []*TaskInfo
for _, m := range msgs {
tasks = append(tasks, &PendingTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
tasks = append(tasks, &TaskInfo{
msg: m,
state: base.TaskStatePending,
nextProcessAt: now,
})
}
return tasks, err
@ -379,114 +378,100 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
// ListActiveTasks retrieves active tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) {
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListActive(qname, pgn)
if err != nil {
return nil, err
// TODO: Handle QueueNotFound
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*ActiveTask
var tasks []*TaskInfo
for _, m := range msgs {
tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
tasks = append(tasks, &TaskInfo{
msg: m,
state: base.TaskStateActive,
})
}
return tasks, err
}
// ListScheduledTasks retrieves scheduled tasks from the specified queue.
// Tasks are sorted by NextProcessAt field in ascending order.
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(qname, pgn)
if err != nil {
return nil, err
// TODO: handle ErrQueueNotFound
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*ScheduledTask
var tasks []*TaskInfo
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastError: z.Message.ErrorMsg,
NextProcessAt: processAt,
score: z.Score,
tasks = append(tasks, &TaskInfo{
msg: z.Message,
state: base.TaskStateScheduled,
nextProcessAt: time.Unix(z.Score, 0),
})
}
return tasks, nil
}
// ListRetryTasks retrieves retry tasks from the specified queue.
// Tasks are sorted by NextProcessAt field in ascending order.
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(qname, pgn)
if err != nil {
return nil, err
// TODO: handle ErrQueueNotFound
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*RetryTask
var tasks []*TaskInfo
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextProcessAt: processAt,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
LastError: z.Message.ErrorMsg,
score: z.Score,
tasks = append(tasks, &TaskInfo{
msg: z.Message,
state: base.TaskStateRetry,
nextProcessAt: time.Unix(z.Score, 0),
})
}
return tasks, nil
}
// ListArchivedTasks retrieves archived tasks from the specified queue.
// Tasks are sorted by LastFailedAt field in descending order.
// Tasks are sorted by LastFailedAt in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) {
if err := base.ValidateQueueName(qname); err != nil {
return nil, err
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListArchived(qname, pgn)
if err != nil {
return nil, err
// TODO: handle ErrQueueNotFound
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*ArchivedTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{
// TODO: How to handle last failed at
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,

View File

@ -422,14 +422,11 @@ func TestInspectorHistory(t *testing.T) {
}
}
func createPendingTask(msg *base.TaskMessage) *PendingTask {
return &PendingTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
func createPendingTask(msg *base.TaskMessage) *TaskInfo {
return &TaskInfo{
msg: msg,
state: base.TaskStatePending,
nextProcessAt: time.Now(),
}
}
@ -447,7 +444,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
desc string
pending map[string][]*base.TaskMessage
qname string
want []*PendingTask
want []*TaskInfo
}{
{
desc: "with default queue",
@ -455,7 +452,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
"default": {m1, m2},
},
qname: "default",
want: []*PendingTask{
want: []*TaskInfo{
createPendingTask(m1),
createPendingTask(m2),
},
@ -468,7 +465,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
"low": {m4},
},
qname: "critical",
want: []*PendingTask{
want: []*TaskInfo{
createPendingTask(m3),
},
},
@ -478,7 +475,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
"default": {},
},
qname: "default",
want: []*PendingTask(nil),
want: []*TaskInfo(nil),
},
}
@ -494,8 +491,11 @@ func TestInspectorListPendingTasks(t *testing.T) {
tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
cmpOpts := []cmp.Option{
cmpopts.EquateApproxTime(2 * time.Second),
cmp.AllowUnexported(TaskInfo{}),
}
if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" {
t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
@ -512,22 +512,11 @@ func TestInspectorListActiveTasks(t *testing.T) {
inspector := NewInspector(getRedisConnOpt(t))
createActiveTask := func(msg *base.TaskMessage) *ActiveTask {
return &ActiveTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
}
}
tests := []struct {
desc string
active map[string][]*base.TaskMessage
qname string
want []*ActiveTask
want []*TaskInfo
}{
{
desc: "with a few active tasks",
@ -536,9 +525,9 @@ func TestInspectorListActiveTasks(t *testing.T) {
"custom": {m3, m4},
},
qname: "default",
want: []*ActiveTask{
createActiveTask(m1),
createActiveTask(m2),
want: []*TaskInfo{
{msg: m1, state: base.TaskStateActive, nextProcessAt: time.Time{}},
{msg: m2, state: base.TaskStateActive, nextProcessAt: time.Time{}},
},
},
}
@ -552,25 +541,18 @@ func TestInspectorListActiveTasks(t *testing.T) {
t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
}
}
func createScheduledTask(z base.Z) *ScheduledTask {
msg := z.Message
return &ScheduledTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
NextProcessAt: time.Unix(z.Score, 0),
score: z.Score,
func createScheduledTask(z base.Z) *TaskInfo {
return &TaskInfo{
msg: z.Message,
state: base.TaskStateScheduled,
nextProcessAt: time.Unix(z.Score, 0),
}
}
@ -593,7 +575,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
desc string
scheduled map[string][]base.Z
qname string
want []*ScheduledTask
want []*TaskInfo
}{
{
desc: "with a few scheduled tasks",
@ -603,7 +585,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
},
qname: "default",
// Should be sorted by NextProcessAt.
want: []*ScheduledTask{
want: []*TaskInfo{
createScheduledTask(z3),
createScheduledTask(z1),
createScheduledTask(z2),
@ -615,7 +597,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
"default": {},
},
qname: "default",
want: []*ScheduledTask(nil),
want: []*TaskInfo(nil),
},
}
@ -628,25 +610,18 @@ func TestInspectorListScheduledTasks(t *testing.T) {
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ScheduledTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
}
}
func createRetryTask(z base.Z) *RetryTask {
msg := z.Message
return &RetryTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
NextProcessAt: time.Unix(z.Score, 0),
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
score: z.Score,
func createRetryTask(z base.Z) *TaskInfo {
return &TaskInfo{
msg: z.Message,
state: base.TaskStateRetry,
nextProcessAt: time.Unix(z.Score, 0),
}
}
@ -669,7 +644,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
desc string
retry map[string][]base.Z
qname string
want []*RetryTask
want []*TaskInfo
}{
{
desc: "with a few retry tasks",
@ -679,7 +654,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
},
qname: "default",
// Should be sorted by NextProcessAt.
want: []*RetryTask{
want: []*TaskInfo{
createRetryTask(z3),
createRetryTask(z1),
createRetryTask(z2),
@ -691,7 +666,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
"default": {},
},
qname: "default",
want: []*RetryTask(nil),
want: []*TaskInfo(nil),
},
// TODO(hibiken): ErrQueueNotFound when queue doesn't exist
}
@ -705,8 +680,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Task{}, RetryTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" {
t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
@ -805,12 +779,12 @@ func TestInspectorListPagination(t *testing.T) {
tests := []struct {
page int
pageSize int
want []*PendingTask
want []*TaskInfo
}{
{
page: 1,
pageSize: 5,
want: []*PendingTask{
want: []*TaskInfo{
createPendingTask(msgs[0]),
createPendingTask(msgs[1]),
createPendingTask(msgs[2]),
@ -821,7 +795,7 @@ func TestInspectorListPagination(t *testing.T) {
{
page: 3,
pageSize: 10,
want: []*PendingTask{
want: []*TaskInfo{
createPendingTask(msgs[20]),
createPendingTask(msgs[21]),
createPendingTask(msgs[22]),
@ -842,8 +816,11 @@ func TestInspectorListPagination(t *testing.T) {
t.Errorf("ListPendingTask('default') returned error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
cmpOpts := []cmp.Option{
cmpopts.EquateApproxTime(2 * time.Second),
cmp.AllowUnexported(TaskInfo{}),
}
if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" {
t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff)
}
@ -1841,7 +1818,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
"custom": {m3},
},
qname: "default",
id: createPendingTask(m2).ID,
id: createPendingTask(m2).ID(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m3},
@ -1853,7 +1830,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
"custom": {m3},
},
qname: "custom",
id: createPendingTask(m3).ID,
id: createPendingTask(m3).ID(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
@ -1906,7 +1883,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) {
"custom": {z3},
},
qname: "default",
id: createScheduledTask(z2).ID,
id: createScheduledTask(z2).ID(),
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -1956,7 +1933,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) {
"custom": {z3},
},
qname: "default",
id: createRetryTask(z2).ID,
id: createRetryTask(z2).ID(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2127,7 +2104,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
"custom": {},
},
qname: "default",
id: createScheduledTask(z2).ID,
id: createScheduledTask(z2).ID(),
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2197,7 +2174,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
"custom": {},
},
qname: "custom",
id: createRetryTask(z2).ID,
id: createRetryTask(z2).ID(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2435,7 +2412,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {},
},
qname: "default",
id: createPendingTask(m1).ID,
id: createPendingTask(m1).ID(),
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m2, m3},
@ -2457,7 +2434,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
"custom": {},
},
qname: "custom",
id: createPendingTask(m2).ID,
id: createPendingTask(m2).ID(),
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m3},
@ -2530,7 +2507,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
"custom": {},
},
qname: "custom",
id: createScheduledTask(z2).ID,
id: createScheduledTask(z2).ID(),
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2605,7 +2582,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
"custom": {},
},
qname: "custom",
id: createRetryTask(z2).ID,
id: createRetryTask(z2).ID(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2680,7 +2657,7 @@ func TestInspectorArchiveTaskError(t *testing.T) {
"custom": {},
},
qname: "nonexistent",
id: createRetryTask(z2).ID,
id: createRetryTask(z2).ID(),
wantErr: ErrQueueNotFound,
wantRetry: map[string][]base.Z{
"default": {z1},