2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Add Latency field to QueueInfo

This commit is contained in:
Ken Hibino 2021-12-11 06:27:44 -08:00
parent e7c1c3ad6f
commit 99a6750656
7 changed files with 104 additions and 50 deletions

View File

@ -52,6 +52,9 @@ type QueueInfo struct {
// It is an approximate memory usage value in bytes since the value is computed by sampling. // It is an approximate memory usage value in bytes since the value is computed by sampling.
MemoryUsage int64 MemoryUsage int64
// Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration
// Size is the total number of tasks in the queue. // Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived. // The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
Size int Size int
@ -95,6 +98,7 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
return &QueueInfo{ return &QueueInfo{
Queue: stats.Queue, Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage, MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency,
Size: stats.Size, Size: stats.Size,
Pending: stats.Pending, Pending: stats.Pending,
Active: stats.Active, Active: stats.Active,

View File

@ -19,6 +19,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/timeutil"
) )
func TestInspectorQueues(t *testing.T) { func TestInspectorQueues(t *testing.T) {
@ -269,6 +270,7 @@ func TestInspectorGetQueueInfo(t *testing.T) {
ignoreMemUsg := cmpopts.IgnoreFields(QueueInfo{}, "MemoryUsage") ignoreMemUsg := cmpopts.IgnoreFields(QueueInfo{}, "MemoryUsage")
inspector := NewInspector(getRedisConnOpt(t)) inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
@ -279,6 +281,7 @@ func TestInspectorGetQueueInfo(t *testing.T) {
completed map[string][]base.Z completed map[string][]base.Z
processed map[string]int processed map[string]int
failed map[string]int failed map[string]int
oldestPendingMessageEnqueueTime map[string]time.Time
qname string qname string
want *QueueInfo want *QueueInfo
}{ }{
@ -326,9 +329,15 @@ func TestInspectorGetQueueInfo(t *testing.T) {
"critical": 0, "critical": 0,
"low": 5, "low": 5,
}, },
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond),
"low": now.Add(-30 * time.Second),
},
qname: "default", qname: "default",
want: &QueueInfo{ want: &QueueInfo{
Queue: "default", Queue: "default",
Latency: 15 * time.Second,
Size: 4, Size: 4,
Pending: 1, Pending: 1,
Active: 1, Active: 1,
@ -352,13 +361,21 @@ func TestInspectorGetQueueInfo(t *testing.T) {
h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllArchivedQueues(t, r, tc.archived) h.SeedAllArchivedQueues(t, r, tc.archived)
h.SeedAllCompletedQueues(t, r, tc.completed) h.SeedAllCompletedQueues(t, r, tc.completed)
ctx := context.Background()
for qname, n := range tc.processed { for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now) processedKey := base.ProcessedKey(qname, now)
r.Set(context.Background(), processedKey, n, 0) r.Set(ctx, processedKey, n, 0)
} }
for qname, n := range tc.failed { for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now) failedKey := base.FailedKey(qname, now)
r.Set(context.Background(), failedKey, n, 0) r.Set(ctx, failedKey, n, 0)
}
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() {
continue
}
oldestPendingMessageID := r.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
r.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
} }
got, err := inspector.GetQueueInfo(tc.qname) got, err := inspector.GetQueueInfo(tc.qname)

View File

@ -46,6 +46,8 @@ type Stats struct {
Processed int Processed int
// Total number of tasks failed during the current date. // Total number of tasks failed during the current date.
Failed int Failed int
// Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration
// Time this stats was taken. // Time this stats was taken.
Timestamp time.Time Timestamp time.Time
} }
@ -72,10 +74,13 @@ type DailyStats struct {
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd> // KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd> // KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:paused // KEYS[9] -> asynq:<qname>:paused
//
// ARGV[1] -> task key prefix
var currentStatsCmd = redis.NewScript(` var currentStatsCmd = redis.NewScript(`
local res = {} local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
table.insert(res, KEYS[1]) table.insert(res, KEYS[1])
table.insert(res, redis.call("LLEN", KEYS[1])) table.insert(res, pendingTaskCount)
table.insert(res, KEYS[2]) table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2])) table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3]) table.insert(res, KEYS[3])
@ -102,6 +107,13 @@ table.insert(res, KEYS[8])
table.insert(res, fcount) table.insert(res, fcount)
table.insert(res, KEYS[9]) table.insert(res, KEYS[9])
table.insert(res, redis.call("EXISTS", KEYS[9])) table.insert(res, redis.call("EXISTS", KEYS[9]))
table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then
local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
table.insert(res, redis.call("HGET", ARGV[1] .. id, "pending_since"))
else
table.insert(res, 0)
end
return res`) return res`)
// CurrentStats returns a current state of the queues. // CurrentStats returns a current state of the queues.
@ -125,7 +137,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.ProcessedKey(qname, now), base.ProcessedKey(qname, now),
base.FailedKey(qname, now), base.FailedKey(qname, now),
base.PausedKey(qname), base.PausedKey(qname),
}).Result() }, base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return nil, errors.E(op, errors.Unknown, err) return nil, errors.E(op, errors.Unknown, err)
} }
@ -170,6 +182,12 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
} else { } else {
stats.Paused = true stats.Paused = true
} }
case "oldest_pending_since":
if val == 0 {
stats.Latency = 0
} else {
stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val)))
}
} }
} }
stats.Size = size stats.Size = size

View File

@ -17,6 +17,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/timeutil"
) )
func TestAllQueues(t *testing.T) { func TestAllQueues(t *testing.T) {
@ -60,10 +61,11 @@ func TestCurrentStats(t *testing.T) {
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
now := time.Now() now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage active map[string][]*base.TaskMessage
scheduled map[string][]base.Z scheduled map[string][]base.Z
retry map[string][]base.Z retry map[string][]base.Z
archived map[string][]base.Z archived map[string][]base.Z
@ -71,6 +73,7 @@ func TestCurrentStats(t *testing.T) {
processed map[string]int processed map[string]int
failed map[string]int failed map[string]int
paused []string paused []string
oldestPendingMessageEnqueueTime map[string]time.Time
qname string qname string
want *Stats want *Stats
}{ }{
@ -80,7 +83,7 @@ func TestCurrentStats(t *testing.T) {
"critical": {m5}, "critical": {m5},
"low": {m6}, "low": {m6},
}, },
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {m2}, "default": {m2},
"critical": {}, "critical": {},
"low": {}, "low": {},
@ -118,6 +121,11 @@ func TestCurrentStats(t *testing.T) {
"critical": 0, "critical": 0,
"low": 1, "low": 1,
}, },
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond),
"low": now.Add(-30 * time.Second),
},
paused: []string{}, paused: []string{},
qname: "default", qname: "default",
want: &Stats{ want: &Stats{
@ -132,16 +140,17 @@ func TestCurrentStats(t *testing.T) {
Completed: 0, Completed: 0,
Processed: 120, Processed: 120,
Failed: 2, Failed: 2,
Latency: 15 * time.Second,
Timestamp: now, Timestamp: now,
}, },
}, },
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
"default": {m1}, "default": {m1},
"critical": {m5}, "critical": {},
"low": {m6}, "low": {m6},
}, },
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {m2}, "default": {m2},
"critical": {}, "critical": {},
"low": {}, "low": {},
@ -179,13 +188,18 @@ func TestCurrentStats(t *testing.T) {
"critical": 0, "critical": 0,
"low": 1, "low": 1,
}, },
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": time.Time{}, // zero value since there's no pending task in this queue
"low": now.Add(-30 * time.Second),
},
paused: []string{"critical", "low"}, paused: []string{"critical", "low"},
qname: "critical", qname: "critical",
want: &Stats{ want: &Stats{
Queue: "critical", Queue: "critical",
Paused: true, Paused: true,
Size: 1, Size: 0,
Pending: 1, Pending: 0,
Active: 0, Active: 0,
Scheduled: 0, Scheduled: 0,
Retry: 0, Retry: 0,
@ -193,6 +207,7 @@ func TestCurrentStats(t *testing.T) {
Completed: 0, Completed: 0,
Processed: 100, Processed: 100,
Failed: 0, Failed: 0,
Latency: 0,
Timestamp: now, Timestamp: now,
}, },
}, },
@ -206,18 +221,26 @@ func TestCurrentStats(t *testing.T) {
} }
} }
h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived) h.SeedAllArchivedQueues(t, r.client, tc.archived)
h.SeedAllCompletedQueues(t, r.client, tc.completed) h.SeedAllCompletedQueues(t, r.client, tc.completed)
ctx := context.Background()
for qname, n := range tc.processed { for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now) processedKey := base.ProcessedKey(qname, now)
r.client.Set(context.Background(), processedKey, n, 0) r.client.Set(ctx, processedKey, n, 0)
} }
for qname, n := range tc.failed { for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now) failedKey := base.FailedKey(qname, now)
r.client.Set(context.Background(), failedKey, n, 0) r.client.Set(ctx, failedKey, n, 0)
}
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() {
continue
}
oldestPendingMessageID := r.client.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
r.client.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
} }
got, err := r.CurrentStats(tc.qname) got, err := r.CurrentStats(tc.qname)

View File

@ -85,7 +85,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *
// ARGV[2] -> task ID // ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline) // ARGV[4] -> task deadline in unix time (0 if no deadline)
// ARGV[5] -> current uinx time in millisecond // ARGV[5] -> current unix time in nsec
// //
// Output: // Output:
// Returns 1 if successfully enqueued // Returns 1 if successfully enqueued
@ -123,7 +123,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
msg.ID, msg.ID,
msg.Timeout, msg.Timeout,
msg.Deadline, msg.Deadline,
timeutil.UnixMilli(r.clock.Now()), r.clock.Now().UnixNano(),
} }
n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
if err != nil { if err != nil {
@ -146,7 +146,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
// ARGV[3] -> task message data // ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline) // ARGV[5] -> task deadline in unix time (0 if no deadline)
// ARGV[6] -> current unix time in milliseconds // ARGV[6] -> current unix time in nsec
// //
// Output: // Output:
// Returns 1 if successfully enqueued // Returns 1 if successfully enqueued
@ -193,7 +193,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
encoded, encoded,
msg.Timeout, msg.Timeout,
msg.Deadline, msg.Deadline,
timeutil.UnixMilli(r.clock.Now()), r.clock.Now().UnixNano(),
} }
n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...) n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
if err != nil { if err != nil {
@ -774,7 +774,7 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
// KEYS[2] -> asynq:{<qname>}:pending // KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> current unix time in seconds // ARGV[1] -> current unix time in seconds
// ARGV[2] -> task key prefix // ARGV[2] -> task key prefix
// ARGV[3] -> current unix time in milliseconds // ARGV[3] -> current unix time in nsec
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short. // Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
@ -792,7 +792,7 @@ return table.getn(ids)`)
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) { func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
now := r.clock.Now() now := r.clock.Now()
res, err := forwardCmd.Run(context.Background(), r.client, res, err := forwardCmd.Run(context.Background(), r.client,
[]string{src, dst}, now.Unix(), taskKeyPrefix, timeutil.UnixMilli(now)).Result() []string{src, dst}, now.Unix(), taskKeyPrefix, now.UnixNano()).Result()
if err != nil { if err != nil {
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
} }

View File

@ -120,7 +120,7 @@ func TestEnqueue(t *testing.T) {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want) t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
} }
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want { if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
} }
@ -242,7 +242,7 @@ func TestEnqueueUnique(t *testing.T) {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want) t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
} }
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want { if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want) t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
} }
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
@ -2087,7 +2087,7 @@ func TestForwardIfReady(t *testing.T) {
// Make sure "pending_since" field is set // Make sure "pending_since" field is set
for _, msg := range gotPending { for _, msg := range gotPending {
pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val() pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val()
if want := strconv.Itoa(int(timeutil.UnixMilli(now))); pendingSince != want { if want := strconv.Itoa(int(now.UnixNano())); pendingSince != want {
t.Error("pending_since field is not set for newly pending message") t.Error("pending_since field is not set for newly pending message")
} }
} }

View File

@ -36,11 +36,3 @@ func (c *SimulatedClock) Now() time.Time { return c.t }
func (c *SimulatedClock) SetTime(t time.Time) { c.t = t } func (c *SimulatedClock) SetTime(t time.Time) { c.t = t }
func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) } func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) }
// UnixMilli returns t as a Unix time, the number of milliseconds elapsed since
// January 1, 1970 UTC.
//
// TODO: Use time.UnixMilli() when we drop support for go1.16 or below
func UnixMilli(t time.Time) int64 {
return t.UnixNano() / 1e6
}