2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

(rdb): Add Completed field to Stats struct

This commit is contained in:
Ken Hibino 2021-09-25 17:15:09 -07:00
parent 43c909624e
commit 0d5218329c
2 changed files with 34 additions and 10 deletions

View File

@ -40,6 +40,7 @@ type Stats struct {
Scheduled int
Retry int
Archived int
Completed int
// Total number of tasks processed during the current date.
// The number includes both succeeded and failed tasks.
Processed int
@ -67,9 +68,10 @@ type DailyStats struct {
// KEYS[3] -> asynq:<qname>:scheduled
// KEYS[4] -> asynq:<qname>:retry
// KEYS[5] -> asynq:<qname>:archived
// KEYS[6] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[7] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:paused
// KEYS[6] -> asynq:<qname>:completed
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:paused
var currentStatsCmd = redis.NewScript(`
local res = {}
table.insert(res, KEYS[1])
@ -82,22 +84,24 @@ table.insert(res, KEYS[4])
table.insert(res, redis.call("ZCARD", KEYS[4]))
table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5]))
table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6]))
local pcount = 0
local p = redis.call("GET", KEYS[6])
local p = redis.call("GET", KEYS[7])
if p then
pcount = tonumber(p)
end
table.insert(res, KEYS[6])
table.insert(res, KEYS[7])
table.insert(res, pcount)
local fcount = 0
local f = redis.call("GET", KEYS[7])
local f = redis.call("GET", KEYS[8])
if f then
fcount = tonumber(f)
end
table.insert(res, KEYS[7])
table.insert(res, fcount)
table.insert(res, KEYS[8])
table.insert(res, redis.call("EXISTS", KEYS[8]))
table.insert(res, fcount)
table.insert(res, KEYS[9])
table.insert(res, redis.call("EXISTS", KEYS[9]))
return res`)
// CurrentStats returns a current state of the queues.
@ -117,6 +121,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.ScheduledKey(qname),
base.RetryKey(qname),
base.ArchivedKey(qname),
base.CompletedKey(qname),
base.ProcessedKey(qname, now),
base.FailedKey(qname, now),
base.PausedKey(qname),
@ -152,6 +157,9 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
case base.ArchivedKey(qname):
stats.Archived = val
size += val
case base.CompletedKey(qname):
stats.Completed = val
size += val
case base.ProcessedKey(qname, now):
stats.Processed = val
case base.FailedKey(qname, now):
@ -182,6 +190,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
// KEYS[3] -> asynq:{qname}:scheduled
// KEYS[4] -> asynq:{qname}:retry
// KEYS[5] -> asynq:{qname}:archived
// KEYS[6] -> asynq:{qname}:completed
//
// ARGV[1] -> asynq:{qname}:t:
// ARGV[2] -> sample_size (e.g 20)
@ -208,7 +217,7 @@ for i=1,2 do
memusg = memusg + m
end
end
for i=3,5 do
for i=3,6 do
local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1)
local sample_total = 0
if (table.getn(ids) > 0) then
@ -237,6 +246,7 @@ func (r *RDB) memoryUsage(qname string) (int64, error) {
base.ScheduledKey(qname),
base.RetryKey(qname),
base.ArchivedKey(qname),
base.CompletedKey(qname),
}
argv := []interface{}{
base.TaskKeyPrefix(qname),

View File

@ -67,6 +67,7 @@ func TestCurrentStats(t *testing.T) {
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
processed map[string]int
failed map[string]int
paused []string
@ -102,6 +103,11 @@ func TestCurrentStats(t *testing.T) {
"critical": {},
"low": {},
},
completed: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
@ -123,6 +129,7 @@ func TestCurrentStats(t *testing.T) {
Scheduled: 2,
Retry: 0,
Archived: 0,
Completed: 0,
Processed: 120,
Failed: 2,
Timestamp: now,
@ -157,6 +164,11 @@ func TestCurrentStats(t *testing.T) {
"critical": {},
"low": {},
},
completed: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
@ -178,6 +190,7 @@ func TestCurrentStats(t *testing.T) {
Scheduled: 0,
Retry: 0,
Archived: 0,
Completed: 0,
Processed: 100,
Failed: 0,
Timestamp: now,
@ -197,6 +210,7 @@ func TestCurrentStats(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.client.Set(context.Background(), processedKey, n, 0)