2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Update RDB.Dequeue

This commit is contained in:
Ken Hibino 2021-02-24 11:22:05 -08:00
parent ec9fd6b577
commit 7c75abe334
6 changed files with 89 additions and 42 deletions

3
.gitignore vendored
View File

@ -19,3 +19,6 @@
# Ignore asynq config file
.asynq.*
# Ignore editor config files
.vscode

View File

@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script)
## [0.16.0] - 2021-03-10
### Added

View File

@ -293,7 +293,7 @@ go get -u github.com/hibiken/asynq/tools/asynq
| Dependency | Version |
| -------------------------- | ------- |
| [Redis](https://redis.io/) | v3.0+ |
| [Redis](https://redis.io/) | v4.0+ |
| [Go](https://golang.org/) | v1.13+ |
## Contributing

View File

@ -284,7 +284,13 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b
if err := c.LPush(key, msg.ID.String()).Err(); err != nil {
tb.Fatal(err)
}
if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil {
key := base.TaskKey(msg.Queue, msg.ID.String())
data := map[string]interface{}{
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
}
if err := c.HSet(key, data).Err(); err != nil {
tb.Fatal(err)
}
}
@ -298,7 +304,13 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b
if err := c.ZAdd(key, z).Err(); err != nil {
tb.Fatal(err)
}
if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil {
key := base.TaskKey(msg.Queue, msg.ID.String())
data := map[string]interface{}{
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
}
if err := c.HSet(key, data).Err(); err != nil {
tb.Fatal(err)
}
}
@ -310,7 +322,7 @@ func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []
ids := r.LRange(base.PendingKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
data := r.Get(base.TaskKey(qname, id)).Val()
data := r.HGet(base.TaskKey(qname, id), "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, data))
}
return msgs
@ -322,7 +334,7 @@ func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*
ids := r.LRange(base.ActiveKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
data := r.Get(base.TaskKey(qname, id)).Val()
data := r.HGet(base.TaskKey(qname, id), "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, data))
}
return msgs
@ -334,7 +346,7 @@ func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string)
ids := r.ZRange(base.ScheduledKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msg := r.HGet(base.TaskKey(qname, id), "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
@ -346,7 +358,7 @@ func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*b
ids := r.ZRange(base.RetryKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msg := r.HGet(base.TaskKey(qname, id), "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
@ -358,7 +370,7 @@ func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) [
ids := r.ZRange(base.ArchivedKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msg := r.HGet(base.TaskKey(qname, id), "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
@ -370,7 +382,7 @@ func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) [
zs := r.ZRangeWithScores(base.ScheduledKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val()
res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)})
}
return res
@ -382,7 +394,7 @@ func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []bas
zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),
@ -397,7 +409,7 @@ func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []
zs := r.ZRangeWithScores(base.ArchivedKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),
@ -412,7 +424,7 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [
zs := r.ZRangeWithScores(base.DeadlinesKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),

View File

@ -53,8 +53,10 @@ func (r *RDB) Ping() error {
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline)
var enqueueCmd = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
@ -75,6 +77,8 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
argv := []interface{}{
encoded,
msg.ID.String(),
msg.Timeout,
msg.Deadline,
}
return enqueueCmd.Run(r.client, keys, argv...).Err()
}
@ -85,12 +89,14 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("SET", KEYS[2], ARGV[3])
redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5])
redis.call("LPUSH", KEYS[3], ARGV[1])
return 1
`)
@ -114,6 +120,8 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
msg.ID.String(),
int(ttl.Seconds()),
encoded,
msg.Timeout,
msg.Deadline,
}
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
if err != nil {
@ -134,21 +142,22 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
data, d, err := r.dequeue(qnames...)
encoded, d, err := r.dequeue(qnames...)
if err != nil {
return nil, time.Time{}, err
}
if msg, err = base.DecodeMessage(data); err != nil {
if msg, err = base.DecodeMessage(encoded); err != nil {
return nil, time.Time{}, err
}
return msg, time.Unix(d, 0), nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:deadlines
// ARGV[1] -> current time in Unix time
// ARGV[2] -> task key prefix
//
// dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
@ -156,11 +165,13 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
// and inserts the task with deadlines set.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if msg then
local decoded = cjson.decode(msg)
local timeout = decoded["Timeout"]
local deadline = decoded["Deadline"]
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if id then
local key = ARGV[2] .. id
local data = redis.call("HMGET", key, "msg", "timeout", "deadline")
local msg = data[1]
local timeout = tonumber(data[2])
local deadline = tonumber(data[3])
local score
if timeout ~= 0 and deadline ~= 0 then
score = math.min(ARGV[1]+timeout, deadline)
@ -171,13 +182,13 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
else
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end
redis.call("ZADD", KEYS[4], score, msg)
redis.call("ZADD", KEYS[4], score, id)
return {msg, score}
end
end
return nil`)
func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) {
func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err error) {
for _, qname := range qnames {
keys := []string{
base.PendingKey(qname),
@ -185,7 +196,11 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
base.ActiveKey(qname),
base.DeadlinesKey(qname),
}
res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result()
argv := []interface{}{
time.Now().Unix(),
base.TaskKeyPrefix(qname),
}
res, err := dequeueCmd.Run(r.client, keys, argv...).Result()
if err == redis.Nil {
continue
} else if err != nil {
@ -198,13 +213,14 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
if len(data) != 2 {
return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data))
}
if msgjson, err = cast.ToStringE(data[0]); err != nil {
if encoded, err = cast.ToStringE(data[0]); err != nil {
return "", 0, err
}
if deadline, err = cast.ToInt64E(data[1]); err != nil {
return "", 0, err
}
return msgjson, deadline, nil
fmt.Printf("debug: Returning msgjson=%s deadline=%d\n", encoded, deadline)
return encoded, deadline, nil
}
return "", 0, ErrNoProcessableTask
}
@ -308,8 +324,10 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
// ARGV[1] -> task message data
// ARGV[2] -> process_at time in Unix time
// ARGV[3] -> task ID
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
var scheduleCmd = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
`)
@ -331,6 +349,8 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
encoded,
processAt.Unix(),
msg.ID.String(),
msg.Timeout,
msg.Deadline,
}
return scheduleCmd.Run(r.client, keys, argv...).Err()
}
@ -342,12 +362,14 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> task timeout in seconds (0 if not timeout)
// ARGV[6] -> task deadline in unix time (0 if no deadline)
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("SET", KEYS[2], ARGV[4])
redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6])
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
return 1
`)
@ -372,6 +394,8 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
int(ttl.Seconds()),
processAt.Unix(),
encoded,
msg.Timeout,
msg.Deadline,
}
res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result()
if err != nil {
@ -405,7 +429,7 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("SET", KEYS[1], ARGV[2])
redis.call("HSET", KEYS[1], "msg", ARGV[2])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[4])
@ -472,7 +496,7 @@ end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("SET", KEYS[1], ARGV[2])
redis.call("HSET", KEYS[1], "msg", ARGV[2])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[6])
@ -571,7 +595,8 @@ var listDeadlineExceededCmd = redis.NewScript(`
local res = {}
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, id in ipairs(ids) do
table.insert(res, redis.call("GET", ARGV[2] .. id))
local key = ARGV[2] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)

View File

@ -158,6 +158,7 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "send_email",
Payload: map[string]interface{}{"subject": "hello!"},
Queue: "default",
Timeout: 1800,
Deadline: 0,
}
@ -166,6 +167,7 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "export_csv",
Payload: nil,
Queue: "critical",
Timeout: 0,
Deadline: 1593021600,
}
@ -174,10 +176,10 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "reindex",
Payload: nil,
Queue: "low",
Timeout: int64((5 * time.Minute).Seconds()),
Deadline: time.Now().Add(10 * time.Minute).Unix(),
}
t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest
tests := []struct {
pending map[string][]*base.TaskMessage
@ -253,26 +255,26 @@ func TestDequeue(t *testing.T) {
},
{
pending: map[string][]*base.TaskMessage{
"default": {t3},
"default": {t1},
"critical": {},
"low": {t2, t1},
"low": {t3},
},
args: []string{"critical", "default", "low"},
wantMsg: t3,
wantDeadline: time.Unix(t3Deadline, 0),
wantMsg: t1,
wantDeadline: time.Unix(t1Deadline, 0),
err: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {t2, t1},
"low": {t3},
},
wantActive: map[string][]*base.TaskMessage{
"default": {t3},
"default": {t1},
"critical": {},
"low": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t3, Score: t3Deadline}},
"default": {{Message: t1, Score: t1Deadline}},
"critical": {},
"low": {},
},