Update RDB.Enqueue* methods to be multi queue aware

This commit is contained in:
Ken Hibino
2020-01-08 07:16:21 -08:00
parent 8ff5c5101e
commit 718336ff44
2 changed files with 222 additions and 72 deletions

View File

@@ -405,13 +405,14 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("LPUSH", KEYS[2], msg)
local qkey = ARGV[3] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
return 1
end
end
return 0
`)
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}, score, id).Result()
res, err := script.Run(r.client, []string{zset}, score, id, base.QueuePrefix).Result()
if err != nil {
return 0, err
}
@@ -427,11 +428,13 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
redis.call("LPUSH", KEYS[2], msg)
local decoded = cjson.decode(msg)
local qkey = ARGV[1] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
end
return table.getn(msgs)
`)
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}).Result()
res, err := script.Run(r.client, []string{zset}, base.QueuePrefix).Result()
if err != nil {
return 0, err
}