Refactor redis keys and store messages in protobuf

Changes:
- Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout".
- Redis LIST and ZSET stores task message IDs
- Task messages are serialized using protocol buffer
This commit is contained in:
Ken Hibino
2021-03-12 16:23:08 -08:00
parent 30723edcaf
commit f0aefb0430
2 changed files with 31 additions and 2 deletions

View File

@@ -416,7 +416,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return res, nil
return zs, nil
}
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
@@ -634,7 +634,7 @@ func (r *RDB) runAll(zset, qname string) (int64, error) {
}
res, err := runAllCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return 0, err
return err
}
n, ok := res.(int64)
if !ok {
@@ -1147,6 +1147,21 @@ end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
@@ -1195,6 +1210,9 @@ end
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])