2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Update Inspector.List*Task methods to return ErrQueueNotFound

This commit is contained in:
Ken Hibino 2021-05-19 05:59:30 -07:00
parent e63d51da0c
commit ea9086fd8b
2 changed files with 57 additions and 10 deletions

View File

@ -359,8 +359,10 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListPending(qname, pgn)
if err != nil {
// TODO: Handle ErrQueueNotFound
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
now := time.Now()
@ -385,8 +387,10 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListActive(qname, pgn)
if err != nil {
// TODO: Handle QueueNotFound
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
@ -410,8 +414,10 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(qname, pgn)
if err != nil {
// TODO: handle ErrQueueNotFound
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
@ -436,8 +442,10 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(qname, pgn)
if err != nil {
// TODO: handle ErrQueueNotFound
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo
@ -462,8 +470,10 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListArchived(qname, pgn)
if err != nil {
// TODO: handle ErrQueueNotFound
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
var tasks []*TaskInfo

View File

@ -820,6 +820,43 @@ func TestInspectorListPagination(t *testing.T) {
}
}
func TestInspectorListTasksQueueNotFoundError(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
qname string
wantErr error
}{
{
qname: "nonexistent",
wantErr: ErrQueueNotFound,
},
}
for _, tc := range tests {
h.FlushDB(t, r)
if _, err := inspector.ListActiveTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListActiveTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
if _, err := inspector.ListPendingTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListPendingTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
if _, err := inspector.ListScheduledTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListScheduledTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
if _, err := inspector.ListRetryTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListRetryTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
if _, err := inspector.ListArchivedTasks(tc.qname); !errors.Is(err, tc.wantErr) {
t.Errorf("ListArchivedTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr)
}
}
}
func TestInspectorDeleteAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()