From 6437ea4a82f2cbd8cc511cf119ca1f100c46c522 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 25 Feb 2021 22:05:51 -0800 Subject: [PATCH] Update RDB.ListScheduled, RDB.ListRetry, RDB.ListArchived --- internal/rdb/inspect.go | 51 +++++++++++++++++++++++++++--------- internal/rdb/inspect_test.go | 4 +-- 2 files changed, 41 insertions(+), 14 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 1fcae64..a9e84f4 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -359,7 +359,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listZSetEntries(base.ScheduledKey(qname), pgn) + return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) } // ListRetry returns all tasks from the given queue that have failed before @@ -368,7 +368,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listZSetEntries(base.RetryKey(qname), pgn) + return r.listZSetEntries(base.RetryKey(qname), qname, pgn) } // ListArchived returns all tasks from the given queue that have exhausted its retry limit. @@ -376,29 +376,56 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listZSetEntries(base.ArchivedKey(qname), pgn) + return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) } +// KEYS[1] -> key for ids set (e.g. asynq:{}:scheduled) +// ARGV[1] -> min +// ARGV[2] -> max +// ARGV[3] -> task key prefix +// +// Returns an array populated with +// [msg1, score1, msg2, score2, ..., msgN, scoreN] +var listZSetEntriesCmd = redis.NewScript(` + local res = {} + local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES") + for i = 1, table.getn(id_score_pairs), 2 do + local key = ARGV[3] .. id_score_pairs[i] + table.insert(res, redis.call("HGET", key, "msg")) + table.insert(res, id_score_pairs[i+1]) + end + return res +`) + // listZSetEntries returns a list of message and score pairs in Redis sorted-set // with the given key. -func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) { - data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result() +func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) { + res, err := listZSetEntriesCmd.Run(r.client, []string{key}, + pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result() if err != nil { return nil, err } - var res []base.Z - for _, z := range data { - s, ok := z.Member.(string) - if !ok { - continue // bad data, ignore and continue + data, err := cast.ToSliceE(res) + if err != nil { + return nil, err + } + var zs []base.Z + for i := 0; i < len(data); i += 2 { + s, err := cast.ToStringE(data[i]) + if err != nil { + return nil, err + } + score, err := cast.ToInt64E(data[i+1]) + if err != nil { + return nil, err } msg, err := base.DecodeMessage(s) if err != nil { continue // bad data, ignore and continue } - res = append(res, base.Z{Message: msg, Score: int64(z.Score)}) + zs = append(zs, base.Z{Message: msg, Score: score}) } - return res, nil + return zs, nil } // RunArchivedTask finds an archived task that matches the given id and score from diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 8a27a11..a4ab7ac 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -841,7 +841,7 @@ func TestListRetryPagination(t *testing.T) { } } -func TestListDead(t *testing.T) { +func TestListArchived(t *testing.T) { r := setup(t) defer r.Close() m1 := &base.TaskMessage{ @@ -932,7 +932,7 @@ func TestListDead(t *testing.T) { } } -func TestListDeadPagination(t *testing.T) { +func TestListArchivedPagination(t *testing.T) { r := setup(t) defer r.Close() var entries []base.Z