2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Use approximate memory usage for QueueInfo

This commit is contained in:
Ken Hibino 2021-08-09 06:09:10 -07:00
parent 95c90a5cb8
commit e6355bf3f5
3 changed files with 81 additions and 23 deletions

View File

@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- Changed `Queue` function to not to convert the provided queue name to lowercase. Queue names are now case-sensitive. - Changed `Queue` function to not to convert the provided queue name to lowercase. Queue names are now case-sensitive.
- `QueueInfo.MemoryUsage` is now an approximate usage value.
### Fixed
- Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309).
## [0.18.1] - 2020-07-04 ## [0.18.1] - 2020-07-04

View File

@ -50,6 +50,7 @@ type QueueInfo struct {
Queue string Queue string
// Total number of bytes that the queue and its tasks require to be stored in redis. // Total number of bytes that the queue and its tasks require to be stored in redis.
// It is an approximate memory usage value in bytes since the value is computed by sampling.
MemoryUsage int64 MemoryUsage int64
// Size is the total number of tasks in the queue. // Size is the total number of tasks in the queue.

View File

@ -26,7 +26,8 @@ type Stats struct {
// Name of the queue (e.g. "default", "critical"). // Name of the queue (e.g. "default", "critical").
Queue string Queue string
// MemoryUsage is the total number of bytes the queue and its tasks require // MemoryUsage is the total number of bytes the queue and its tasks require
// to be stored in redis. // to be stored in redis. It is an approximate memory usage value in bytes
// since the value is computed by sampling.
MemoryUsage int64 MemoryUsage int64
// Paused indicates whether the queue is paused. // Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed. // If true, tasks in the queue should not be processed.
@ -172,31 +173,82 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
return stats, nil return stats, nil
} }
// Computes memory usage for the given queue by sampling tasks
// from each redis list/zset. Returns approximate memory usage value
// in bytes.
//
// KEYS[1] -> asynq:{qname}:active
// KEYS[2] -> asynq:{qname}:pending
// KEYS[3] -> asynq:{qname}:scheduled
// KEYS[4] -> asynq:{qname}:retry
// KEYS[5] -> asynq:{qname}:archived
//
// ARGV[1] -> asynq:{qname}:t:
// ARGV[2] -> sample_size (e.g 20)
var memoryUsageCmd = redis.NewScript(`
local sample_size = tonumber(ARGV[2])
if sample_size <= 0 then
return redis.error_reply("sample size must be a positive number")
end
local memusg = 0
for i=1,2 do
local ids = redis.call("LRANGE", KEYS[i], 0, sample_size - 1)
local sample_total = 0
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
sample_total = sample_total + bytes
end
local n = redis.call("LLEN", KEYS[i])
local avg = sample_total / table.getn(ids)
memusg = memusg + (avg * n)
end
local m = redis.call("MEMORY", "USAGE", KEYS[i])
if (m) then
memusg = memusg + m
end
end
for i=3,5 do
local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1)
local sample_total = 0
if (table.getn(ids) > 0) then
for _, id in ipairs(ids) do
local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id)
sample_total = sample_total + bytes
end
local n = redis.call("ZCARD", KEYS[i])
local avg = sample_total / table.getn(ids)
memusg = memusg + (avg * n)
end
local m = redis.call("MEMORY", "USAGE", KEYS[i])
if (m) then
memusg = memusg + m
end
end
return memusg
`)
func (r *RDB) memoryUsage(qname string) (int64, error) { func (r *RDB) memoryUsage(qname string) (int64, error) {
var op errors.Op = "rdb.memoryUsage" var op errors.Op = "rdb.memoryUsage"
var ( const sampleSize = 20
keys []string keys := []string{
data []string base.ActiveKey(qname),
cursor uint64 base.PendingKey(qname),
err error base.ScheduledKey(qname),
) base.RetryKey(qname),
for { base.ArchivedKey(qname),
data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result()
if err != nil {
return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "scan", Err: err})
}
keys = append(keys, data...)
if cursor == 0 {
break
}
} }
var usg int64 argv := []interface{}{
for _, k := range keys { base.TaskKeyPrefix(qname),
n, err := r.client.MemoryUsage(k).Result() sampleSize,
if err != nil { }
return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "memory usage", Err: err}) res, err := memoryUsageCmd.Run(r.client, keys, argv...).Result()
} if err != nil {
usg += n return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
usg, err := cast.ToInt64E(res)
if err != nil {
return 0, errors.E(op, errors.Internal, fmt.Sprintf("could not cast script return value to int64"))
} }
return usg, nil return usg, nil
} }