From 136d1c9ea93b23563a196499bf2b6241df0f37b9 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 8 May 2021 11:45:30 -0700 Subject: [PATCH] Update rdb.List* methods with specific errors --- internal/rdb/inspect.go | 57 ++++++++++++++++++++++++++---------- internal/rdb/inspect_test.go | 36 +++++++++++++++++++++++ 2 files changed, 77 insertions(+), 16 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 9a130a7..94f4f42 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -309,18 +309,28 @@ func (p Pagination) stop() int64 { // ListPending returns pending tasks that are ready to be processed. func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { + var op errors.Op = "rdb.ListPending" if !r.client.SIsMember(base.AllQueues, qname).Val() { - return nil, fmt.Errorf("queue %q does not exist", qname) + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - return r.listMessages(base.PendingKey(qname), qname, pgn) + res, err := r.listMessages(base.PendingKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return res, nil } // ListActive returns all tasks that are currently being processed for the given queue. func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { + var op errors.Op = "rdb.ListActive" if !r.client.SIsMember(base.AllQueues, qname).Val() { - return nil, fmt.Errorf("queue %q does not exist", qname) + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - return r.listMessages(base.ActiveKey(qname), qname, pgn) + res, err := r.listMessages(base.ActiveKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return res, nil } // KEYS[1] -> key for id list (e.g. asynq:{}:pending) @@ -346,11 +356,11 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa res, err := listMessagesCmd.Run(r.client, []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result() if err != nil { - return nil, err + return nil, errors.E(errors.Unknown, err) } data, err := cast.ToStringSliceE(res) if err != nil { - return nil, err + return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } reverse(data) var msgs []*base.TaskMessage @@ -368,27 +378,42 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa // ListScheduled returns all tasks from the given queue that are scheduled // to be processed in the future. func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { + var op errors.Op = "rdb.ListScheduled" if !r.client.SIsMember(base.AllQueues, qname).Val() { - return nil, fmt.Errorf("queue %q does not exist", qname) + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) + res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return res, nil } // ListRetry returns all tasks from the given queue that have failed before // and willl be retried in the future. func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { + var op errors.Op = "rdb.ListRetry" if !r.client.SIsMember(base.AllQueues, qname).Val() { - return nil, fmt.Errorf("queue %q does not exist", qname) + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - return r.listZSetEntries(base.RetryKey(qname), qname, pgn) + res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return res, nil } // ListArchived returns all tasks from the given queue that have exhausted its retry limit. func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { + var op errors.Op = "rdb.ListArchived" if !r.client.SIsMember(base.AllQueues, qname).Val() { - return nil, fmt.Errorf("queue %q does not exist", qname) + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) + res, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return res, nil } // KEYS[1] -> key for ids set (e.g. asynq:{}:scheduled) @@ -415,21 +440,21 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro res, err := listZSetEntriesCmd.Run(r.client, []string{key}, pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result() if err != nil { - return nil, err + return nil, errors.E(errors.Unknown, err) } data, err := cast.ToSliceE(res) if err != nil { - return nil, err + return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } var zs []base.Z for i := 0; i < len(data); i += 2 { s, err := cast.ToStringE(data[i]) if err != nil { - return nil, err + return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } score, err := cast.ToInt64E(data[i+1]) if err != nil { - return nil, err + return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } msg, err := base.DecodeMessage([]byte(s)) if err != nil { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index fcb6f46..c711153 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -992,6 +992,42 @@ func TestListArchivedPagination(t *testing.T) { } } +func TestListTasksError(t *testing.T) { + r := setup(t) + defer r.Close() + + tests := []struct { + desc string + qname string + match func(err error) bool + }{ + { + desc: "It returns QueueNotFoundError if queue doesn't exist", + qname: "nonexistent", + match: errors.IsQueueNotFound, + }, + } + + for _, tc := range tests { + pgn := Pagination{Page: 0, Size: 20} + if _, got := r.ListActive(tc.qname, pgn); !tc.match(got) { + t.Errorf("%s: ListActive returned %v", tc.desc, got) + } + if _, got := r.ListPending(tc.qname, pgn); !tc.match(got) { + t.Errorf("%s: ListPending returned %v", tc.desc, got) + } + if _, got := r.ListScheduled(tc.qname, pgn); !tc.match(got) { + t.Errorf("%s: ListScheduled returned %v", tc.desc, got) + } + if _, got := r.ListRetry(tc.qname, pgn); !tc.match(got) { + t.Errorf("%s: ListRetry returned %v", tc.desc, got) + } + if _, got := r.ListArchived(tc.qname, pgn); !tc.match(got) { + t.Errorf("%s: ListArchived returned %v", tc.desc, got) + } + } +} + var ( timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score