From 9c2d2a612dbde6f14daef4270600fe569a42396d Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 1 Dec 2019 07:59:52 -0800 Subject: [PATCH] Create Inspector --- asynq.go | 16 ++++++- inspector.go | 18 +++++++ inspector_test.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++ rdb.go | 21 +++++++++ 4 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 inspector.go create mode 100644 inspector_test.go diff --git a/asynq.go b/asynq.go index fd6d27a..3d09463 100644 --- a/asynq.go +++ b/asynq.go @@ -1,6 +1,10 @@ package asynq -import "github.com/google/uuid" +import ( + "time" + + "github.com/google/uuid" +) /* TODOs: @@ -57,3 +61,13 @@ type RedisConfig struct { // DB specifies which redis database to select. DB int } + +// Stats represents a state of queues at a certain time. +type Stats struct { + Queued int + InProgress int + Scheduled int + Retry int + Dead int + Timestamp time.Time +} diff --git a/inspector.go b/inspector.go new file mode 100644 index 0000000..813661e --- /dev/null +++ b/inspector.go @@ -0,0 +1,18 @@ +package asynq + +// Inspector is used to inspect queues. +type Inspector struct { + rdb *rdb +} + +// NewInspector returns a new Inspector instance. +func NewInspector(opt *RedisOpt) *Inspector { + return &Inspector{ + rdb: newRDB(opt), + } +} + +// CurrentStats returns a current stats of queues. +func (i *Inspector) CurrentStats() (*Stats, error) { + return i.rdb.currentStats() +} diff --git a/inspector_test.go b/inspector_test.go new file mode 100644 index 0000000..bd0e544 --- /dev/null +++ b/inspector_test.go @@ -0,0 +1,116 @@ +package asynq + +import ( + "testing" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func TestCurrentStats(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("send_email", "default", nil) + t3 := randomTask("gen_export", "default", nil) + t4 := randomTask("gen_thumbnail", "default", nil) + t5 := randomTask("send_email", "default", nil) + + tests := []struct { + queue []*taskMessage + inProgress []*taskMessage + scheduled []*taskMessage + retry []*taskMessage + dead []*taskMessage + want *Stats + }{ + { + queue: []*taskMessage{t1}, + inProgress: []*taskMessage{t2, t3}, + scheduled: []*taskMessage{t4}, + retry: []*taskMessage{}, + dead: []*taskMessage{t5}, + want: &Stats{ + Queued: 1, + InProgress: 2, + Scheduled: 1, + Retry: 0, + Dead: 1, + }, + }, + { + queue: []*taskMessage{}, + inProgress: []*taskMessage{}, + scheduled: []*taskMessage{t1, t2, t4}, + retry: []*taskMessage{t3}, + dead: []*taskMessage{t5}, + want: &Stats{ + Queued: 0, + InProgress: 0, + Scheduled: 3, + Retry: 1, + Dead: 1, + }, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.queue { + err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err() + if err != nil { + t.Fatal(err) + } + } + for _, msg := range tc.inProgress { + err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err() + if err != nil { + t.Fatal(err) + } + } + for _, msg := range tc.scheduled { + err := r.client.ZAdd(scheduled, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + for _, msg := range tc.retry { + err := r.client.ZAdd(retry, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + for _, msg := range tc.dead { + err := r.client.ZAdd(dead, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.CurrentStats() + if err != nil { + t.Error(err) + continue + } + ignoreOpt := cmpopts.IgnoreFields(*tc.want, "Timestamp") + if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { + t.Errorf("(*Inspector).CurrentStats() = %+v, want %+v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} diff --git a/rdb.go b/rdb.go index 03ae997..a6b4e6b 100644 --- a/rdb.go +++ b/rdb.go @@ -154,3 +154,24 @@ func (r *rdb) forward(from string) error { fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from) return err } + +func (r *rdb) currentStats() (*Stats, error) { + pipe := r.client.Pipeline() + qlen := pipe.LLen(defaultQueue) + plen := pipe.LLen(inProgress) + slen := pipe.ZCard(scheduled) + rlen := pipe.ZCard(retry) + dlen := pipe.ZCard(dead) + _, err := pipe.Exec() + if err != nil { + return nil, err + } + return &Stats{ + Queued: int(qlen.Val()), + InProgress: int(plen.Val()), + Scheduled: int(slen.Val()), + Retry: int(rlen.Val()), + Dead: int(dlen.Val()), + Timestamp: time.Now(), + }, nil +}