diff --git a/CHANGELOG.md b/CHANGELOG.md index bd949c7..8e10367 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Changed `Queue` function to not to convert the provided queue name to lowercase. Queue names are now case-sensitive. +- `QueueInfo.MemoryUsage` is now an approximate usage value. + +### Fixed + +- Fixed latency issue around memory usage (see https://github.com/hibiken/asynq/issues/309). ## [0.18.1] - 2020-07-04 diff --git a/inspector.go b/inspector.go index 725c664..0375f51 100644 --- a/inspector.go +++ b/inspector.go @@ -50,6 +50,7 @@ type QueueInfo struct { Queue string // Total number of bytes that the queue and its tasks require to be stored in redis. + // It is an approximate memory usage value in bytes since the value is computed by sampling. MemoryUsage int64 // Size is the total number of tasks in the queue. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 7fcf3f1..ff02cb6 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -26,7 +26,8 @@ type Stats struct { // Name of the queue (e.g. "default", "critical"). Queue string // MemoryUsage is the total number of bytes the queue and its tasks require - // to be stored in redis. + // to be stored in redis. It is an approximate memory usage value in bytes + // since the value is computed by sampling. MemoryUsage int64 // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. @@ -172,31 +173,82 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { return stats, nil } +// Computes memory usage for the given queue by sampling tasks +// from each redis list/zset. Returns approximate memory usage value +// in bytes. +// +// KEYS[1] -> asynq:{qname}:active +// KEYS[2] -> asynq:{qname}:pending +// KEYS[3] -> asynq:{qname}:scheduled +// KEYS[4] -> asynq:{qname}:retry +// KEYS[5] -> asynq:{qname}:archived +// +// ARGV[1] -> asynq:{qname}:t: +// ARGV[2] -> sample_size (e.g 20) +var memoryUsageCmd = redis.NewScript(` +local sample_size = tonumber(ARGV[2]) +if sample_size <= 0 then + return redis.error_reply("sample size must be a positive number") +end +local memusg = 0 +for i=1,2 do + local ids = redis.call("LRANGE", KEYS[i], 0, sample_size - 1) + local sample_total = 0 + if (table.getn(ids) > 0) then + for _, id in ipairs(ids) do + local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id) + sample_total = sample_total + bytes + end + local n = redis.call("LLEN", KEYS[i]) + local avg = sample_total / table.getn(ids) + memusg = memusg + (avg * n) + end + local m = redis.call("MEMORY", "USAGE", KEYS[i]) + if (m) then + memusg = memusg + m + end +end +for i=3,5 do + local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1) + local sample_total = 0 + if (table.getn(ids) > 0) then + for _, id in ipairs(ids) do + local bytes = redis.call("MEMORY", "USAGE", ARGV[1] .. id) + sample_total = sample_total + bytes + end + local n = redis.call("ZCARD", KEYS[i]) + local avg = sample_total / table.getn(ids) + memusg = memusg + (avg * n) + end + local m = redis.call("MEMORY", "USAGE", KEYS[i]) + if (m) then + memusg = memusg + m + end +end +return memusg +`) + func (r *RDB) memoryUsage(qname string) (int64, error) { var op errors.Op = "rdb.memoryUsage" - var ( - keys []string - data []string - cursor uint64 - err error - ) - for { - data, cursor, err = r.client.Scan(cursor, fmt.Sprintf("asynq:{%s}*", qname), 100).Result() - if err != nil { - return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "scan", Err: err}) - } - keys = append(keys, data...) - if cursor == 0 { - break - } + const sampleSize = 20 + keys := []string{ + base.ActiveKey(qname), + base.PendingKey(qname), + base.ScheduledKey(qname), + base.RetryKey(qname), + base.ArchivedKey(qname), } - var usg int64 - for _, k := range keys { - n, err := r.client.MemoryUsage(k).Result() - if err != nil { - return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "memory usage", Err: err}) - } - usg += n + argv := []interface{}{ + base.TaskKeyPrefix(qname), + sampleSize, + } + res, err := memoryUsageCmd.Run(r.client, keys, argv...).Result() + if err != nil { + return 0, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + usg, err := cast.ToInt64E(res) + if err != nil { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("could not cast script return value to int64")) } return usg, nil }