diff --git a/CHANGELOG.md b/CHANGELOG.md index e2a2c5c..660cbfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Package `x/metrics` is added. - Tool `tools/metrics_exporter` binary is added. +- `ProcessedTotal` and `FailedTotal` fields were added to `QueueInfo` struct. ## [0.19.1] - 2021-12-12 diff --git a/inspector.go b/inspector.go index 1b535fe..f9bf17b 100644 --- a/inspector.go +++ b/inspector.go @@ -72,12 +72,17 @@ type QueueInfo struct { // Number of stored completed tasks. Completed int - // Total number of tasks being processed during the given date. + // Total number of tasks being processed within the given date (counter resets daily). // The number includes both succeeded and failed tasks. Processed int - // Total number of tasks failed to be processed during the given date. + // Total number of tasks failed to be processed within the given date (counter resets daily). Failed int + // Total number of tasks processed (cumulative). + ProcessedTotal int + // Total number of tasks failed (cumulative). + FailedTotal int + // Paused indicates whether the queue is paused. // If true, tasks in the queue will not be processed. Paused bool @@ -96,20 +101,22 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) { return nil, err } return &QueueInfo{ - Queue: stats.Queue, - MemoryUsage: stats.MemoryUsage, - Latency: stats.Latency, - Size: stats.Size, - Pending: stats.Pending, - Active: stats.Active, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Archived: stats.Archived, - Completed: stats.Completed, - Processed: stats.Processed, - Failed: stats.Failed, - Paused: stats.Paused, - Timestamp: stats.Timestamp, + Queue: stats.Queue, + MemoryUsage: stats.MemoryUsage, + Latency: stats.Latency, + Size: stats.Size, + Pending: stats.Pending, + Active: stats.Active, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Archived: stats.Archived, + Completed: stats.Completed, + Processed: stats.Processed, + Failed: stats.Failed, + ProcessedTotal: stats.ProcessedTotal, + FailedTotal: stats.FailedTotal, + Paused: stats.Paused, + Timestamp: stats.Timestamp, }, nil } diff --git a/inspector_test.go b/inspector_test.go index d3f86fc..89f21b3 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -281,6 +281,8 @@ func TestInspectorGetQueueInfo(t *testing.T) { completed map[string][]base.Z processed map[string]int failed map[string]int + processedTotal map[string]int + failedTotal map[string]int oldestPendingMessageEnqueueTime map[string]time.Time qname string want *QueueInfo @@ -329,6 +331,16 @@ func TestInspectorGetQueueInfo(t *testing.T) { "critical": 0, "low": 5, }, + processedTotal: map[string]int{ + "default": 11111, + "critical": 22222, + "low": 33333, + }, + failedTotal: map[string]int{ + "default": 111, + "critical": 222, + "low": 333, + }, oldestPendingMessageEnqueueTime: map[string]time.Time{ "default": now.Add(-15 * time.Second), "critical": now.Add(-200 * time.Millisecond), @@ -336,19 +348,21 @@ func TestInspectorGetQueueInfo(t *testing.T) { }, qname: "default", want: &QueueInfo{ - Queue: "default", - Latency: 15 * time.Second, - Size: 4, - Pending: 1, - Active: 1, - Scheduled: 2, - Retry: 0, - Archived: 0, - Completed: 0, - Processed: 120, - Failed: 2, - Paused: false, - Timestamp: now, + Queue: "default", + Latency: 15 * time.Second, + Size: 4, + Pending: 1, + Active: 1, + Scheduled: 2, + Retry: 0, + Archived: 0, + Completed: 0, + Processed: 120, + Failed: 2, + ProcessedTotal: 11111, + FailedTotal: 111, + Paused: false, + Timestamp: now, }, }, } @@ -363,12 +377,16 @@ func TestInspectorGetQueueInfo(t *testing.T) { h.SeedAllCompletedQueues(t, r, tc.completed) ctx := context.Background() for qname, n := range tc.processed { - processedKey := base.ProcessedKey(qname, now) - r.Set(ctx, processedKey, n, 0) + r.Set(ctx, base.ProcessedKey(qname, now), n, 0) } for qname, n := range tc.failed { - failedKey := base.FailedKey(qname, now) - r.Set(ctx, failedKey, n, 0) + r.Set(ctx, base.FailedKey(qname, now), n, 0) + } + for qname, n := range tc.processedTotal { + r.Set(ctx, base.ProcessedTotalKey(qname), n, 0) + } + for qname, n := range tc.failedTotal { + r.Set(ctx, base.FailedTotalKey(qname), n, 0) } for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime { if enqueueTime.IsZero() { diff --git a/internal/base/base.go b/internal/base/base.go index a9aa951..91bb4f7 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -39,6 +39,12 @@ const ( CancelChannel = "asynq:cancel" // PubSub channel ) +// Max value for int64. +// +// Use this value to check if a redis counter value reached maximum. +// As documeted in https://redis.io/commands/INCR, a string stored at a redis key is interpreted as a base-10 64 bit signed integer. +const MaxInt64 = 1<<63 - 1 + // TaskState denotes the state of a task. type TaskState int @@ -150,6 +156,16 @@ func PausedKey(qname string) string { return fmt.Sprintf("%spaused", QueueKeyPrefix(qname)) } +// ProcessedTotalKey returns a redis key for total processed count for the given queue. +func ProcessedTotalKey(qname string) string { + return fmt.Sprintf("%sprocessed", QueueKeyPrefix(qname)) +} + +// FailedTotalKey returns a redis key for total failure count for the given queue. +func FailedTotalKey(qname string) string { + return fmt.Sprintf("%sfailed", QueueKeyPrefix(qname)) +} + // ProcessedKey returns a redis key for processed count for the given day for the queue. func ProcessedKey(qname string, t time.Time) string { return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) diff --git a/internal/base/base_test.go b/internal/base/base_test.go index a8c2d2f..a3f8c41 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -173,6 +173,40 @@ func TestPausedKey(t *testing.T) { } } +func TestProcessedTotalKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:processed"}, + {"custom", "asynq:{custom}:processed"}, + } + + for _, tc := range tests { + got := ProcessedTotalKey(tc.qname) + if got != tc.want { + t.Errorf("ProcessedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestFailedTotalKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:failed"}, + {"custom", "asynq:{custom}:failed"}, + } + + for _, tc := range tests { + got := FailedTotalKey(tc.qname) + if got != tc.want { + t.Errorf("FailedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + func TestProcessedKey(t *testing.T) { tests := []struct { qname string diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 0612b38..51db716 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -41,11 +41,18 @@ type Stats struct { Retry int Archived int Completed int - // Total number of tasks processed during the current date. + + // Number of tasks processed within the current date. // The number includes both succeeded and failed tasks. Processed int - // Total number of tasks failed during the current date. + // Number of tasks failed within the current date. Failed int + + // Total number of tasks processed (both succeeded and failed) from this queue. + ProcessedTotal int + // Total number of tasks failed. + FailedTotal int + // Latency of the queue, measured by the oldest pending task in the queue. Latency time.Duration // Time this stats was taken. @@ -65,15 +72,17 @@ type DailyStats struct { Time time.Time } -// KEYS[1] -> asynq::pending -// KEYS[2] -> asynq::active -// KEYS[3] -> asynq::scheduled -// KEYS[4] -> asynq::retry -// KEYS[5] -> asynq::archived -// KEYS[6] -> asynq::completed -// KEYS[7] -> asynq::processed: -// KEYS[8] -> asynq::failed: -// KEYS[9] -> asynq::paused +// KEYS[1] -> asynq::pending +// KEYS[2] -> asynq::active +// KEYS[3] -> asynq::scheduled +// KEYS[4] -> asynq::retry +// KEYS[5] -> asynq::archived +// KEYS[6] -> asynq::completed +// KEYS[7] -> asynq::processed: +// KEYS[8] -> asynq::failed: +// KEYS[9] -> asynq::processed +// KEYS[10] -> asynq::failed +// KEYS[11] -> asynq::paused // // ARGV[1] -> task key prefix var currentStatsCmd = redis.NewScript(` @@ -91,22 +100,17 @@ table.insert(res, KEYS[5]) table.insert(res, redis.call("ZCARD", KEYS[5])) table.insert(res, KEYS[6]) table.insert(res, redis.call("ZCARD", KEYS[6])) -local pcount = 0 -local p = redis.call("GET", KEYS[7]) -if p then - pcount = tonumber(p) +for i=7,10 do + local count = 0 + local n = redis.call("GET", KEYS[i]) + if n then + count = tonumber(n) + end + table.insert(res, KEYS[i]) + table.insert(res, count) end -table.insert(res, KEYS[7]) -table.insert(res, pcount) -local fcount = 0 -local f = redis.call("GET", KEYS[8]) -if f then - fcount = tonumber(f) -end -table.insert(res, KEYS[8]) -table.insert(res, fcount) -table.insert(res, KEYS[9]) -table.insert(res, redis.call("EXISTS", KEYS[9])) +table.insert(res, KEYS[11]) +table.insert(res, redis.call("EXISTS", KEYS[11])) table.insert(res, "oldest_pending_since") if pendingTaskCount > 0 then local id = redis.call("LRANGE", KEYS[1], -1, -1)[1] @@ -136,6 +140,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { base.CompletedKey(qname), base.ProcessedKey(qname, now), base.FailedKey(qname, now), + base.ProcessedTotalKey(qname), + base.FailedTotalKey(qname), base.PausedKey(qname), }, base.TaskKeyPrefix(qname)).Result() if err != nil { @@ -176,6 +182,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { stats.Processed = val case base.FailedKey(qname, now): stats.Failed = val + case base.ProcessedTotalKey(qname): + stats.ProcessedTotal = val + case base.FailedTotalKey(qname): + stats.FailedTotal = val case base.PausedKey(qname): if val == 0 { stats.Paused = false diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 8af2584..ef07abb 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -72,6 +72,8 @@ func TestCurrentStats(t *testing.T) { completed map[string][]base.Z processed map[string]int failed map[string]int + processedTotal map[string]int + failedTotal map[string]int paused []string oldestPendingMessageEnqueueTime map[string]time.Time qname string @@ -121,6 +123,16 @@ func TestCurrentStats(t *testing.T) { "critical": 0, "low": 1, }, + processedTotal: map[string]int{ + "default": 11111, + "critical": 22222, + "low": 33333, + }, + failedTotal: map[string]int{ + "default": 111, + "critical": 222, + "low": 333, + }, oldestPendingMessageEnqueueTime: map[string]time.Time{ "default": now.Add(-15 * time.Second), "critical": now.Add(-200 * time.Millisecond), @@ -129,19 +141,21 @@ func TestCurrentStats(t *testing.T) { paused: []string{}, qname: "default", want: &Stats{ - Queue: "default", - Paused: false, - Size: 4, - Pending: 1, - Active: 1, - Scheduled: 2, - Retry: 0, - Archived: 0, - Completed: 0, - Processed: 120, - Failed: 2, - Latency: 15 * time.Second, - Timestamp: now, + Queue: "default", + Paused: false, + Size: 4, + Pending: 1, + Active: 1, + Scheduled: 2, + Retry: 0, + Archived: 0, + Completed: 0, + Processed: 120, + Failed: 2, + ProcessedTotal: 11111, + FailedTotal: 111, + Latency: 15 * time.Second, + Timestamp: now, }, }, { @@ -188,6 +202,16 @@ func TestCurrentStats(t *testing.T) { "critical": 0, "low": 1, }, + processedTotal: map[string]int{ + "default": 11111, + "critical": 22222, + "low": 33333, + }, + failedTotal: map[string]int{ + "default": 111, + "critical": 222, + "low": 333, + }, oldestPendingMessageEnqueueTime: map[string]time.Time{ "default": now.Add(-15 * time.Second), "critical": time.Time{}, // zero value since there's no pending task in this queue @@ -196,19 +220,21 @@ func TestCurrentStats(t *testing.T) { paused: []string{"critical", "low"}, qname: "critical", want: &Stats{ - Queue: "critical", - Paused: true, - Size: 0, - Pending: 0, - Active: 0, - Scheduled: 0, - Retry: 0, - Archived: 0, - Completed: 0, - Processed: 100, - Failed: 0, - Latency: 0, - Timestamp: now, + Queue: "critical", + Paused: true, + Size: 0, + Pending: 0, + Active: 0, + Scheduled: 0, + Retry: 0, + Archived: 0, + Completed: 0, + Processed: 100, + Failed: 0, + ProcessedTotal: 22222, + FailedTotal: 222, + Latency: 0, + Timestamp: now, }, }, } @@ -228,12 +254,16 @@ func TestCurrentStats(t *testing.T) { h.SeedAllCompletedQueues(t, r.client, tc.completed) ctx := context.Background() for qname, n := range tc.processed { - processedKey := base.ProcessedKey(qname, now) - r.client.Set(ctx, processedKey, n, 0) + r.client.Set(ctx, base.ProcessedKey(qname, now), n, 0) } for qname, n := range tc.failed { - failedKey := base.FailedKey(qname, now) - r.client.Set(ctx, failedKey, n, 0) + r.client.Set(ctx, base.FailedKey(qname, now), n, 0) + } + for qname, n := range tc.processedTotal { + r.client.Set(ctx, base.ProcessedTotalKey(qname), n, 0) + } + for qname, n := range tc.failedTotal { + r.client.Set(ctx, base.FailedTotalKey(qname), n, 0) } for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime { if enqueueTime.IsZero() { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 4152618..af849b9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -303,8 +303,10 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:t: // KEYS[4] -> asynq:{}:processed: +// KEYS[5] -> asynq:{}:processed // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp +// ARGV[3] -> max int64 value var doneCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -319,6 +321,12 @@ local n = redis.call("INCR", KEYS[4]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[4], ARGV[2]) end +local total = redis.call("GET", KEYS[5]) +if tonumber(total) == tonumber(ARGV[3]) then + redis.call("SET", KEYS[5], 1) +else + redis.call("INCR", KEYS[5]) +end return redis.status_reply("OK") `) @@ -326,9 +334,11 @@ return redis.status_reply("OK") // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:t: // KEYS[4] -> asynq:{}:processed: -// KEYS[5] -> unique key +// KEYS[5] -> asynq:{}:processed +// KEYS[6] -> unique key // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp +// ARGV[3] -> max int64 value var doneUniqueCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -343,8 +353,14 @@ local n = redis.call("INCR", KEYS[4]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[4], ARGV[2]) end -if redis.call("GET", KEYS[5]) == ARGV[1] then - redis.call("DEL", KEYS[5]) +local total = redis.call("GET", KEYS[5]) +if tonumber(total) == tonumber(ARGV[3]) then + redis.call("SET", KEYS[5], 1) +else + redis.call("INCR", KEYS[5]) +end +if redis.call("GET", KEYS[6]) == ARGV[1] then + redis.call("DEL", KEYS[6]) end return redis.status_reply("OK") `) @@ -361,10 +377,12 @@ func (r *RDB) Done(msg *base.TaskMessage) error { base.DeadlinesKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), base.ProcessedKey(msg.Queue, now), + base.ProcessedTotalKey(msg.Queue), } argv := []interface{}{ msg.ID, expireAt.Unix(), + base.MaxInt64, } // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { @@ -379,10 +397,13 @@ func (r *RDB) Done(msg *base.TaskMessage) error { // KEYS[3] -> asynq:{}:completed // KEYS[4] -> asynq:{}:t: // KEYS[5] -> asynq:{}:processed: +// KEYS[6] -> asynq:{}:processed +// // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp // ARGV[3] -> task exipration time in unix time // ARGV[4] -> task message data +// ARGV[5] -> max int64 value var markAsCompleteCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -398,6 +419,12 @@ local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[2]) end +local total = redis.call("GET", KEYS[6]) +if tonumber(total) == tonumber(ARGV[5]) then + redis.call("SET", KEYS[6], 1) +else + redis.call("INCR", KEYS[6]) +end return redis.status_reply("OK") `) @@ -406,11 +433,14 @@ return redis.status_reply("OK") // KEYS[3] -> asynq:{}:completed // KEYS[4] -> asynq:{}:t: // KEYS[5] -> asynq:{}:processed: -// KEYS[6] -> asynq:{}:unique:{} +// KEYS[6] -> asynq:{}:processed +// KEYS[7] -> asynq:{}:unique:{} +// // ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp // ARGV[3] -> task exipration time in unix time // ARGV[4] -> task message data +// ARGV[5] -> max int64 value var markAsCompleteUniqueCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -426,8 +456,14 @@ local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[2]) end -if redis.call("GET", KEYS[6]) == ARGV[1] then - redis.call("DEL", KEYS[6]) +local total = redis.call("GET", KEYS[6]) +if tonumber(total) == tonumber(ARGV[5]) then + redis.call("SET", KEYS[6], 1) +else + redis.call("INCR", KEYS[6]) +end +if redis.call("GET", KEYS[7]) == ARGV[1] then + redis.call("DEL", KEYS[7]) end return redis.status_reply("OK") `) @@ -450,12 +486,14 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error { base.CompletedKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID), base.ProcessedKey(msg.Queue, now), + base.ProcessedTotalKey(msg.Queue), } argv := []interface{}{ msg.ID, statsExpireAt.Unix(), now.Unix() + msg.Retention, encoded, + base.MaxInt64, } // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { @@ -625,11 +663,15 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:processed: // KEYS[6] -> asynq:{}:failed: +// KEYS[7] -> asynq:{}:processed +// KEYS[8] -> asynq:{}:failed +// // ARGV[1] -> task ID // ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration timestamp // ARGV[5] -> is_failure (bool) +// ARGV[6] -> max int64 value var retryCmd = redis.NewScript(` if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -648,6 +690,14 @@ if tonumber(ARGV[5]) == 1 then if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[6], ARGV[4]) end + local total = redis.call("GET", KEYS[7]) + if tonumber(total) == tonumber(ARGV[6]) then + redis.call("SET", KEYS[7], 1) + redis.call("SET", KEYS[8], 1) + else + redis.call("INCR", KEYS[7]) + redis.call("INCR", KEYS[8]) + end end return redis.status_reply("OK")`) @@ -676,6 +726,8 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i base.RetryKey(msg.Queue), base.ProcessedKey(msg.Queue, now), base.FailedKey(msg.Queue, now), + base.ProcessedTotalKey(msg.Queue), + base.FailedTotalKey(msg.Queue), } argv := []interface{}{ msg.ID, @@ -683,6 +735,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i processAt.Unix(), expireAt.Unix(), isFailure, + base.MaxInt64, } return r.runScript(ctx, op, retryCmd, keys, argv...) } @@ -698,12 +751,16 @@ const ( // KEYS[4] -> asynq:{}:archived // KEYS[5] -> asynq:{}:processed: // KEYS[6] -> asynq:{}:failed: +// KEYS[7] -> asynq:{}:processed +// KEYS[8] -> asynq:{}:failed +// // ARGV[1] -> task ID // ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in archive (e.g., 100) // ARGV[6] -> stats expiration timestamp +// ARGV[7] -> max int64 value var archiveCmd = redis.NewScript(` if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -723,6 +780,14 @@ local m = redis.call("INCR", KEYS[6]) if tonumber(m) == 1 then redis.call("EXPIREAT", KEYS[6], ARGV[6]) end +local total = redis.call("GET", KEYS[7]) +if tonumber(total) == tonumber(ARGV[7]) then + redis.call("SET", KEYS[7], 1) + redis.call("SET", KEYS[8], 1) +else + redis.call("INCR", KEYS[7]) + redis.call("INCR", KEYS[8]) +end return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. @@ -747,6 +812,8 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { base.ArchivedKey(msg.Queue), base.ProcessedKey(msg.Queue, now), base.FailedKey(msg.Queue, now), + base.ProcessedTotalKey(msg.Queue), + base.FailedTotalKey(msg.Queue), } argv := []interface{}{ msg.ID, @@ -755,6 +822,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { cutoff.Unix(), maxArchiveSize, expireAt.Unix(), + base.MaxInt64, } return r.runScript(ctx, op, archiveCmd, keys, argv...) } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index caacab5..78feb12 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -807,18 +807,59 @@ func TestDone(t *testing.T) { if gotProcessed != "1" { t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed) } - gotTTL := r.client.TTL(context.Background(), processedKey).Val() if gotTTL > statsTTL { t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL) } + processedTotalKey := base.ProcessedTotalKey(tc.target.Queue) + gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val() + if gotProcessedTotal != "1" { + t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedTotalKey, gotProcessedTotal) + } + if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 { t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey) } } } +// Make sure that processed_total counter wraps to 1 when reaching int64 max value. +func TestDoneWithMaxCounter(t *testing.T) { + r := setup(t) + defer r.Close() + msg := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "foo", + Payload: nil, + Timeout: 1800, + Deadline: 0, + Queue: "default", + } + + z := base.Z{ + Message: msg, + Score: time.Now().Add(5 * time.Minute).Unix(), + } + h.SeedDeadlines(t, r.client, []base.Z{z}, msg.Queue) + h.SeedActiveQueue(t, r.client, []*base.TaskMessage{msg}, msg.Queue) + + processedTotalKey := base.ProcessedTotalKey(msg.Queue) + ctx := context.Background() + if err := r.client.Set(ctx, processedTotalKey, base.MaxInt64, 0).Err(); err != nil { + t.Fatalf("Redis command failed: SET %q %v", processedTotalKey, base.MaxInt64) + } + + if err := r.Done(msg); err != nil { + t.Fatalf("RDB.Done failed: %v", err) + } + + gotProcessedTotal := r.client.Get(ctx, processedTotalKey).Val() + if gotProcessedTotal != "1" { + t.Errorf("GET %q = %v, want 1", processedTotalKey, gotProcessedTotal) + } +} + func TestMarkAsComplete(t *testing.T) { r := setup(t) defer r.Close() @@ -1573,6 +1614,18 @@ func TestRetry(t *testing.T) { if gotTTL > statsTTL { t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL) } + + processedTotalKey := base.ProcessedTotalKey(tc.msg.Queue) + gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val() + if gotProcessedTotal != "1" { + t.Errorf("GET %q = %q, want 1", processedTotalKey, gotProcessedTotal) + } + + failedTotalKey := base.FailedTotalKey(tc.msg.Queue) + gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val() + if gotFailedTotal != "1" { + t.Errorf("GET %q = %q, want 1", failedTotalKey, gotFailedTotal) + } } } @@ -1740,6 +1793,18 @@ func TestRetryWithNonFailureError(t *testing.T) { if gotFailed != "" { t.Errorf("GET %q = %q, want empty", failedKey, gotFailed) } + + processedTotalKey := base.ProcessedTotalKey(tc.msg.Queue) + gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val() + if gotProcessedTotal != "" { + t.Errorf("GET %q = %q, want empty", processedTotalKey, gotProcessedTotal) + } + + failedTotalKey := base.FailedTotalKey(tc.msg.Queue) + gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val() + if gotFailedTotal != "" { + t.Errorf("GET %q = %q, want empty", failedTotalKey, gotFailedTotal) + } } } @@ -1950,6 +2015,18 @@ func TestArchive(t *testing.T) { if gotTTL > statsTTL { t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL) } + + processedTotalKey := base.ProcessedTotalKey(tc.target.Queue) + gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val() + if gotProcessedTotal != "1" { + t.Errorf("GET %q = %q, want 1", processedTotalKey, gotProcessedTotal) + } + + failedTotalKey := base.FailedTotalKey(tc.target.Queue) + gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val() + if gotFailedTotal != "1" { + t.Errorf("GET %q = %q, want 1", failedTotalKey, gotFailedTotal) + } } } diff --git a/x/metrics/metrics.go b/x/metrics/metrics.go index 7a5803d..b17a860 100644 --- a/x/metrics/metrics.go +++ b/x/metrics/metrics.go @@ -64,6 +64,18 @@ var ( []string{"queue"}, nil, ) + tasksProcessedTotalDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "tasks_processed_total"), + "Number of tasks processed (both succeeded and failed); broken down by queue", + []string{"queue"}, nil, + ) + + tasksFailedTotalDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "tasks_failed_total"), + "Number of tasks failed; broken down by queue", + []string{"queue"}, nil, + ) + pausedQueues = prometheus.NewDesc( prometheus.BuildFQName(namespace, "", "queue_paused_total"), "Number of queues paused", @@ -145,6 +157,20 @@ func (qmc *QueueMetricsCollector) Collect(ch chan<- prometheus.Metric) { info.Queue, ) + ch <- prometheus.MustNewConstMetric( + tasksProcessedTotalDesc, + prometheus.CounterValue, + float64(info.ProcessedTotal), + info.Queue, + ) + + ch <- prometheus.MustNewConstMetric( + tasksFailedTotalDesc, + prometheus.CounterValue, + float64(info.FailedTotal), + info.Queue, + ) + pausedValue := 0 // zero to indicate "not paused" if info.Paused { pausedValue = 1