diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 5bfee07..afd2ceb 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -3,13 +3,13 @@ package rdb import ( "encoding/json" "fmt" - "strconv" "strings" "time" "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" + "github.com/spf13/cast" ) // Stats represents a state of queues at a certain time. @@ -77,35 +77,57 @@ type DeadTask struct { // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { + // KEYS[1] -> asynq:queues:default + // KEYS[2] -> asynq:in_progress + // KEYS[3] -> asynq:scheduled + // KEYS[4] -> asynq:retry + // KEYS[5] -> asynq:dead + // KEYS[6] -> asynq:processed: + // KEYS[7] -> asynq:failure: + script := redis.NewScript(` + local qlen = redis.call("LLEN", KEYS[1]) + local plen = redis.call("LLEN", KEYS[2]) + local slen = redis.call("ZCARD", KEYS[3]) + local rlen = redis.call("ZCARD", KEYS[4]) + local dlen = redis.call("ZCARD", KEYS[5]) + local pcount = 0 + local p = redis.call("GET", KEYS[6]) + if p then + pcount = tonumber(p) + end + local fcount = 0 + local f = redis.call("GET", KEYS[7]) + if f then + fcount = tonumber(f) + end + return {qlen, plen, slen, rlen, dlen, pcount, fcount} + `) + now := time.Now() - pipe := r.client.Pipeline() - qlen := pipe.LLen(base.DefaultQueue) - plen := pipe.LLen(base.InProgressQueue) - slen := pipe.ZCard(base.ScheduledQueue) - rlen := pipe.ZCard(base.RetryQueue) - dlen := pipe.ZCard(base.DeadQueue) - pcount := pipe.Get(base.ProcessedKey(now)) - fcount := pipe.Get(base.FailureKey(now)) - _, err := pipe.Exec() + res, err := script.Run(r.client, []string{ + base.DefaultQueue, + base.InProgressQueue, + base.ScheduledQueue, + base.RetryQueue, + base.DeadQueue, + base.ProcessedKey(now), + base.FailureKey(now), + }).Result() if err != nil { return nil, err } - p, err := strconv.Atoi(pcount.Val()) - if err != nil { - return nil, err - } - f, err := strconv.Atoi(fcount.Val()) + nums, err := cast.ToIntSliceE(res) if err != nil { return nil, err } return &Stats{ - Enqueued: int(qlen.Val()), - InProgress: int(plen.Val()), - Scheduled: int(slen.Val()), - Retry: int(rlen.Val()), - Dead: int(dlen.Val()), - Processed: p, - Failed: f, + Enqueued: nums[0], + InProgress: nums[1], + Scheduled: nums[2], + Retry: nums[3], + Dead: nums[4], + Processed: nums[5], + Failed: nums[6], Timestamp: now, }, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 8584c25..9b8b888 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -138,6 +138,30 @@ func TestCurrentStats(t *testing.T) { } } +func TestCurrentStatsWithoutData(t *testing.T) { + r := setup(t) + + want := &Stats{ + Enqueued: 0, + InProgress: 0, + Scheduled: 0, + Retry: 0, + Dead: 0, + Processed: 0, + Failed: 0, + Timestamp: time.Now(), + } + + got, err := r.CurrentStats() + if err != nil { + t.Fatalf("r.CurrentStats() = %v, %v, want %+v, nil", got, err, want) + } + + if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" { + t.Errorf("r.CurrentStats() = %v, %v, want %+v, nil; (-want, +got)\n%s", got, err, want, diff) + } +} + func TestRedisInfo(t *testing.T) { r := setup(t) diff --git a/tools/asynqmon/cmd/stats.go b/tools/asynqmon/cmd/stats.go index fd3424f..f3e4a1f 100644 --- a/tools/asynqmon/cmd/stats.go +++ b/tools/asynqmon/cmd/stats.go @@ -84,7 +84,13 @@ func printStats(s *rdb.Stats) { tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) fmt.Fprintf(tw, format, "Processed", "Failed", "Error Rate") fmt.Fprintf(tw, format, "---------", "------", "----------") - fmt.Fprintf(tw, format, s.Processed, s.Failed, fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)) + var errrate string + if s.Processed == 0 { + errrate = "N/A" + } else { + errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) + } + fmt.Fprintf(tw, format, s.Processed, s.Failed, errrate) tw.Flush() }