mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Update memory usage redis lua script to account for groups
This commit is contained in:
parent
efe3c74037
commit
652939dd3a
@ -245,9 +245,12 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
|||||||
// KEYS[4] -> asynq:{qname}:retry
|
// KEYS[4] -> asynq:{qname}:retry
|
||||||
// KEYS[5] -> asynq:{qname}:archived
|
// KEYS[5] -> asynq:{qname}:archived
|
||||||
// KEYS[6] -> asynq:{qname}:completed
|
// KEYS[6] -> asynq:{qname}:completed
|
||||||
//
|
// KEYS[7] -> asynq:{qname}:groups
|
||||||
// ARGV[1] -> asynq:{qname}:t:
|
// -------
|
||||||
// ARGV[2] -> sample_size (e.g 20)
|
// ARGV[1] -> asynq:{qname}:t: (task key prefix)
|
||||||
|
// ARGV[2] -> task sample size per redis list/zset (e.g 20)
|
||||||
|
// ARGV[3] -> group sample size
|
||||||
|
// ARGV[4] -> asynq:{qname}:g: (group key prefix)
|
||||||
var memoryUsageCmd = redis.NewScript(`
|
var memoryUsageCmd = redis.NewScript(`
|
||||||
local sample_size = tonumber(ARGV[2])
|
local sample_size = tonumber(ARGV[2])
|
||||||
if sample_size <= 0 then
|
if sample_size <= 0 then
|
||||||
@ -288,12 +291,39 @@ for i=3,6 do
|
|||||||
memusg = memusg + m
|
memusg = memusg + m
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
local group_names = redis.call("SRANDMEMBER", KEYS[7], tonumber(ARGV[3]))
|
||||||
|
local group_sample_total = 0
|
||||||
|
for _, gname in ipairs(group_names) do
|
||||||
|
local group_key = ARGV[4] .. gname
|
||||||
|
local ids = redis.call("ZRANGE", group_key, 0, sample_size - 1)
|
||||||
|
local sample_total = 0
|
||||||
|
if (table.getn(ids) > 0) then
|
||||||
|
for _, id in ipairs(ids) do
|
||||||
|
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
|
||||||
|
sample_total = sample_total + bytes
|
||||||
|
end
|
||||||
|
local n = redis.call("ZCARD", group_key)
|
||||||
|
local avg = sample_total / table.getn(ids)
|
||||||
|
group_sample_total = group_sample_total + (avg * n)
|
||||||
|
end
|
||||||
|
local m = redis.call("MEMORY", "USAGE", group_key)
|
||||||
|
if (m) then
|
||||||
|
group_sample_total = group_sample_total + m
|
||||||
|
end
|
||||||
|
end
|
||||||
|
local group_size = redis.call("SCARD", KEYS[7])
|
||||||
|
local group_memusg_avg = group_sample_total / table.getn(group_names)
|
||||||
|
memusg = memusg + (group_memusg_avg * group_size)
|
||||||
return memusg
|
return memusg
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
||||||
var op errors.Op = "rdb.memoryUsage"
|
var op errors.Op = "rdb.memoryUsage"
|
||||||
const sampleSize = 20
|
const (
|
||||||
|
taskSampleSize = 20
|
||||||
|
groupSampleSize = 5
|
||||||
|
)
|
||||||
|
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.ActiveKey(qname),
|
base.ActiveKey(qname),
|
||||||
base.PendingKey(qname),
|
base.PendingKey(qname),
|
||||||
@ -301,10 +331,13 @@ func (r *RDB) memoryUsage(qname string) (int64, error) {
|
|||||||
base.RetryKey(qname),
|
base.RetryKey(qname),
|
||||||
base.ArchivedKey(qname),
|
base.ArchivedKey(qname),
|
||||||
base.CompletedKey(qname),
|
base.CompletedKey(qname),
|
||||||
|
base.AllGroups(qname),
|
||||||
}
|
}
|
||||||
argv := []interface{}{
|
argv := []interface{}{
|
||||||
base.TaskKeyPrefix(qname),
|
base.TaskKeyPrefix(qname),
|
||||||
sampleSize,
|
taskSampleSize,
|
||||||
|
groupSampleSize,
|
||||||
|
base.GroupKeyPrefix(qname),
|
||||||
}
|
}
|
||||||
res, err := memoryUsageCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
res, err := memoryUsageCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user