diff --git a/inspector.go b/inspector.go
index 870f36f..1b535fe 100644
--- a/inspector.go
+++ b/inspector.go
@@ -52,6 +52,9 @@ type QueueInfo struct {
 	// It is an approximate memory usage value in bytes since the value is computed by sampling.
 	MemoryUsage int64
 
+	// Latency of the queue, measured by the oldest pending task in the queue.
+	Latency time.Duration
+
 	// Size is the total number of tasks in the queue.
 	// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
 	Size int
@@ -95,6 +98,7 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
 	return &QueueInfo{
 		Queue:       stats.Queue,
 		MemoryUsage: stats.MemoryUsage,
+		Latency:     stats.Latency,
 		Size:        stats.Size,
 		Pending:     stats.Pending,
 		Active:      stats.Active,
diff --git a/inspector_test.go b/inspector_test.go
index 8a418f1..d3f86fc 100644
--- a/inspector_test.go
+++ b/inspector_test.go
@@ -19,6 +19,7 @@ import (
 	h "github.com/hibiken/asynq/internal/asynqtest"
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/rdb"
+	"github.com/hibiken/asynq/internal/timeutil"
 )
 
 func TestInspectorQueues(t *testing.T) {
@@ -269,18 +270,20 @@ func TestInspectorGetQueueInfo(t *testing.T) {
 	ignoreMemUsg := cmpopts.IgnoreFields(QueueInfo{}, "MemoryUsage")
 
 	inspector := NewInspector(getRedisConnOpt(t))
+	inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))
 
 	tests := []struct {
-		pending   map[string][]*base.TaskMessage
-		active    map[string][]*base.TaskMessage
-		scheduled map[string][]base.Z
-		retry     map[string][]base.Z
-		archived  map[string][]base.Z
-		completed map[string][]base.Z
-		processed map[string]int
-		failed    map[string]int
-		qname     string
-		want      *QueueInfo
+		pending                         map[string][]*base.TaskMessage
+		active                          map[string][]*base.TaskMessage
+		scheduled                       map[string][]base.Z
+		retry                           map[string][]base.Z
+		archived                        map[string][]base.Z
+		completed                       map[string][]base.Z
+		processed                       map[string]int
+		failed                          map[string]int
+		oldestPendingMessageEnqueueTime map[string]time.Time
+		qname                           string
+		want                            *QueueInfo
 	}{
 		{
 			pending: map[string][]*base.TaskMessage{
@@ -326,9 +329,15 @@ func TestInspectorGetQueueInfo(t *testing.T) {
 				"critical": 0,
 				"low":      5,
 			},
+			oldestPendingMessageEnqueueTime: map[string]time.Time{
+				"default":  now.Add(-15 * time.Second),
+				"critical": now.Add(-200 * time.Millisecond),
+				"low":      now.Add(-30 * time.Second),
+			},
 			qname: "default",
 			want: &QueueInfo{
 				Queue:   "default",
+				Latency: 15 * time.Second,
 				Size:    4,
 				Pending: 1,
 				Active:  1,
@@ -352,13 +361,21 @@ func TestInspectorGetQueueInfo(t *testing.T) {
 		h.SeedAllRetryQueues(t, r, tc.retry)
 		h.SeedAllArchivedQueues(t, r, tc.archived)
 		h.SeedAllCompletedQueues(t, r, tc.completed)
+		ctx := context.Background()
 		for qname, n := range tc.processed {
 			processedKey := base.ProcessedKey(qname, now)
-			r.Set(context.Background(), processedKey, n, 0)
+			r.Set(ctx, processedKey, n, 0)
 		}
 		for qname, n := range tc.failed {
 			failedKey := base.FailedKey(qname, now)
-			r.Set(context.Background(), failedKey, n, 0)
+			r.Set(ctx, failedKey, n, 0)
+		}
+		for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
+			if enqueueTime.IsZero() {
+				continue
+			}
+			oldestPendingMessageID := r.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
+			r.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
 		}
 
 		got, err := inspector.GetQueueInfo(tc.qname)
diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go
index ad861fb..0612b38 100644
--- a/internal/rdb/inspect.go
+++ b/internal/rdb/inspect.go
@@ -46,6 +46,8 @@ type Stats struct {
 	Processed int
 	// Total number of tasks failed during the current date.
 	Failed int
+	// Latency of the queue, measured by the oldest pending task in the queue.
+	Latency time.Duration
 	// Time this stats was taken.
 	Timestamp time.Time
 }
@@ -72,10 +74,13 @@ type DailyStats struct {
 // KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
 // KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
 // KEYS[9] -> asynq:<qname>:paused
+//
+// ARGV[1] -> task key prefix
 var currentStatsCmd = redis.NewScript(`
 local res = {}
+local pendingTaskCount = redis.call("LLEN", KEYS[1])
 table.insert(res, KEYS[1])
-table.insert(res, redis.call("LLEN", KEYS[1]))
+table.insert(res, pendingTaskCount)
 table.insert(res, KEYS[2])
 table.insert(res, redis.call("LLEN", KEYS[2]))
 table.insert(res, KEYS[3])
@@ -102,6 +107,13 @@ table.insert(res, KEYS[8])
 table.insert(res, fcount)
 table.insert(res, KEYS[9])
 table.insert(res, redis.call("EXISTS", KEYS[9]))
+table.insert(res, "oldest_pending_since")
+if pendingTaskCount > 0 then
+	local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
+	table.insert(res, redis.call("HGET", ARGV[1] .. id, "pending_since"))
+else
+	table.insert(res, 0)
+end
 return res`)
 
 // CurrentStats returns a current state of the queues.
@@ -125,7 +137,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
 		base.ProcessedKey(qname, now),
 		base.FailedKey(qname, now),
 		base.PausedKey(qname),
-	}).Result()
+	}, base.TaskKeyPrefix(qname)).Result()
 	if err != nil {
 		return nil, errors.E(op, errors.Unknown, err)
 	}
@@ -170,6 +182,12 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
 			} else {
 				stats.Paused = true
 			}
+		case "oldest_pending_since":
+			if val == 0 {
+				stats.Latency = 0
+			} else {
+				stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val)))
+			}
 		}
 	}
 	stats.Size = size
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index 1a9f49f..8af2584 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -17,6 +17,7 @@ import (
 	h "github.com/hibiken/asynq/internal/asynqtest"
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/errors"
+	"github.com/hibiken/asynq/internal/timeutil"
 )
 
 func TestAllQueues(t *testing.T) {
@@ -60,19 +61,21 @@ func TestCurrentStats(t *testing.T) {
 	m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
 	m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
 	now := time.Now()
+	r.SetClock(timeutil.NewSimulatedClock(now))
 
 	tests := []struct {
-		pending    map[string][]*base.TaskMessage
-		inProgress map[string][]*base.TaskMessage
-		scheduled  map[string][]base.Z
-		retry      map[string][]base.Z
-		archived   map[string][]base.Z
-		completed  map[string][]base.Z
-		processed  map[string]int
-		failed     map[string]int
-		paused     []string
-		qname      string
-		want       *Stats
+		pending                         map[string][]*base.TaskMessage
+		active                          map[string][]*base.TaskMessage
+		scheduled                       map[string][]base.Z
+		retry                           map[string][]base.Z
+		archived                        map[string][]base.Z
+		completed                       map[string][]base.Z
+		processed                       map[string]int
+		failed                          map[string]int
+		paused                          []string
+		oldestPendingMessageEnqueueTime map[string]time.Time
+		qname                           string
+		want                            *Stats
 	}{
 		{
 			pending: map[string][]*base.TaskMessage{
@@ -80,7 +83,7 @@ func TestCurrentStats(t *testing.T) {
 				"default":  {m1},
 				"critical": {m5},
 				"low":      {m6},
 			},
-			inProgress: map[string][]*base.TaskMessage{
+			active: map[string][]*base.TaskMessage{
 				"default": {m2},
 				"critical": {},
 				"low":      {},
@@ -118,6 +121,11 @@ func TestCurrentStats(t *testing.T) {
 				"critical": 0,
 				"low":      1,
 			},
+			oldestPendingMessageEnqueueTime: map[string]time.Time{
+				"default":  now.Add(-15 * time.Second),
+				"critical": now.Add(-200 * time.Millisecond),
+				"low":      now.Add(-30 * time.Second),
+			},
 			paused: []string{},
 			qname:  "default",
 			want: &Stats{
@@ -132,16 +140,17 @@ func TestCurrentStats(t *testing.T) {
 				Completed: 0,
 				Processed: 120,
 				Failed:    2,
+				Latency:   15 * time.Second,
 				Timestamp: now,
 			},
 		},
 		{
 			pending: map[string][]*base.TaskMessage{
 				"default": {m1},
-				"critical": {m5},
+				"critical": {},
 				"low":      {m6},
 			},
-			inProgress: map[string][]*base.TaskMessage{
+			active: map[string][]*base.TaskMessage{
 				"default": {m2},
 				"critical": {},
 				"low":      {},
@@ -179,13 +188,18 @@ func TestCurrentStats(t *testing.T) {
 				"critical": 0,
 				"low":      1,
 			},
+			oldestPendingMessageEnqueueTime: map[string]time.Time{
+				"default":  now.Add(-15 * time.Second),
+				"critical": time.Time{}, // zero value since there's no pending task in this queue
+				"low":      now.Add(-30 * time.Second),
+			},
 			paused: []string{"critical", "low"},
 			qname:  "critical",
 			want: &Stats{
 				Queue:  "critical",
 				Paused: true,
-				Size:    1,
-				Pending: 1,
+				Size:    0,
+				Pending: 0,
 				Active:    0,
 				Scheduled: 0,
 				Retry:     0,
@@ -193,6 +207,7 @@ func TestCurrentStats(t *testing.T) {
 				Completed: 0,
 				Processed: 100,
 				Failed:    0,
+				Latency:   0,
 				Timestamp: now,
 			},
 		},
@@ -206,18 +221,26 @@ func TestCurrentStats(t *testing.T) {
 			}
 		}
 		h.SeedAllPendingQueues(t, r.client, tc.pending)
-		h.SeedAllActiveQueues(t, r.client, tc.inProgress)
+		h.SeedAllActiveQueues(t, r.client, tc.active)
 		h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
 		h.SeedAllRetryQueues(t, r.client, tc.retry)
 		h.SeedAllArchivedQueues(t, r.client, tc.archived)
 		h.SeedAllCompletedQueues(t, r.client, tc.completed)
+		ctx := context.Background()
 		for qname, n := range tc.processed {
 			processedKey := base.ProcessedKey(qname, now)
-			r.client.Set(context.Background(), processedKey, n, 0)
+			r.client.Set(ctx, processedKey, n, 0)
 		}
 		for qname, n := range tc.failed {
 			failedKey := base.FailedKey(qname, now)
-			r.client.Set(context.Background(), failedKey, n, 0)
+			r.client.Set(ctx, failedKey, n, 0)
+		}
+		for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
+			if enqueueTime.IsZero() {
+				continue
+			}
+			oldestPendingMessageID := r.client.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
+			r.client.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
 		}
 
 		got, err := r.CurrentStats(tc.qname)
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 70072a9..13d0187 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -85,7 +85,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) (int64, error) {
 // ARGV[2] -> task ID
 // ARGV[3] -> task timeout in seconds (0 if not timeout)
 // ARGV[4] -> task deadline in unix time (0 if no deadline)
-// ARGV[5] -> current uinx time in millisecond
+// ARGV[5] -> current unix time in nsec
 //
 // Output:
 // Returns 1 if successfully enqueued
@@ -123,7 +123,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
 		msg.ID,
 		msg.Timeout,
 		msg.Deadline,
-		timeutil.UnixMilli(r.clock.Now()),
+		r.clock.Now().UnixNano(),
 	}
 	n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
 	if err != nil {
@@ -146,7 +146,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
 // ARGV[3] -> task message data
 // ARGV[4] -> task timeout in seconds (0 if not timeout)
 // ARGV[5] -> task deadline in unix time (0 if no deadline)
-// ARGV[6] -> current unix time in milliseconds
+// ARGV[6] -> current unix time in nsec
 //
 // Output:
 // Returns 1 if successfully enqueued
@@ -193,7 +193,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error {
 		encoded,
 		msg.Timeout,
 		msg.Deadline,
-		timeutil.UnixMilli(r.clock.Now()),
+		r.clock.Now().UnixNano(),
 	}
 	n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
 	if err != nil {
@@ -774,7 +774,7 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
 // KEYS[2] -> asynq:{<qname>}:pending
 // ARGV[1] -> current unix time in seconds
 // ARGV[2] -> task key prefix
-// ARGV[3] -> current unix time in milliseconds
+// ARGV[3] -> current unix time in nsec
 // Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
 var forwardCmd = redis.NewScript(`
 local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
@@ -792,7 +792,7 @@ return table.getn(ids)`)
 func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
 	now := r.clock.Now()
 	res, err := forwardCmd.Run(context.Background(), r.client,
-		[]string{src, dst}, now.Unix(), taskKeyPrefix, timeutil.UnixMilli(now)).Result()
+		[]string{src, dst}, now.Unix(), taskKeyPrefix, now.UnixNano()).Result()
 	if err != nil {
 		return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
 	}
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 5f696a2..caacab5 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -120,7 +120,7 @@ func TestEnqueue(t *testing.T) {
 			t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
 		}
 		pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
-		if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want {
+		if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
 			t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
 		}
 
@@ -242,7 +242,7 @@ func TestEnqueueUnique(t *testing.T) {
 			t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
 		}
 		pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
-		if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want {
+		if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
 			t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
 		}
 		uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
@@ -2087,7 +2087,7 @@ func TestForwardIfReady(t *testing.T) {
 		// Make sure "pending_since" field is set
 		for _, msg := range gotPending {
 			pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val()
-			if want := strconv.Itoa(int(timeutil.UnixMilli(now))); pendingSince != want {
+			if want := strconv.Itoa(int(now.UnixNano())); pendingSince != want {
 				t.Error("pending_since field is not set for newly pending message")
 			}
 		}
diff --git a/internal/timeutil/timeutil.go b/internal/timeutil/timeutil.go
index 05738d4..65691ad 100644
--- a/internal/timeutil/timeutil.go
+++ b/internal/timeutil/timeutil.go
@@ -36,11 +36,3 @@ func (c *SimulatedClock) Now() time.Time { return c.t }
 func (c *SimulatedClock) SetTime(t time.Time) { c.t = t }
 
 func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) }
-
-// UnixMilli returns t as a Unix time, the number of milliseconds elapsed since
-// January 1, 1970 UTC.
-//
-// TODO: Use time.UnixMilli() when we drop support for go1.16 or below
-func UnixMilli(t time.Time) int64 {
-	return t.UnixNano() / 1e6
-}