diff --git a/internal/base/base.go b/internal/base/base.go index 067cc88..e412256 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -45,9 +45,14 @@ func ValidateQueueName(qname string) error { return nil } +// TaskKeyPrefix returns a prefix for task key. +func TaskKeyPrefix(qname string) string { + return fmt.Sprintf("asynq:{%s}:t:", qname) +} + // TaskKey returns a redis key for the given task message. func TaskKey(qname, id string) string { - return fmt.Sprintf("asynq:{%s}:t:%s", qname, id) + return fmt.Sprintf("%s%s", TaskKeyPrefix(qname), id) } // PendingKey returns a redis key for the given queue name. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index ddfb64c..f99757f 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -40,8 +40,8 @@ func TestQueueKey(t *testing.T) { qname string want string }{ - {"default", "asynq:{default}"}, - {"custom", "asynq:{custom}"}, + {"default", "asynq:{default}:pending"}, + {"custom", "asynq:{custom}:pending"}, } for _, tc := range tests { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f8caefe..c9e8ccb 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,7 +9,6 @@ import ( "encoding/json" "errors" "fmt" - "strconv" "time" "github.com/go-redis/redis/v7" @@ -565,19 +564,33 @@ func (r *RDB) forwardAll(src, dst string) (err error) { return nil } +// KEYS[1] -> asynq:{}:deadlines +// ARGV[1] -> deadline in unix time +// ARGV[2] -> task key prefix +var listDeadlineExceededCmd = redis.NewScript(` +local res = {} +local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +for _, id in ipairs(ids) do + table.insert(res, redis.call("GET", ARGV[2] .. id)) +end +return res +`) + // ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues. func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { var msgs []*base.TaskMessage - opt := &redis.ZRangeBy{ - Min: "-inf", - Max: strconv.FormatInt(deadline.Unix(), 10), - } for _, qname := range qnames { - res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result() + res, err := listDeadlineExceededCmd.Run(r.client, + []string{base.DeadlinesKey(qname)}, + deadline.Unix(), base.TaskKeyPrefix(qname)).Result() if err != nil { return nil, err } - for _, s := range res { + data, err := cast.ToStringSliceE(res) + if err != nil { + return nil, err + } + for _, s := range data { msg, err := base.DecodeMessage(s) if err != nil { return nil, err