2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Update CurrentStats method in RDB

This commit is contained in:
Ken Hibino 2020-08-11 06:28:12 -07:00
parent d25090c669
commit 24b13bd865
2 changed files with 163 additions and 166 deletions

View File

@ -6,8 +6,8 @@ package rdb
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sort"
"strings" "strings"
"time" "time"
@ -52,20 +52,20 @@ type DailyStats struct {
Time time.Time Time time.Time
} }
// KEYS[1] -> asynq:queues var ErrQueueNotFound = errors.New("rdb: queue does not exist")
// KEYS[2] -> asynq:in_progress
// KEYS[3] -> asynq:scheduled // KEYS[1] -> asynq:<qname>
// KEYS[4] -> asynq:retry // KEYS[2] -> asynq:<qname>:in_progress
// KEYS[5] -> asynq:dead // KEYS[3] -> asynq:<qname>:scheduled
// KEYS[6] -> asynq:processed:<yyyy-mm-dd> // KEYS[4] -> asynq:<qname>:retry
// KEYS[7] -> asynq:failure:<yyyy-mm-dd> // KEYS[5] -> asynq:<qname>:dead
// KEYS[6] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[7] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:paused
var currentStatsCmd = redis.NewScript(` var currentStatsCmd = redis.NewScript(`
local res = {} local res = {}
local queues = redis.call("SMEMBERS", KEYS[1]) table.insert(res, KEYS[1])
for _, qkey in ipairs(queues) do table.insert(res, redis.call("LLEN", KEYS[1]))
table.insert(res, qkey)
table.insert(res, redis.call("LLEN", qkey))
end
table.insert(res, KEYS[2]) table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2])) table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3]) table.insert(res, KEYS[3])
@ -79,28 +79,38 @@ local p = redis.call("GET", KEYS[6])
if p then if p then
pcount = tonumber(p) pcount = tonumber(p)
end end
table.insert(res, "processed") table.insert(res, KEYS[6])
table.insert(res, pcount) table.insert(res, pcount)
local fcount = 0 local fcount = 0
local f = redis.call("GET", KEYS[7]) local f = redis.call("GET", KEYS[7])
if f then if f then
fcount = tonumber(f) fcount = tonumber(f)
end end
table.insert(res, "failed") table.insert(res, KEYS[7])
table.insert(res, fcount) table.insert(res, fcount)
table.insert(res, KEYS[8])
table.insert(res, redis.call("EXISTS", KEYS[8]))
return res`) return res`)
// CurrentStats returns a current state of the queues. // CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats() (*Stats, error) { func (r *RDB) CurrentStats(qname string) (*Stats, error) {
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
if err != nil {
return nil, err
}
if !exists {
return nil, ErrQueueNotFound
}
now := time.Now() now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{ res, err := currentStatsCmd.Run(r.client, []string{
base.AllQueues, base.QueueKey(qname),
base.InProgressQueue, base.InProgressKey(qname),
base.ScheduledQueue, base.ScheduledKey(qname),
base.RetryQueue, base.RetryKey(qname),
base.DeadQueue, base.DeadKey(qname),
base.ProcessedKey(now), base.ProcessedKey(qname, now),
base.FailureKey(now), base.FailedKey(qname, now),
base.PausedKey(qname),
}).Result() }).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -109,46 +119,36 @@ func (r *RDB) CurrentStats() (*Stats, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
paused, err := r.client.SMembersMap(base.PausedQueues).Result()
if err != nil {
return nil, err
}
stats := &Stats{ stats := &Stats{
Queues: make([]*Queue, 0), Name: qname,
Timestamp: now, Timestamp: now,
} }
for i := 0; i < len(data); i += 2 { for i := 0; i < len(data); i += 2 {
key := cast.ToString(data[i]) key := cast.ToString(data[i])
val := cast.ToInt(data[i+1]) val := cast.ToInt(data[i+1])
switch key {
switch { case base.QueueKey(qname):
case strings.HasPrefix(key, base.QueuePrefix): stats.Enqueued = val
stats.Enqueued += val case base.InProgressKey(qname):
q := Queue{
Name: strings.TrimPrefix(key, base.QueuePrefix),
Size: val,
}
if _, exist := paused[key]; exist {
q.Paused = true
}
stats.Queues = append(stats.Queues, &q)
case key == base.InProgressQueue:
stats.InProgress = val stats.InProgress = val
case key == base.ScheduledQueue: case base.ScheduledKey(qname):
stats.Scheduled = val stats.Scheduled = val
case key == base.RetryQueue: case ase.RetryKey(qname):
stats.Retry = val stats.Retry = val
case key == base.DeadQueue: case base.DeadKey(qname):
stats.Dead = val stats.Dead = val
case key == "processed": case base.ProcessedKey(qname, now):
stats.Processed = val stats.Processed = val
case key == "failed": case base.FailedKey(qname, now):
stats.Failed = val stats.Failed = val
case base.PausedKey(qname):
if val == 0 {
stats.Paused = false
} else {
stats.Paused = true
}
} }
} }
sort.Slice(stats.Queues, func(i, j int) bool {
return stats.Queues[i].Name < stats.Queues[j].Name
})
return stats, nil return stats, nil
} }

View File

@ -50,42 +50,67 @@ func TestCurrentStats(t *testing.T) {
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
m4 := h.NewTaskMessage("sync", nil) m4 := h.NewTaskMessage("sync", nil)
m5 := h.NewTaskMessage("important_notification", nil) m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m5.Queue = "critical" m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
m6 := h.NewTaskMessage("minor_notification", nil)
m6.Queue = "low"
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage enqueued map[string][]*base.TaskMessage
inProgress []*base.TaskMessage inProgress map[string][]*base.TaskMessage
scheduled []base.Z scheduled map[string][]base.Z
retry []base.Z retry map[string][]base.Z
dead []base.Z dead map[string][]base.Z
processed int processed map[string]int
failed int failed map[string]int
allQueues []interface{}
paused []string paused []string
qname string
want *Stats want *Stats
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1}, "default": {m1},
"critical": {m5}, "critical": {m5},
"low": {m6}, "low": {m6},
}, },
inProgress: []*base.TaskMessage{m2}, inProgress: map[string][]*base.TaskMessage{
scheduled: []base.Z{ "default": {m2},
{Message: m3, Score: now.Add(time.Hour).Unix()}, "critical": {},
{Message: m4, Score: now.Unix()}}, "low": {},
retry: []base.Z{}, },
dead: []base.Z{}, scheduled: map[string][]base.Z{
processed: 120, "default": {
failed: 2, {Message: m3, Score: now.Add(time.Hour).Unix()},
allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, {Message: m4, Score: now.Unix()},
paused: []string{}, },
"critical": {},
"low": {},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
dead: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
"low": 50,
},
failed: map[string]int{
"default": 2,
"critical": 0,
"low": 1,
},
paused: []string{},
qname: "default",
want: &Stats{ want: &Stats{
Enqueued: 3, Name: "default",
Paused: false,
Enqueued: 1,
InProgress: 1, InProgress: 1,
Scheduled: 2, Scheduled: 2,
Retry: 0, Retry: 0,
@ -93,74 +118,60 @@ func TestCurrentStats(t *testing.T) {
Processed: 120, Processed: 120,
Failed: 2, Failed: 2,
Timestamp: now, Timestamp: now,
// Queues should be sorted by name.
Queues: []*Queue{
{Name: "critical", Paused: false, Size: 1},
{Name: "default", Paused: false, Size: 1},
{Name: "low", Paused: false, Size: 1},
},
}, },
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {}, "default": {m1},
"critical": {m5},
"low": {m6},
}, },
inProgress: []*base.TaskMessage{}, inProgress: map[string][]*base.TaskMessage{
scheduled: []base.Z{ "default": {m2},
{Message: m3, Score: now.Unix()}, "critical": {},
{Message: m4, Score: now.Unix()}}, "low": {},
retry: []base.Z{ },
{Message: m1, Score: now.Add(time.Minute).Unix()}}, scheduled: map[string][]base.Z{
dead: []base.Z{ "default": {
{Message: m2, Score: now.Add(-time.Hour).Unix()}}, {Message: m3, Score: now.Add(time.Hour).Unix()},
processed: 90, {Message: m4, Score: now.Unix()},
failed: 10, },
allQueues: []interface{}{base.DefaultQueue}, "critical": {},
paused: []string{}, "low": {},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
dead: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
"low": 50,
},
failed: map[string]int{
"default": 2,
"critical": 0,
"low": 1,
},
paused: []string{"critical", "low"},
qname: "critical",
want: &Stats{ want: &Stats{
Enqueued: 0, Name: "critical",
Paused: true,
Enqueued: 1,
InProgress: 0, InProgress: 0,
Scheduled: 2, Scheduled: 0,
Retry: 1,
Dead: 1,
Processed: 90,
Failed: 10,
Timestamp: now,
Queues: []*Queue{
{Name: base.DefaultQueueName, Paused: false, Size: 0},
},
},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1},
"critical": {m5},
"low": {m6},
},
inProgress: []*base.TaskMessage{m2},
scheduled: []base.Z{
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()}},
retry: []base.Z{},
dead: []base.Z{},
processed: 120,
failed: 2,
allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")},
paused: []string{"critical", "low"},
want: &Stats{
Enqueued: 3,
InProgress: 1,
Scheduled: 2,
Retry: 0, Retry: 0,
Dead: 0, Dead: 0,
Processed: 120, Processed: 100,
Failed: 2, Failed: 0,
Timestamp: now, Timestamp: now,
Queues: []*Queue{
{Name: "critical", Paused: true, Size: 1},
{Name: "default", Paused: false, Size: 1},
{Name: "low", Paused: true, Size: 1},
},
}, },
}, },
} }
@ -172,54 +183,40 @@ func TestCurrentStats(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
for qname, msgs := range tc.enqueued { h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
h.SeedEnqueuedQueue(t, r.client, msgs, qname) h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.client.Set(processedKey, n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(now)
r.client.Set(failedKey, n, 0)
} }
h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedScheduledQueue(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry)
h.SeedDeadQueue(t, r.client, tc.dead)
processedKey := base.ProcessedKey(now)
failedKey := base.FailureKey(now)
r.client.Set(processedKey, tc.processed, 0)
r.client.Set(failedKey, tc.failed, 0)
r.client.SAdd(base.AllQueues, tc.allQueues...)
got, err := r.CurrentStats() got, err := r.CurrentStats(tc.qname)
if err != nil { if err != nil {
t.Errorf("r.CurrentStats() = %v, %v, want %v, nil", got, err, tc.want) t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil", tc.qname, got, err, tc.want)
continue continue
} }
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
t.Errorf("r.CurrentStats() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff) t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
continue continue
} }
} }
} }
func TestCurrentStatsWithoutData(t *testing.T) { func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
r := setup(t) r := setup(t)
want := &Stats{ qname := "non-existent"
Enqueued: 0, got, err := r.CurrentStats(qname)
InProgress: 0, if err != ErrQueueNotFound {
Scheduled: 0, t.Fatalf("r.CurrentStats(%q) = %v, %v, want nil, %v", qname, got, err, ErrQueueNotFound)
Retry: 0,
Dead: 0,
Processed: 0,
Failed: 0,
Timestamp: time.Now(),
Queues: make([]*Queue, 0),
}
got, err := r.CurrentStats()
if err != nil {
t.Fatalf("r.CurrentStats() = %v, %v, want %+v, nil", got, err, want)
}
if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" {
t.Errorf("r.CurrentStats() = %v, %v, want %+v, nil; (-want, +got)\n%s", got, err, want, diff)
} }
} }