2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-13 04:46:39 +08:00

Update rdb.List* methods with specific errors

This commit is contained in:
Ken Hibino 2021-05-08 11:45:30 -07:00
parent 52e04355d3
commit 136d1c9ea9
2 changed files with 77 additions and 16 deletions

View File

@ -309,18 +309,28 @@ func (p Pagination) stop() int64 {
// ListPending returns pending tasks that are ready to be processed. // ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListPending"
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
return r.listMessages(base.PendingKey(qname), qname, pgn) res, err := r.listMessages(base.PendingKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
} }
// ListActive returns all tasks that are currently being processed for the given queue. // ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListActive"
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
return r.listMessages(base.ActiveKey(qname), qname, pgn) res, err := r.listMessages(base.ActiveKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
} }
// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending) // KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
@ -346,11 +356,11 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
res, err := listMessagesCmd.Run(r.client, res, err := listMessagesCmd.Run(r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result() []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Unknown, err)
} }
data, err := cast.ToStringSliceE(res) data, err := cast.ToStringSliceE(res)
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
} }
reverse(data) reverse(data)
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
@ -368,27 +378,42 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
// ListScheduled returns all tasks from the given queue that are scheduled // ListScheduled returns all tasks from the given queue that are scheduled
// to be processed in the future. // to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListScheduled"
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
} }
// ListRetry returns all tasks from the given queue that have failed before // ListRetry returns all tasks from the given queue that have failed before
// and willl be retried in the future. // and willl be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListRetry"
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
return r.listZSetEntries(base.RetryKey(qname), qname, pgn) res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
} }
// ListArchived returns all tasks from the given queue that have exhausted its retry limit. // ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListArchived"
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) res, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
} }
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled) // KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
@ -415,21 +440,21 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
res, err := listZSetEntriesCmd.Run(r.client, []string{key}, res, err := listZSetEntriesCmd.Run(r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result() pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Unknown, err)
} }
data, err := cast.ToSliceE(res) data, err := cast.ToSliceE(res)
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
} }
var zs []base.Z var zs []base.Z
for i := 0; i < len(data); i += 2 { for i := 0; i < len(data); i += 2 {
s, err := cast.ToStringE(data[i]) s, err := cast.ToStringE(data[i])
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
} }
score, err := cast.ToInt64E(data[i+1]) score, err := cast.ToInt64E(data[i+1])
if err != nil { if err != nil {
return nil, err return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
} }
msg, err := base.DecodeMessage([]byte(s)) msg, err := base.DecodeMessage([]byte(s))
if err != nil { if err != nil {

View File

@ -992,6 +992,42 @@ func TestListArchivedPagination(t *testing.T) {
} }
} }
func TestListTasksError(t *testing.T) {
r := setup(t)
defer r.Close()
tests := []struct {
desc string
qname string
match func(err error) bool
}{
{
desc: "It returns QueueNotFoundError if queue doesn't exist",
qname: "nonexistent",
match: errors.IsQueueNotFound,
},
}
for _, tc := range tests {
pgn := Pagination{Page: 0, Size: 20}
if _, got := r.ListActive(tc.qname, pgn); !tc.match(got) {
t.Errorf("%s: ListActive returned %v", tc.desc, got)
}
if _, got := r.ListPending(tc.qname, pgn); !tc.match(got) {
t.Errorf("%s: ListPending returned %v", tc.desc, got)
}
if _, got := r.ListScheduled(tc.qname, pgn); !tc.match(got) {
t.Errorf("%s: ListScheduled returned %v", tc.desc, got)
}
if _, got := r.ListRetry(tc.qname, pgn); !tc.match(got) {
t.Errorf("%s: ListRetry returned %v", tc.desc, got)
}
if _, got := r.ListArchived(tc.qname, pgn); !tc.match(got) {
t.Errorf("%s: ListArchived returned %v", tc.desc, got)
}
}
}
var ( var (
timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time
zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score