diff --git a/CHANGELOG.md b/CHANGELOG.md index cb3dad8..99fdf3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `MemoryUsage` field is added to `QueueStats`. - `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector` - `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`. - asynq CLI now supports deleting/archiving pending tasks. diff --git a/inspector.go b/inspector.go index 04243aa..3d6ba67 100644 --- a/inspector.go +++ b/inspector.go @@ -41,6 +41,8 @@ func (i *Inspector) Queues() ([]string, error) { type QueueStats struct { // Name of the queue. Queue string + // Total number of bytes that the queue and its tasks require to be stored in redis. + MemoryUsage int64 // Size is the total number of tasks in the queue. // The value is the sum of Pending, Active, Scheduled, Retry, and Archived. Size int @@ -76,17 +78,18 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { return nil, err } return &QueueStats{ - Queue: stats.Queue, - Size: stats.Size, - Pending: stats.Pending, - Active: stats.Active, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Archived: stats.Archived, - Processed: stats.Processed, - Failed: stats.Failed, - Paused: stats.Paused, - Timestamp: stats.Timestamp, + Queue: stats.Queue, + MemoryUsage: stats.MemoryUsage, + Size: stats.Size, + Pending: stats.Pending, + Active: stats.Active, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Archived: stats.Archived, + Processed: stats.Processed, + Failed: stats.Failed, + Paused: stats.Paused, + Timestamp: stats.Timestamp, }, nil } diff --git a/inspector_test.go b/inspector_test.go index e3c7270..cd6267d 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -264,6 +264,7 @@ func TestInspectorCurrentStats(t *testing.T) { m6 := h.NewTaskMessageWithQueue("task6", nil, "low") now := time.Now() timeCmpOpt := cmpopts.EquateApproxTime(time.Second) + ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage") inspector := NewInspector(getRedisConnOpt(t)) @@ -356,7 +357,7 @@ func TestInspectorCurrentStats(t *testing.T) { tc.qname, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" { t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff) continue diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index deca68c..bd2e89c 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -25,6 +25,9 @@ func (r *RDB) AllQueues() ([]string, error) { type Stats struct { // Name of the queue (e.g. "default", "critical"). Queue string + // MemoryUsage is the total number of bytes the queue and its tasks require + // to be stored in redis. + MemoryUsage int64 // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool @@ -160,9 +163,30 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { } } stats.Size = size + memusg, err := r.memoryUsage(qname) + if err != nil { + return nil, err + } + stats.MemoryUsage = memusg return stats, nil } +func (r *RDB) memoryUsage(qname string) (int64, error) { + keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result() + if err != nil { + return 0, err + } + var usg int64 + for _, k := range keys { + n, err := r.client.MemoryUsage(k).Result() + if err != nil { + return 0, err + } + usg += n + } + return usg, nil +} + var historicalStatsCmd = redis.NewScript(` local res = {} for _, key in ipairs(KEYS) do diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index f4313d5..6283db8 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -209,7 +209,8 @@ func TestCurrentStats(t *testing.T) { continue } - if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { + ignoreMemUsg := cmpopts.IgnoreFields(Stats{}, "MemoryUsage") + if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" { t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff) continue }