mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Change RDB.CurrentStats to be multi-queue aware
This commit is contained in:
parent
67f381269a
commit
89843ac565
@ -25,6 +25,7 @@ type Stats struct {
|
|||||||
Dead int
|
Dead int
|
||||||
Processed int
|
Processed int
|
||||||
Failed int
|
Failed int
|
||||||
|
Queues map[string]int // map of queue name to number of tasks in the queue (e.g., "default": 100, "critical": 20)
|
||||||
Timestamp time.Time
|
Timestamp time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,7 +84,7 @@ type DeadTask struct {
|
|||||||
|
|
||||||
// CurrentStats returns a current state of the queues.
|
// CurrentStats returns a current state of the queues.
|
||||||
func (r *RDB) CurrentStats() (*Stats, error) {
|
func (r *RDB) CurrentStats() (*Stats, error) {
|
||||||
// KEYS[1] -> asynq:queues:default
|
// KEYS[1] -> asynq:queues
|
||||||
// KEYS[2] -> asynq:in_progress
|
// KEYS[2] -> asynq:in_progress
|
||||||
// KEYS[3] -> asynq:scheduled
|
// KEYS[3] -> asynq:scheduled
|
||||||
// KEYS[4] -> asynq:retry
|
// KEYS[4] -> asynq:retry
|
||||||
@ -91,27 +92,40 @@ func (r *RDB) CurrentStats() (*Stats, error) {
|
|||||||
// KEYS[6] -> asynq:processed:<yyyy-mm-dd>
|
// KEYS[6] -> asynq:processed:<yyyy-mm-dd>
|
||||||
// KEYS[7] -> asynq:failure:<yyyy-mm-dd>
|
// KEYS[7] -> asynq:failure:<yyyy-mm-dd>
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local qlen = redis.call("LLEN", KEYS[1])
|
local res = {}
|
||||||
local plen = redis.call("LLEN", KEYS[2])
|
local queues = redis.call("SMEMBERS", KEYS[1])
|
||||||
local slen = redis.call("ZCARD", KEYS[3])
|
for _, qkey in ipairs(queues) do
|
||||||
local rlen = redis.call("ZCARD", KEYS[4])
|
table.insert(res, qkey)
|
||||||
local dlen = redis.call("ZCARD", KEYS[5])
|
table.insert(res, redis.call("LLEN", qkey))
|
||||||
|
end
|
||||||
|
table.insert(res, KEYS[2])
|
||||||
|
table.insert(res, redis.call("LLEN", KEYS[2]))
|
||||||
|
table.insert(res, KEYS[3])
|
||||||
|
table.insert(res, redis.call("ZCARD", KEYS[3]))
|
||||||
|
table.insert(res, KEYS[4])
|
||||||
|
table.insert(res, redis.call("ZCARD", KEYS[4]))
|
||||||
|
table.insert(res, KEYS[5])
|
||||||
|
table.insert(res, redis.call("ZCARD", KEYS[5]))
|
||||||
local pcount = 0
|
local pcount = 0
|
||||||
local p = redis.call("GET", KEYS[6])
|
local p = redis.call("GET", KEYS[6])
|
||||||
if p then
|
if p then
|
||||||
pcount = tonumber(p)
|
pcount = tonumber(p)
|
||||||
end
|
end
|
||||||
|
table.insert(res, "processed")
|
||||||
|
table.insert(res, pcount)
|
||||||
local fcount = 0
|
local fcount = 0
|
||||||
local f = redis.call("GET", KEYS[7])
|
local f = redis.call("GET", KEYS[7])
|
||||||
if f then
|
if f then
|
||||||
fcount = tonumber(f)
|
fcount = tonumber(f)
|
||||||
end
|
end
|
||||||
return {qlen, plen, slen, rlen, dlen, pcount, fcount}
|
table.insert(res, "failed")
|
||||||
|
table.insert(res, fcount)
|
||||||
|
return res
|
||||||
`)
|
`)
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
res, err := script.Run(r.client, []string{
|
res, err := script.Run(r.client, []string{
|
||||||
base.DefaultQueue,
|
base.AllQueues,
|
||||||
base.InProgressQueue,
|
base.InProgressQueue,
|
||||||
base.ScheduledQueue,
|
base.ScheduledQueue,
|
||||||
base.RetryQueue,
|
base.RetryQueue,
|
||||||
@ -122,20 +136,37 @@ func (r *RDB) CurrentStats() (*Stats, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
nums, err := cast.ToIntSliceE(res)
|
data, err := cast.ToSliceE(res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Stats{
|
stats := &Stats{
|
||||||
Enqueued: nums[0],
|
Queues: make(map[string]int),
|
||||||
InProgress: nums[1],
|
Timestamp: now,
|
||||||
Scheduled: nums[2],
|
}
|
||||||
Retry: nums[3],
|
for i := 0; i < len(data); i += 2 {
|
||||||
Dead: nums[4],
|
key := cast.ToString(data[i])
|
||||||
Processed: nums[5],
|
val := cast.ToInt(data[i+1])
|
||||||
Failed: nums[6],
|
|
||||||
Timestamp: now,
|
switch {
|
||||||
}, nil
|
case strings.HasPrefix(key, base.QueuePrefix):
|
||||||
|
stats.Enqueued += val
|
||||||
|
stats.Queues[strings.TrimPrefix(key, base.QueuePrefix)] = val
|
||||||
|
case key == base.InProgressQueue:
|
||||||
|
stats.InProgress = val
|
||||||
|
case key == base.ScheduledQueue:
|
||||||
|
stats.Scheduled = val
|
||||||
|
case key == base.RetryQueue:
|
||||||
|
stats.Retry = val
|
||||||
|
case key == base.DeadQueue:
|
||||||
|
stats.Dead = val
|
||||||
|
case key == "processed":
|
||||||
|
stats.Processed = val
|
||||||
|
case key == "failed":
|
||||||
|
stats.Failed = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// HistoricalStats returns a list of stats from the last n days.
|
// HistoricalStats returns a list of stats from the last n days.
|
||||||
|
@ -22,20 +22,29 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
m2 := h.NewTaskMessage("reindex", nil)
|
m2 := h.NewTaskMessage("reindex", nil)
|
||||||
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
|
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
|
||||||
m4 := h.NewTaskMessage("sync", nil)
|
m4 := h.NewTaskMessage("sync", nil)
|
||||||
|
m5 := h.NewTaskMessage("important_notification", nil)
|
||||||
|
m5.Queue = "critical"
|
||||||
|
m6 := h.NewTaskMessage("minor_notification", nil)
|
||||||
|
m6.Queue = "low"
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued []*base.TaskMessage
|
enqueued map[string][]*base.TaskMessage
|
||||||
inProgress []*base.TaskMessage
|
inProgress []*base.TaskMessage
|
||||||
scheduled []h.ZSetEntry
|
scheduled []h.ZSetEntry
|
||||||
retry []h.ZSetEntry
|
retry []h.ZSetEntry
|
||||||
dead []h.ZSetEntry
|
dead []h.ZSetEntry
|
||||||
processed int
|
processed int
|
||||||
failed int
|
failed int
|
||||||
|
allQueues []interface{}
|
||||||
want *Stats
|
want *Stats
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
enqueued: []*base.TaskMessage{m1},
|
enqueued: map[string][]*base.TaskMessage{
|
||||||
|
base.DefaultQueueName: {m1},
|
||||||
|
"critical": {m5},
|
||||||
|
"low": {m6},
|
||||||
|
},
|
||||||
inProgress: []*base.TaskMessage{m2},
|
inProgress: []*base.TaskMessage{m2},
|
||||||
scheduled: []h.ZSetEntry{
|
scheduled: []h.ZSetEntry{
|
||||||
{Msg: m3, Score: now.Add(time.Hour).Unix()},
|
{Msg: m3, Score: now.Add(time.Hour).Unix()},
|
||||||
@ -44,8 +53,9 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
dead: []h.ZSetEntry{},
|
dead: []h.ZSetEntry{},
|
||||||
processed: 120,
|
processed: 120,
|
||||||
failed: 2,
|
failed: 2,
|
||||||
|
allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")},
|
||||||
want: &Stats{
|
want: &Stats{
|
||||||
Enqueued: 1,
|
Enqueued: 3,
|
||||||
InProgress: 1,
|
InProgress: 1,
|
||||||
Scheduled: 2,
|
Scheduled: 2,
|
||||||
Retry: 0,
|
Retry: 0,
|
||||||
@ -53,10 +63,13 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
Processed: 120,
|
Processed: 120,
|
||||||
Failed: 2,
|
Failed: 2,
|
||||||
Timestamp: now,
|
Timestamp: now,
|
||||||
|
Queues: map[string]int{base.DefaultQueueName: 1, "critical": 1, "low": 1},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
enqueued: []*base.TaskMessage{},
|
enqueued: map[string][]*base.TaskMessage{
|
||||||
|
base.DefaultQueueName: {},
|
||||||
|
},
|
||||||
inProgress: []*base.TaskMessage{},
|
inProgress: []*base.TaskMessage{},
|
||||||
scheduled: []h.ZSetEntry{
|
scheduled: []h.ZSetEntry{
|
||||||
{Msg: m3, Score: now.Unix()},
|
{Msg: m3, Score: now.Unix()},
|
||||||
@ -67,6 +80,7 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
{Msg: m2, Score: now.Add(-time.Hour).Unix()}},
|
{Msg: m2, Score: now.Add(-time.Hour).Unix()}},
|
||||||
processed: 90,
|
processed: 90,
|
||||||
failed: 10,
|
failed: 10,
|
||||||
|
allQueues: []interface{}{base.DefaultQueue},
|
||||||
want: &Stats{
|
want: &Stats{
|
||||||
Enqueued: 0,
|
Enqueued: 0,
|
||||||
InProgress: 0,
|
InProgress: 0,
|
||||||
@ -76,13 +90,16 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
Processed: 90,
|
Processed: 90,
|
||||||
Failed: 10,
|
Failed: 10,
|
||||||
Timestamp: now,
|
Timestamp: now,
|
||||||
|
Queues: map[string]int{base.DefaultQueueName: 0},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
|
for qname, msgs := range tc.enqueued {
|
||||||
|
h.SeedEnqueuedQueue(t, r.client, msgs, qname)
|
||||||
|
}
|
||||||
h.SeedInProgressQueue(t, r.client, tc.inProgress)
|
h.SeedInProgressQueue(t, r.client, tc.inProgress)
|
||||||
h.SeedScheduledQueue(t, r.client, tc.scheduled)
|
h.SeedScheduledQueue(t, r.client, tc.scheduled)
|
||||||
h.SeedRetryQueue(t, r.client, tc.retry)
|
h.SeedRetryQueue(t, r.client, tc.retry)
|
||||||
@ -91,6 +108,7 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
failedKey := base.FailureKey(now)
|
failedKey := base.FailureKey(now)
|
||||||
r.client.Set(processedKey, tc.processed, 0)
|
r.client.Set(processedKey, tc.processed, 0)
|
||||||
r.client.Set(failedKey, tc.failed, 0)
|
r.client.Set(failedKey, tc.failed, 0)
|
||||||
|
r.client.SAdd(base.AllQueues, tc.allQueues...)
|
||||||
|
|
||||||
got, err := r.CurrentStats()
|
got, err := r.CurrentStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user