2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 09:56:12 +08:00

Show number of groups and aggregating task count in QueueInfo

This commit is contained in:
Ken Hibino
2022-03-15 06:52:56 -07:00
parent 6bdce095dd
commit 7fa02db7ff
4 changed files with 171 additions and 83 deletions

View File

@@ -34,13 +34,18 @@ type Stats struct {
Paused bool
// Size is the total number of tasks in the queue.
Size int
// Groups is the total number of groups in the queue.
Groups int
// Number of tasks in each state.
Pending int
Active int
Scheduled int
Retry int
Archived int
Completed int
Pending int
Active int
Scheduled int
Retry int
Archived int
Completed int
Aggregating int
// Number of tasks processed within the current date.
// The number includes both succeeded and failed tasks.
@@ -83,8 +88,10 @@ type DailyStats struct {
// KEYS[9] -> asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
//
// KEYS[12] -> asynq:<qname>:groups
// --------
// ARGV[1] -> task key prefix
// ARGV[2] -> group key prefix
var currentStatsCmd = redis.NewScript(`
local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
@@ -118,6 +125,15 @@ if pendingTaskCount > 0 then
else
table.insert(res, 0)
end
local group_names = redis.call("SMEMBERS", KEYS[12])
table.insert(res, "group_size")
table.insert(res, table.getn(group_names))
local aggregating_count = 0
for _, gname in ipairs(group_names) do
aggregating_count = aggregating_count + redis.call("ZCARD", ARGV[2] .. gname)
end
table.insert(res, "aggregating_count")
table.insert(res, aggregating_count)
return res`)
// CurrentStats returns a current state of the queues.
@@ -131,7 +147,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
now := r.clock.Now()
res, err := currentStatsCmd.Run(context.Background(), r.client, []string{
keys := []string{
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
@@ -143,7 +159,13 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.ProcessedTotalKey(qname),
base.FailedTotalKey(qname),
base.PausedKey(qname),
}, base.TaskKeyPrefix(qname)).Result()
base.AllGroups(qname),
}
argv := []interface{}{
base.TaskKeyPrefix(qname),
base.GroupKeyPrefix(qname),
}
res, err := currentStatsCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
@@ -198,6 +220,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
} else {
stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val)))
}
case "group_size":
stats.Groups = val
case "aggregating_count":
stats.Aggregating = val
}
}
stats.Size = size