2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Update Dequeue command in rdb

This commit is contained in:
Ken Hibino
2020-08-08 06:04:16 -07:00
parent faeeb2c820
commit 62149e5a08
2 changed files with 79 additions and 54 deletions

View File

@@ -127,9 +127,6 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
return msg, time.Unix(d, 0), nil
}
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:paused
// KEYS[3] -> asynq:deadlines
// ARGV[1] -> current time in Unix time
// ARGV[2:] -> List of queues to query in order
//
@@ -140,8 +137,11 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
var dequeueCmd = redis.NewScript(`
for i = 2, table.getn(ARGV) do
local qkey = ARGV[i]
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
local msg = redis.call("RPOPLPUSH", qkey, KEYS[1])
local key_paused = qkey .. ":paused"
local key_inprogress = qkey .. ":in_progress"
local key_deadlines = qkey .. ":deadlines"
if redis.call("EXISTS", key_paused) == 0 then
local msg = redis.call("RPOPLPUSH", qkey, key_inprogress)
if msg then
local decoded = cjson.decode(msg)
local timeout = decoded["Timeout"]
@@ -156,7 +156,7 @@ for i = 2, table.getn(ARGV) do
else
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end
redis.call("ZADD", KEYS[3], score, msg)
redis.call("ZADD", key_deadlines, score, msg)
return {msg, score}
end
end
@@ -167,8 +167,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err
var args []interface{}
args = append(args, time.Now().Unix())
args = append(args, qkeys...)
res, err := dequeueCmd.Run(r.client,
[]string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result()
res, err := dequeueCmd.Run(r.client, nil, args...).Result()
if err != nil {
return "", 0, err
}