mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Create Inspector
This commit is contained in:
parent
57838600ef
commit
9c2d2a612d
16
asynq.go
16
asynq.go
@ -1,6 +1,10 @@
|
||||
package asynq
|
||||
|
||||
import "github.com/google/uuid"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
/*
|
||||
TODOs:
|
||||
@ -57,3 +61,13 @@ type RedisConfig struct {
|
||||
// DB specifies which redis database to select.
|
||||
DB int
|
||||
}
|
||||
|
||||
// Stats represents a state of queues at a certain time.
|
||||
type Stats struct {
|
||||
Queued int
|
||||
InProgress int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
18
inspector.go
Normal file
18
inspector.go
Normal file
@ -0,0 +1,18 @@
|
||||
package asynq
|
||||
|
||||
// Inspector is used to inspect queues.
|
||||
type Inspector struct {
|
||||
rdb *rdb
|
||||
}
|
||||
|
||||
// NewInspector returns a new Inspector instance.
|
||||
func NewInspector(opt *RedisOpt) *Inspector {
|
||||
return &Inspector{
|
||||
rdb: newRDB(opt),
|
||||
}
|
||||
}
|
||||
|
||||
// CurrentStats returns a current stats of queues.
|
||||
func (i *Inspector) CurrentStats() (*Stats, error) {
|
||||
return i.rdb.currentStats()
|
||||
}
|
116
inspector_test.go
Normal file
116
inspector_test.go
Normal file
@ -0,0 +1,116 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
)
|
||||
|
||||
func TestCurrentStats(t *testing.T) {
|
||||
r := setup(t)
|
||||
inspector := &Inspector{r}
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("send_email", "default", nil)
|
||||
t3 := randomTask("gen_export", "default", nil)
|
||||
t4 := randomTask("gen_thumbnail", "default", nil)
|
||||
t5 := randomTask("send_email", "default", nil)
|
||||
|
||||
tests := []struct {
|
||||
queue []*taskMessage
|
||||
inProgress []*taskMessage
|
||||
scheduled []*taskMessage
|
||||
retry []*taskMessage
|
||||
dead []*taskMessage
|
||||
want *Stats
|
||||
}{
|
||||
{
|
||||
queue: []*taskMessage{t1},
|
||||
inProgress: []*taskMessage{t2, t3},
|
||||
scheduled: []*taskMessage{t4},
|
||||
retry: []*taskMessage{},
|
||||
dead: []*taskMessage{t5},
|
||||
want: &Stats{
|
||||
Queued: 1,
|
||||
InProgress: 2,
|
||||
Scheduled: 1,
|
||||
Retry: 0,
|
||||
Dead: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
queue: []*taskMessage{},
|
||||
inProgress: []*taskMessage{},
|
||||
scheduled: []*taskMessage{t1, t2, t4},
|
||||
retry: []*taskMessage{t3},
|
||||
dead: []*taskMessage{t5},
|
||||
want: &Stats{
|
||||
Queued: 0,
|
||||
InProgress: 0,
|
||||
Scheduled: 3,
|
||||
Retry: 1,
|
||||
Dead: 1,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, msg := range tc.queue {
|
||||
err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.inProgress {
|
||||
err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.scheduled {
|
||||
err := r.client.ZAdd(scheduled, &redis.Z{
|
||||
Member: mustMarshal(t, msg),
|
||||
Score: float64(time.Now().Add(time.Hour).Unix()),
|
||||
}).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.retry {
|
||||
err := r.client.ZAdd(retry, &redis.Z{
|
||||
Member: mustMarshal(t, msg),
|
||||
Score: float64(time.Now().Add(time.Hour).Unix()),
|
||||
}).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.dead {
|
||||
err := r.client.ZAdd(dead, &redis.Z{
|
||||
Member: mustMarshal(t, msg),
|
||||
Score: float64(time.Now().Unix()),
|
||||
}).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
got, err := inspector.CurrentStats()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
ignoreOpt := cmpopts.IgnoreFields(*tc.want, "Timestamp")
|
||||
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
|
||||
t.Errorf("(*Inspector).CurrentStats() = %+v, want %+v; (-want, +got)\n%s",
|
||||
got, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
21
rdb.go
21
rdb.go
@ -154,3 +154,24 @@ func (r *rdb) forward(from string) error {
|
||||
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *rdb) currentStats() (*Stats, error) {
|
||||
pipe := r.client.Pipeline()
|
||||
qlen := pipe.LLen(defaultQueue)
|
||||
plen := pipe.LLen(inProgress)
|
||||
slen := pipe.ZCard(scheduled)
|
||||
rlen := pipe.ZCard(retry)
|
||||
dlen := pipe.ZCard(dead)
|
||||
_, err := pipe.Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Stats{
|
||||
Queued: int(qlen.Val()),
|
||||
InProgress: int(plen.Val()),
|
||||
Scheduled: int(slen.Val()),
|
||||
Retry: int(rlen.Val()),
|
||||
Dead: int(dlen.Val()),
|
||||
Timestamp: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user