diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 9c1cc50..1fcae64 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -300,7 +300,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listMessages(base.PendingKey(qname), pgn) + return r.listMessages(base.PendingKey(qname), qname, pgn) } // ListActive returns all tasks that are currently being processed for the given queue. @@ -308,16 +308,35 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listMessages(base.ActiveKey(qname), pgn) + return r.listMessages(base.ActiveKey(qname), qname, pgn) } +// KEYS[1] -> key for id list (e.g. asynq:{}:pending) +// ARGV[1] -> start offset +// ARGV[2] -> stop offset +// ARGV[3] -> task key prefix +var listMessagesCmd = redis.NewScript(` + local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) + local res = {} + for _, id in ipairs(ids) do + local key = ARGV[3] .. id + table.insert(res, redis.call("HGET", key, "msg")) + end + return res +`) + // listMessages returns a list of TaskMessage in Redis list with the given key. -func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) { +func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) { // Note: Because we use LPUSH to redis list, we need to calculate the // correct range and reverse the list to get the tasks with pagination. stop := -pgn.start() - 1 start := -pgn.stop() - 1 - data, err := r.client.LRange(key, start, stop).Result() + res, err := listMessagesCmd.Run(r.client, + []string{key}, start, stop, base.TaskKeyPrefix(qname)).Result() + if err != nil { + return nil, err + } + data, err := cast.ToStringSliceE(res) if err != nil { return nil, err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 00355ae..8a27a11 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -386,7 +386,7 @@ func TestListPendingPagination(t *testing.T) { msgs = []*base.TaskMessage(nil) // empty list for i := 0; i < 100; i++ { - msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil) + msg := h.NewTaskMessageWithQueue(fmt.Sprintf("custom %d", i), nil, "custom") msgs = append(msgs, msg) } // create 100 tasks in custom queue