mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add MemoryUsage field to QueueStats
This commit is contained in:
parent
6529a1e0b1
commit
afde6a7266
@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
|
- `MemoryUsage` field is added to `QueueStats`.
|
||||||
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
|
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
|
||||||
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
|
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
|
||||||
- asynq CLI now supports deleting/archiving pending tasks.
|
- asynq CLI now supports deleting/archiving pending tasks.
|
||||||
|
@ -41,6 +41,8 @@ func (i *Inspector) Queues() ([]string, error) {
|
|||||||
type QueueStats struct {
|
type QueueStats struct {
|
||||||
// Name of the queue.
|
// Name of the queue.
|
||||||
Queue string
|
Queue string
|
||||||
|
// Total number of bytes that the queue and its tasks require to be stored in redis.
|
||||||
|
MemoryUsage int64
|
||||||
// Size is the total number of tasks in the queue.
|
// Size is the total number of tasks in the queue.
|
||||||
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
|
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
|
||||||
Size int
|
Size int
|
||||||
@ -77,6 +79,7 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
|
|||||||
}
|
}
|
||||||
return &QueueStats{
|
return &QueueStats{
|
||||||
Queue: stats.Queue,
|
Queue: stats.Queue,
|
||||||
|
MemoryUsage: stats.MemoryUsage,
|
||||||
Size: stats.Size,
|
Size: stats.Size,
|
||||||
Pending: stats.Pending,
|
Pending: stats.Pending,
|
||||||
Active: stats.Active,
|
Active: stats.Active,
|
||||||
|
@ -264,6 +264,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
|||||||
m6 := h.NewTaskMessageWithQueue("task6", nil, "low")
|
m6 := h.NewTaskMessageWithQueue("task6", nil, "low")
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
|
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
|
||||||
|
ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage")
|
||||||
|
|
||||||
inspector := NewInspector(getRedisConnOpt(t))
|
inspector := NewInspector(getRedisConnOpt(t))
|
||||||
|
|
||||||
@ -356,7 +357,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
|||||||
tc.qname, got, err, tc.want)
|
tc.qname, got, err, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
|
if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" {
|
||||||
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s",
|
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s",
|
||||||
tc.qname, got, err, tc.want, diff)
|
tc.qname, got, err, tc.want, diff)
|
||||||
continue
|
continue
|
||||||
|
@ -25,6 +25,9 @@ func (r *RDB) AllQueues() ([]string, error) {
|
|||||||
type Stats struct {
|
type Stats struct {
|
||||||
// Name of the queue (e.g. "default", "critical").
|
// Name of the queue (e.g. "default", "critical").
|
||||||
Queue string
|
Queue string
|
||||||
|
// MemoryUsage is the total number of bytes the queue and its tasks require
|
||||||
|
// to be stored in redis.
|
||||||
|
MemoryUsage int64
|
||||||
// Paused indicates whether the queue is paused.
|
// Paused indicates whether the queue is paused.
|
||||||
// If true, tasks in the queue should not be processed.
|
// If true, tasks in the queue should not be processed.
|
||||||
Paused bool
|
Paused bool
|
||||||
@ -160,9 +163,30 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
stats.Size = size
|
stats.Size = size
|
||||||
|
memusg, err := r.memoryUsage(qname)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
stats.MemoryUsage = memusg
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RDB) memoryUsage(qname string) (int64, error) {
|
||||||
|
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
var usg int64
|
||||||
|
for _, k := range keys {
|
||||||
|
n, err := r.client.MemoryUsage(k).Result()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
usg += n
|
||||||
|
}
|
||||||
|
return usg, nil
|
||||||
|
}
|
||||||
|
|
||||||
var historicalStatsCmd = redis.NewScript(`
|
var historicalStatsCmd = redis.NewScript(`
|
||||||
local res = {}
|
local res = {}
|
||||||
for _, key in ipairs(KEYS) do
|
for _, key in ipairs(KEYS) do
|
||||||
|
@ -209,7 +209,8 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
|
ignoreMemUsg := cmpopts.IgnoreFields(Stats{}, "MemoryUsage")
|
||||||
|
if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" {
|
||||||
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
|
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user