diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index e3f6144..c7e1eb3 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -40,6 +40,7 @@ type Stats struct { Scheduled int Retry int Archived int + Completed int // Total number of tasks processed during the current date. // The number includes both succeeded and failed tasks. Processed int @@ -67,9 +68,10 @@ type DailyStats struct { // KEYS[3] -> asynq::scheduled // KEYS[4] -> asynq::retry // KEYS[5] -> asynq::archived -// KEYS[6] -> asynq::processed: -// KEYS[7] -> asynq::failed: -// KEYS[8] -> asynq::paused +// KEYS[6] -> asynq::completed +// KEYS[7] -> asynq::processed: +// KEYS[8] -> asynq::failed: +// KEYS[9] -> asynq::paused var currentStatsCmd = redis.NewScript(` local res = {} table.insert(res, KEYS[1]) @@ -82,22 +84,24 @@ table.insert(res, KEYS[4]) table.insert(res, redis.call("ZCARD", KEYS[4])) table.insert(res, KEYS[5]) table.insert(res, redis.call("ZCARD", KEYS[5])) +table.insert(res, KEYS[6]) +table.insert(res, redis.call("ZCARD", KEYS[6])) local pcount = 0 -local p = redis.call("GET", KEYS[6]) +local p = redis.call("GET", KEYS[7]) if p then pcount = tonumber(p) end -table.insert(res, KEYS[6]) +table.insert(res, KEYS[7]) table.insert(res, pcount) local fcount = 0 -local f = redis.call("GET", KEYS[7]) +local f = redis.call("GET", KEYS[8]) if f then fcount = tonumber(f) end -table.insert(res, KEYS[7]) -table.insert(res, fcount) table.insert(res, KEYS[8]) -table.insert(res, redis.call("EXISTS", KEYS[8])) +table.insert(res, fcount) +table.insert(res, KEYS[9]) +table.insert(res, redis.call("EXISTS", KEYS[9])) return res`) // CurrentStats returns a current state of the queues. @@ -117,6 +121,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { base.ScheduledKey(qname), base.RetryKey(qname), base.ArchivedKey(qname), + base.CompletedKey(qname), base.ProcessedKey(qname, now), base.FailedKey(qname, now), base.PausedKey(qname), @@ -152,6 +157,9 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { case base.ArchivedKey(qname): stats.Archived = val size += val + case base.CompletedKey(qname): + stats.Completed = val + size += val case base.ProcessedKey(qname, now): stats.Processed = val case base.FailedKey(qname, now): @@ -182,6 +190,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { // KEYS[3] -> asynq:{qname}:scheduled // KEYS[4] -> asynq:{qname}:retry // KEYS[5] -> asynq:{qname}:archived +// KEYS[6] -> asynq:{qname}:completed // // ARGV[1] -> asynq:{qname}:t: // ARGV[2] -> sample_size (e.g 20) @@ -208,7 +217,7 @@ for i=1,2 do memusg = memusg + m end end -for i=3,5 do +for i=3,6 do local ids = redis.call("ZRANGE", KEYS[i], 0, sample_size - 1) local sample_total = 0 if (table.getn(ids) > 0) then @@ -237,6 +246,7 @@ func (r *RDB) memoryUsage(qname string) (int64, error) { base.ScheduledKey(qname), base.RetryKey(qname), base.ArchivedKey(qname), + base.CompletedKey(qname), } argv := []interface{}{ base.TaskKeyPrefix(qname), diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 207709d..3a719b5 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -67,6 +67,7 @@ func TestCurrentStats(t *testing.T) { scheduled map[string][]base.Z retry map[string][]base.Z archived map[string][]base.Z + completed map[string][]base.Z processed map[string]int failed map[string]int paused []string @@ -102,6 +103,11 @@ func TestCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, + completed: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, processed: map[string]int{ "default": 120, "critical": 100, @@ -123,6 +129,7 @@ func TestCurrentStats(t *testing.T) { Scheduled: 2, Retry: 0, Archived: 0, + Completed: 0, Processed: 120, Failed: 2, Timestamp: now, @@ -157,6 +164,11 @@ func TestCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, + completed: map[string][]base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, processed: map[string]int{ "default": 120, "critical": 100, @@ -178,6 +190,7 @@ func TestCurrentStats(t *testing.T) { Scheduled: 0, Retry: 0, Archived: 0, + Completed: 0, Processed: 100, Failed: 0, Timestamp: now, @@ -197,6 +210,7 @@ func TestCurrentStats(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllArchivedQueues(t, r.client, tc.archived) + h.SeedAllCompletedQueues(t, r.client, tc.completed) for qname, n := range tc.processed { processedKey := base.ProcessedKey(qname, now) r.client.Set(context.Background(), processedKey, n, 0)