2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ListDeadlineExceeded

This commit is contained in:
Ken Hibino 2021-02-23 14:44:59 -08:00
parent 5105f35697
commit ec9fd6b577
3 changed files with 28 additions and 10 deletions

View File

@ -45,9 +45,14 @@ func ValidateQueueName(qname string) error {
return nil return nil
} }
// TaskKeyPrefix returns a prefix for task key.
func TaskKeyPrefix(qname string) string {
return fmt.Sprintf("asynq:{%s}:t:", qname)
}
// TaskKey returns a redis key for the given task message. // TaskKey returns a redis key for the given task message.
func TaskKey(qname, id string) string { func TaskKey(qname, id string) string {
return fmt.Sprintf("asynq:{%s}:t:%s", qname, id) return fmt.Sprintf("%s%s", TaskKeyPrefix(qname), id)
} }
// PendingKey returns a redis key for the given queue name. // PendingKey returns a redis key for the given queue name.

View File

@ -40,8 +40,8 @@ func TestQueueKey(t *testing.T) {
qname string qname string
want string want string
}{ }{
{"default", "asynq:{default}"}, {"default", "asynq:{default}:pending"},
{"custom", "asynq:{custom}"}, {"custom", "asynq:{custom}:pending"},
} }
for _, tc := range tests { for _, tc := range tests {

View File

@ -9,7 +9,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv"
"time" "time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
@ -565,19 +564,33 @@ func (r *RDB) forwardAll(src, dst string) (err error) {
return nil return nil
} }
// KEYS[1] -> asynq:{<qname>}:deadlines
// ARGV[1] -> deadline in unix time
// ARGV[2] -> task key prefix
var listDeadlineExceededCmd = redis.NewScript(`
local res = {}
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, id in ipairs(ids) do
table.insert(res, redis.call("GET", ARGV[2] .. id))
end
return res
`)
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues. // ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
opt := &redis.ZRangeBy{
Min: "-inf",
Max: strconv.FormatInt(deadline.Unix(), 10),
}
for _, qname := range qnames { for _, qname := range qnames {
res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result() res, err := listDeadlineExceededCmd.Run(r.client,
[]string{base.DeadlinesKey(qname)},
deadline.Unix(), base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, s := range res { data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
for _, s := range data {
msg, err := base.DecodeMessage(s) msg, err := base.DecodeMessage(s)
if err != nil { if err != nil {
return nil, err return nil, err