mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
fix: CurrentState without processed/failed data
This commit is contained in:
parent
9b87f7c1f1
commit
fb24d158ae
@ -3,13 +3,13 @@ package rdb
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/rs/xid"
|
"github.com/rs/xid"
|
||||||
|
"github.com/spf13/cast"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stats represents a state of queues at a certain time.
|
// Stats represents a state of queues at a certain time.
|
||||||
@ -77,35 +77,57 @@ type DeadTask struct {
|
|||||||
|
|
||||||
// CurrentStats returns a current state of the queues.
|
// CurrentStats returns a current state of the queues.
|
||||||
func (r *RDB) CurrentStats() (*Stats, error) {
|
func (r *RDB) CurrentStats() (*Stats, error) {
|
||||||
|
// KEYS[1] -> asynq:queues:default
|
||||||
|
// KEYS[2] -> asynq:in_progress
|
||||||
|
// KEYS[3] -> asynq:scheduled
|
||||||
|
// KEYS[4] -> asynq:retry
|
||||||
|
// KEYS[5] -> asynq:dead
|
||||||
|
// KEYS[6] -> asynq:processed:<yyyy-mm-dd>
|
||||||
|
// KEYS[7] -> asynq:failure:<yyyy-mm-dd>
|
||||||
|
script := redis.NewScript(`
|
||||||
|
local qlen = redis.call("LLEN", KEYS[1])
|
||||||
|
local plen = redis.call("LLEN", KEYS[2])
|
||||||
|
local slen = redis.call("ZCARD", KEYS[3])
|
||||||
|
local rlen = redis.call("ZCARD", KEYS[4])
|
||||||
|
local dlen = redis.call("ZCARD", KEYS[5])
|
||||||
|
local pcount = 0
|
||||||
|
local p = redis.call("GET", KEYS[6])
|
||||||
|
if p then
|
||||||
|
pcount = tonumber(p)
|
||||||
|
end
|
||||||
|
local fcount = 0
|
||||||
|
local f = redis.call("GET", KEYS[7])
|
||||||
|
if f then
|
||||||
|
fcount = tonumber(f)
|
||||||
|
end
|
||||||
|
return {qlen, plen, slen, rlen, dlen, pcount, fcount}
|
||||||
|
`)
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
pipe := r.client.Pipeline()
|
res, err := script.Run(r.client, []string{
|
||||||
qlen := pipe.LLen(base.DefaultQueue)
|
base.DefaultQueue,
|
||||||
plen := pipe.LLen(base.InProgressQueue)
|
base.InProgressQueue,
|
||||||
slen := pipe.ZCard(base.ScheduledQueue)
|
base.ScheduledQueue,
|
||||||
rlen := pipe.ZCard(base.RetryQueue)
|
base.RetryQueue,
|
||||||
dlen := pipe.ZCard(base.DeadQueue)
|
base.DeadQueue,
|
||||||
pcount := pipe.Get(base.ProcessedKey(now))
|
base.ProcessedKey(now),
|
||||||
fcount := pipe.Get(base.FailureKey(now))
|
base.FailureKey(now),
|
||||||
_, err := pipe.Exec()
|
}).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p, err := strconv.Atoi(pcount.Val())
|
nums, err := cast.ToIntSliceE(res)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
f, err := strconv.Atoi(fcount.Val())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Stats{
|
return &Stats{
|
||||||
Enqueued: int(qlen.Val()),
|
Enqueued: nums[0],
|
||||||
InProgress: int(plen.Val()),
|
InProgress: nums[1],
|
||||||
Scheduled: int(slen.Val()),
|
Scheduled: nums[2],
|
||||||
Retry: int(rlen.Val()),
|
Retry: nums[3],
|
||||||
Dead: int(dlen.Val()),
|
Dead: nums[4],
|
||||||
Processed: p,
|
Processed: nums[5],
|
||||||
Failed: f,
|
Failed: nums[6],
|
||||||
Timestamp: now,
|
Timestamp: now,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -138,6 +138,30 @@ func TestCurrentStats(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCurrentStatsWithoutData(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
want := &Stats{
|
||||||
|
Enqueued: 0,
|
||||||
|
InProgress: 0,
|
||||||
|
Scheduled: 0,
|
||||||
|
Retry: 0,
|
||||||
|
Dead: 0,
|
||||||
|
Processed: 0,
|
||||||
|
Failed: 0,
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := r.CurrentStats()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("r.CurrentStats() = %v, %v, want %+v, nil", got, err, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" {
|
||||||
|
t.Errorf("r.CurrentStats() = %v, %v, want %+v, nil; (-want, +got)\n%s", got, err, want, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRedisInfo(t *testing.T) {
|
func TestRedisInfo(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
|
@ -84,7 +84,13 @@ func printStats(s *rdb.Stats) {
|
|||||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||||
fmt.Fprintf(tw, format, "Processed", "Failed", "Error Rate")
|
fmt.Fprintf(tw, format, "Processed", "Failed", "Error Rate")
|
||||||
fmt.Fprintf(tw, format, "---------", "------", "----------")
|
fmt.Fprintf(tw, format, "---------", "------", "----------")
|
||||||
fmt.Fprintf(tw, format, s.Processed, s.Failed, fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100))
|
var errrate string
|
||||||
|
if s.Processed == 0 {
|
||||||
|
errrate = "N/A"
|
||||||
|
} else {
|
||||||
|
errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)
|
||||||
|
}
|
||||||
|
fmt.Fprintf(tw, format, s.Processed, s.Failed, errrate)
|
||||||
tw.Flush()
|
tw.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user