2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Record total tasks processed/failed

This commit is contained in:
Ken Hibino 2021-12-16 16:53:02 -08:00 committed by GitHub
parent 43cb4ddf19
commit 82d18e3d91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 383 additions and 96 deletions

View File

@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Package `x/metrics` is added. - Package `x/metrics` is added.
- Tool `tools/metrics_exporter` binary is added. - Tool `tools/metrics_exporter` binary is added.
- `ProcessedTotal` and `FailedTotal` fields were added to `QueueInfo` struct.
## [0.19.1] - 2021-12-12 ## [0.19.1] - 2021-12-12

View File

@ -72,12 +72,17 @@ type QueueInfo struct {
// Number of stored completed tasks. // Number of stored completed tasks.
Completed int Completed int
// Total number of tasks being processed during the given date. // Total number of tasks being processed within the given date (counter resets daily).
// The number includes both succeeded and failed tasks. // The number includes both succeeded and failed tasks.
Processed int Processed int
// Total number of tasks failed to be processed during the given date. // Total number of tasks failed to be processed within the given date (counter resets daily).
Failed int Failed int
// Total number of tasks processed (cumulative).
ProcessedTotal int
// Total number of tasks failed (cumulative).
FailedTotal int
// Paused indicates whether the queue is paused. // Paused indicates whether the queue is paused.
// If true, tasks in the queue will not be processed. // If true, tasks in the queue will not be processed.
Paused bool Paused bool
@ -108,6 +113,8 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
Completed: stats.Completed, Completed: stats.Completed,
Processed: stats.Processed, Processed: stats.Processed,
Failed: stats.Failed, Failed: stats.Failed,
ProcessedTotal: stats.ProcessedTotal,
FailedTotal: stats.FailedTotal,
Paused: stats.Paused, Paused: stats.Paused,
Timestamp: stats.Timestamp, Timestamp: stats.Timestamp,
}, nil }, nil

View File

@ -281,6 +281,8 @@ func TestInspectorGetQueueInfo(t *testing.T) {
completed map[string][]base.Z completed map[string][]base.Z
processed map[string]int processed map[string]int
failed map[string]int failed map[string]int
processedTotal map[string]int
failedTotal map[string]int
oldestPendingMessageEnqueueTime map[string]time.Time oldestPendingMessageEnqueueTime map[string]time.Time
qname string qname string
want *QueueInfo want *QueueInfo
@ -329,6 +331,16 @@ func TestInspectorGetQueueInfo(t *testing.T) {
"critical": 0, "critical": 0,
"low": 5, "low": 5,
}, },
processedTotal: map[string]int{
"default": 11111,
"critical": 22222,
"low": 33333,
},
failedTotal: map[string]int{
"default": 111,
"critical": 222,
"low": 333,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{ oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second), "default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond), "critical": now.Add(-200 * time.Millisecond),
@ -347,6 +359,8 @@ func TestInspectorGetQueueInfo(t *testing.T) {
Completed: 0, Completed: 0,
Processed: 120, Processed: 120,
Failed: 2, Failed: 2,
ProcessedTotal: 11111,
FailedTotal: 111,
Paused: false, Paused: false,
Timestamp: now, Timestamp: now,
}, },
@ -363,12 +377,16 @@ func TestInspectorGetQueueInfo(t *testing.T) {
h.SeedAllCompletedQueues(t, r, tc.completed) h.SeedAllCompletedQueues(t, r, tc.completed)
ctx := context.Background() ctx := context.Background()
for qname, n := range tc.processed { for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now) r.Set(ctx, base.ProcessedKey(qname, now), n, 0)
r.Set(ctx, processedKey, n, 0)
} }
for qname, n := range tc.failed { for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now) r.Set(ctx, base.FailedKey(qname, now), n, 0)
r.Set(ctx, failedKey, n, 0) }
for qname, n := range tc.processedTotal {
r.Set(ctx, base.ProcessedTotalKey(qname), n, 0)
}
for qname, n := range tc.failedTotal {
r.Set(ctx, base.FailedTotalKey(qname), n, 0)
} }
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime { for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() { if enqueueTime.IsZero() {

View File

@ -39,6 +39,12 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel CancelChannel = "asynq:cancel" // PubSub channel
) )
// Max value for int64.
//
// Use this value to check if a redis counter value reached maximum.
// As documeted in https://redis.io/commands/INCR, a string stored at a redis key is interpreted as a base-10 64 bit signed integer.
const MaxInt64 = 1<<63 - 1
// TaskState denotes the state of a task. // TaskState denotes the state of a task.
type TaskState int type TaskState int
@ -150,6 +156,16 @@ func PausedKey(qname string) string {
return fmt.Sprintf("%spaused", QueueKeyPrefix(qname)) return fmt.Sprintf("%spaused", QueueKeyPrefix(qname))
} }
// ProcessedTotalKey returns a redis key for total processed count for the given queue.
func ProcessedTotalKey(qname string) string {
return fmt.Sprintf("%sprocessed", QueueKeyPrefix(qname))
}
// FailedTotalKey returns a redis key for total failure count for the given queue.
func FailedTotalKey(qname string) string {
return fmt.Sprintf("%sfailed", QueueKeyPrefix(qname))
}
// ProcessedKey returns a redis key for processed count for the given day for the queue. // ProcessedKey returns a redis key for processed count for the given day for the queue.
func ProcessedKey(qname string, t time.Time) string { func ProcessedKey(qname string, t time.Time) string {
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))

View File

@ -173,6 +173,40 @@ func TestPausedKey(t *testing.T) {
} }
} }
func TestProcessedTotalKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"default", "asynq:{default}:processed"},
{"custom", "asynq:{custom}:processed"},
}
for _, tc := range tests {
got := ProcessedTotalKey(tc.qname)
if got != tc.want {
t.Errorf("ProcessedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}
func TestFailedTotalKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"default", "asynq:{default}:failed"},
{"custom", "asynq:{custom}:failed"},
}
for _, tc := range tests {
got := FailedTotalKey(tc.qname)
if got != tc.want {
t.Errorf("FailedTotalKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}
func TestProcessedKey(t *testing.T) { func TestProcessedKey(t *testing.T) {
tests := []struct { tests := []struct {
qname string qname string

View File

@ -41,11 +41,18 @@ type Stats struct {
Retry int Retry int
Archived int Archived int
Completed int Completed int
// Total number of tasks processed during the current date.
// Number of tasks processed within the current date.
// The number includes both succeeded and failed tasks. // The number includes both succeeded and failed tasks.
Processed int Processed int
// Total number of tasks failed during the current date. // Number of tasks failed within the current date.
Failed int Failed int
// Total number of tasks processed (both succeeded and failed) from this queue.
ProcessedTotal int
// Total number of tasks failed.
FailedTotal int
// Latency of the queue, measured by the oldest pending task in the queue. // Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration Latency time.Duration
// Time this stats was taken. // Time this stats was taken.
@ -73,7 +80,9 @@ type DailyStats struct {
// KEYS[6] -> asynq:<qname>:completed // KEYS[6] -> asynq:<qname>:completed
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd> // KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd> // KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:paused // KEYS[9] -> asynq:<qname>:processed
// KEYS[10] -> asynq:<qname>:failed
// KEYS[11] -> asynq:<qname>:paused
// //
// ARGV[1] -> task key prefix // ARGV[1] -> task key prefix
var currentStatsCmd = redis.NewScript(` var currentStatsCmd = redis.NewScript(`
@ -91,22 +100,17 @@ table.insert(res, KEYS[5])
table.insert(res, redis.call("ZCARD", KEYS[5])) table.insert(res, redis.call("ZCARD", KEYS[5]))
table.insert(res, KEYS[6]) table.insert(res, KEYS[6])
table.insert(res, redis.call("ZCARD", KEYS[6])) table.insert(res, redis.call("ZCARD", KEYS[6]))
local pcount = 0 for i=7,10 do
local p = redis.call("GET", KEYS[7]) local count = 0
if p then local n = redis.call("GET", KEYS[i])
pcount = tonumber(p) if n then
count = tonumber(n)
end end
table.insert(res, KEYS[7]) table.insert(res, KEYS[i])
table.insert(res, pcount) table.insert(res, count)
local fcount = 0
local f = redis.call("GET", KEYS[8])
if f then
fcount = tonumber(f)
end end
table.insert(res, KEYS[8]) table.insert(res, KEYS[11])
table.insert(res, fcount) table.insert(res, redis.call("EXISTS", KEYS[11]))
table.insert(res, KEYS[9])
table.insert(res, redis.call("EXISTS", KEYS[9]))
table.insert(res, "oldest_pending_since") table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then if pendingTaskCount > 0 then
local id = redis.call("LRANGE", KEYS[1], -1, -1)[1] local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
@ -136,6 +140,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.CompletedKey(qname), base.CompletedKey(qname),
base.ProcessedKey(qname, now), base.ProcessedKey(qname, now),
base.FailedKey(qname, now), base.FailedKey(qname, now),
base.ProcessedTotalKey(qname),
base.FailedTotalKey(qname),
base.PausedKey(qname), base.PausedKey(qname),
}, base.TaskKeyPrefix(qname)).Result() }, base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
@ -176,6 +182,10 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
stats.Processed = val stats.Processed = val
case base.FailedKey(qname, now): case base.FailedKey(qname, now):
stats.Failed = val stats.Failed = val
case base.ProcessedTotalKey(qname):
stats.ProcessedTotal = val
case base.FailedTotalKey(qname):
stats.FailedTotal = val
case base.PausedKey(qname): case base.PausedKey(qname):
if val == 0 { if val == 0 {
stats.Paused = false stats.Paused = false

View File

@ -72,6 +72,8 @@ func TestCurrentStats(t *testing.T) {
completed map[string][]base.Z completed map[string][]base.Z
processed map[string]int processed map[string]int
failed map[string]int failed map[string]int
processedTotal map[string]int
failedTotal map[string]int
paused []string paused []string
oldestPendingMessageEnqueueTime map[string]time.Time oldestPendingMessageEnqueueTime map[string]time.Time
qname string qname string
@ -121,6 +123,16 @@ func TestCurrentStats(t *testing.T) {
"critical": 0, "critical": 0,
"low": 1, "low": 1,
}, },
processedTotal: map[string]int{
"default": 11111,
"critical": 22222,
"low": 33333,
},
failedTotal: map[string]int{
"default": 111,
"critical": 222,
"low": 333,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{ oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second), "default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond), "critical": now.Add(-200 * time.Millisecond),
@ -140,6 +152,8 @@ func TestCurrentStats(t *testing.T) {
Completed: 0, Completed: 0,
Processed: 120, Processed: 120,
Failed: 2, Failed: 2,
ProcessedTotal: 11111,
FailedTotal: 111,
Latency: 15 * time.Second, Latency: 15 * time.Second,
Timestamp: now, Timestamp: now,
}, },
@ -188,6 +202,16 @@ func TestCurrentStats(t *testing.T) {
"critical": 0, "critical": 0,
"low": 1, "low": 1,
}, },
processedTotal: map[string]int{
"default": 11111,
"critical": 22222,
"low": 33333,
},
failedTotal: map[string]int{
"default": 111,
"critical": 222,
"low": 333,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{ oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second), "default": now.Add(-15 * time.Second),
"critical": time.Time{}, // zero value since there's no pending task in this queue "critical": time.Time{}, // zero value since there's no pending task in this queue
@ -207,6 +231,8 @@ func TestCurrentStats(t *testing.T) {
Completed: 0, Completed: 0,
Processed: 100, Processed: 100,
Failed: 0, Failed: 0,
ProcessedTotal: 22222,
FailedTotal: 222,
Latency: 0, Latency: 0,
Timestamp: now, Timestamp: now,
}, },
@ -228,12 +254,16 @@ func TestCurrentStats(t *testing.T) {
h.SeedAllCompletedQueues(t, r.client, tc.completed) h.SeedAllCompletedQueues(t, r.client, tc.completed)
ctx := context.Background() ctx := context.Background()
for qname, n := range tc.processed { for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now) r.client.Set(ctx, base.ProcessedKey(qname, now), n, 0)
r.client.Set(ctx, processedKey, n, 0)
} }
for qname, n := range tc.failed { for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now) r.client.Set(ctx, base.FailedKey(qname, now), n, 0)
r.client.Set(ctx, failedKey, n, 0) }
for qname, n := range tc.processedTotal {
r.client.Set(ctx, base.ProcessedTotalKey(qname), n, 0)
}
for qname, n := range tc.failedTotal {
r.client.Set(ctx, base.FailedTotalKey(qname), n, 0)
} }
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime { for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() { if enqueueTime.IsZero() {

View File

@ -303,8 +303,10 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:t:<task_id> // KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:processed
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneCmd = redis.NewScript(` var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -319,6 +321,12 @@ local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[2]) redis.call("EXPIREAT", KEYS[4], ARGV[2])
end end
local total = redis.call("GET", KEYS[5])
if tonumber(total) == tonumber(ARGV[3]) then
redis.call("SET", KEYS[5], 1)
else
redis.call("INCR", KEYS[5])
end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
@ -326,9 +334,11 @@ return redis.status_reply("OK")
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:t:<task_id> // KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> unique key // KEYS[5] -> asynq:{<qname>}:processed
// KEYS[6] -> unique key
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> max int64 value
var doneUniqueCmd = redis.NewScript(` var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -343,8 +353,14 @@ local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[2]) redis.call("EXPIREAT", KEYS[4], ARGV[2])
end end
if redis.call("GET", KEYS[5]) == ARGV[1] then local total = redis.call("GET", KEYS[5])
redis.call("DEL", KEYS[5]) if tonumber(total) == tonumber(ARGV[3]) then
redis.call("SET", KEYS[5], 1)
else
redis.call("INCR", KEYS[5])
end
if redis.call("GET", KEYS[6]) == ARGV[1] then
redis.call("DEL", KEYS[6])
end end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
@ -361,10 +377,12 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
base.DeadlinesKey(msg.Queue), base.DeadlinesKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID), base.TaskKey(msg.Queue, msg.ID),
base.ProcessedKey(msg.Queue, now), base.ProcessedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID, msg.ID,
expireAt.Unix(), expireAt.Unix(),
base.MaxInt64,
} }
// Note: We cannot pass empty unique key when running this script in redis-cluster. // Note: We cannot pass empty unique key when running this script in redis-cluster.
if len(msg.UniqueKey) > 0 { if len(msg.UniqueKey) > 0 {
@ -379,10 +397,13 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
// KEYS[3] -> asynq:{<qname>}:completed // KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id> // KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:processed
//
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task exipration time in unix time // ARGV[3] -> task exipration time in unix time
// ARGV[4] -> task message data // ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteCmd = redis.NewScript(` var markAsCompleteCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -398,6 +419,12 @@ local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[2]) redis.call("EXPIREAT", KEYS[5], ARGV[2])
end end
local total = redis.call("GET", KEYS[6])
if tonumber(total) == tonumber(ARGV[5]) then
redis.call("SET", KEYS[6], 1)
else
redis.call("INCR", KEYS[6])
end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
@ -406,11 +433,14 @@ return redis.status_reply("OK")
// KEYS[3] -> asynq:{<qname>}:completed // KEYS[3] -> asynq:{<qname>}:completed
// KEYS[4] -> asynq:{<qname>}:t:<task_id> // KEYS[4] -> asynq:{<qname>}:t:<task_id>
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:unique:{<checksum>} // KEYS[6] -> asynq:{<qname>}:processed
// KEYS[7] -> asynq:{<qname>}:unique:{<checksum>}
//
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task exipration time in unix time // ARGV[3] -> task exipration time in unix time
// ARGV[4] -> task message data // ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteUniqueCmd = redis.NewScript(` var markAsCompleteUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -426,8 +456,14 @@ local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[2]) redis.call("EXPIREAT", KEYS[5], ARGV[2])
end end
if redis.call("GET", KEYS[6]) == ARGV[1] then local total = redis.call("GET", KEYS[6])
redis.call("DEL", KEYS[6]) if tonumber(total) == tonumber(ARGV[5]) then
redis.call("SET", KEYS[6], 1)
else
redis.call("INCR", KEYS[6])
end
if redis.call("GET", KEYS[7]) == ARGV[1] then
redis.call("DEL", KEYS[7])
end end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
@ -450,12 +486,14 @@ func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error {
base.CompletedKey(msg.Queue), base.CompletedKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID), base.TaskKey(msg.Queue, msg.ID),
base.ProcessedKey(msg.Queue, now), base.ProcessedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID, msg.ID,
statsExpireAt.Unix(), statsExpireAt.Unix(),
now.Unix() + msg.Retention, now.Unix() + msg.Retention,
encoded, encoded,
base.MaxInt64,
} }
// Note: We cannot pass empty unique key when running this script in redis-cluster. // Note: We cannot pass empty unique key when running this script in redis-cluster.
if len(msg.UniqueKey) > 0 { if len(msg.UniqueKey) > 0 {
@ -625,11 +663,15 @@ func (r *RDB) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, process
// KEYS[4] -> asynq:{<qname>}:retry // KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd> // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
//
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value // ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> retry_at UNIX timestamp // ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp // ARGV[4] -> stats expiration timestamp
// ARGV[5] -> is_failure (bool) // ARGV[5] -> is_failure (bool)
// ARGV[6] -> max int64 value
var retryCmd = redis.NewScript(` var retryCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -648,6 +690,14 @@ if tonumber(ARGV[5]) == 1 then
if tonumber(m) == 1 then if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[6], ARGV[4]) redis.call("EXPIREAT", KEYS[6], ARGV[4])
end end
local total = redis.call("GET", KEYS[7])
if tonumber(total) == tonumber(ARGV[6]) then
redis.call("SET", KEYS[7], 1)
redis.call("SET", KEYS[8], 1)
else
redis.call("INCR", KEYS[7])
redis.call("INCR", KEYS[8])
end
end end
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
@ -676,6 +726,8 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i
base.RetryKey(msg.Queue), base.RetryKey(msg.Queue),
base.ProcessedKey(msg.Queue, now), base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now), base.FailedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
base.FailedTotalKey(msg.Queue),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID, msg.ID,
@ -683,6 +735,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i
processAt.Unix(), processAt.Unix(),
expireAt.Unix(), expireAt.Unix(),
isFailure, isFailure,
base.MaxInt64,
} }
return r.runScript(ctx, op, retryCmd, keys, argv...) return r.runScript(ctx, op, retryCmd, keys, argv...)
} }
@ -698,12 +751,16 @@ const (
// KEYS[4] -> asynq:{<qname>}:archived // KEYS[4] -> asynq:{<qname>}:archived
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd> // KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// KEYS[7] -> asynq:{<qname>}:processed
// KEYS[8] -> asynq:{<qname>}:failed
//
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value // ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> died_at UNIX timestamp // ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archive (e.g., 100) // ARGV[5] -> max number of tasks in archive (e.g., 100)
// ARGV[6] -> stats expiration timestamp // ARGV[6] -> stats expiration timestamp
// ARGV[7] -> max int64 value
var archiveCmd = redis.NewScript(` var archiveCmd = redis.NewScript(`
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -723,6 +780,14 @@ local m = redis.call("INCR", KEYS[6])
if tonumber(m) == 1 then if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[6], ARGV[6]) redis.call("EXPIREAT", KEYS[6], ARGV[6])
end end
local total = redis.call("GET", KEYS[7])
if tonumber(total) == tonumber(ARGV[7]) then
redis.call("SET", KEYS[7], 1)
redis.call("SET", KEYS[8], 1)
else
redis.call("INCR", KEYS[7])
redis.call("INCR", KEYS[8])
end
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Archive sends the given task to archive, attaching the error message to the task. // Archive sends the given task to archive, attaching the error message to the task.
@ -747,6 +812,8 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
base.ArchivedKey(msg.Queue), base.ArchivedKey(msg.Queue),
base.ProcessedKey(msg.Queue, now), base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now), base.FailedKey(msg.Queue, now),
base.ProcessedTotalKey(msg.Queue),
base.FailedTotalKey(msg.Queue),
} }
argv := []interface{}{ argv := []interface{}{
msg.ID, msg.ID,
@ -755,6 +822,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
cutoff.Unix(), cutoff.Unix(),
maxArchiveSize, maxArchiveSize,
expireAt.Unix(), expireAt.Unix(),
base.MaxInt64,
} }
return r.runScript(ctx, op, archiveCmd, keys, argv...) return r.runScript(ctx, op, archiveCmd, keys, argv...)
} }

View File

@ -807,18 +807,59 @@ func TestDone(t *testing.T) {
if gotProcessed != "1" { if gotProcessed != "1" {
t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed) t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed)
} }
gotTTL := r.client.TTL(context.Background(), processedKey).Val() gotTTL := r.client.TTL(context.Background(), processedKey).Val()
if gotTTL > statsTTL { if gotTTL > statsTTL {
t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL) t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL)
} }
processedTotalKey := base.ProcessedTotalKey(tc.target.Queue)
gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val()
if gotProcessedTotal != "1" {
t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedTotalKey, gotProcessedTotal)
}
if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 { if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 {
t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey) t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey)
} }
} }
} }
// Make sure that processed_total counter wraps to 1 when reaching int64 max value.
func TestDoneWithMaxCounter(t *testing.T) {
r := setup(t)
defer r.Close()
msg := &base.TaskMessage{
ID: uuid.NewString(),
Type: "foo",
Payload: nil,
Timeout: 1800,
Deadline: 0,
Queue: "default",
}
z := base.Z{
Message: msg,
Score: time.Now().Add(5 * time.Minute).Unix(),
}
h.SeedDeadlines(t, r.client, []base.Z{z}, msg.Queue)
h.SeedActiveQueue(t, r.client, []*base.TaskMessage{msg}, msg.Queue)
processedTotalKey := base.ProcessedTotalKey(msg.Queue)
ctx := context.Background()
if err := r.client.Set(ctx, processedTotalKey, base.MaxInt64, 0).Err(); err != nil {
t.Fatalf("Redis command failed: SET %q %v", processedTotalKey, base.MaxInt64)
}
if err := r.Done(msg); err != nil {
t.Fatalf("RDB.Done failed: %v", err)
}
gotProcessedTotal := r.client.Get(ctx, processedTotalKey).Val()
if gotProcessedTotal != "1" {
t.Errorf("GET %q = %v, want 1", processedTotalKey, gotProcessedTotal)
}
}
func TestMarkAsComplete(t *testing.T) { func TestMarkAsComplete(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
@ -1573,6 +1614,18 @@ func TestRetry(t *testing.T) {
if gotTTL > statsTTL { if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL) t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL)
} }
processedTotalKey := base.ProcessedTotalKey(tc.msg.Queue)
gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val()
if gotProcessedTotal != "1" {
t.Errorf("GET %q = %q, want 1", processedTotalKey, gotProcessedTotal)
}
failedTotalKey := base.FailedTotalKey(tc.msg.Queue)
gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val()
if gotFailedTotal != "1" {
t.Errorf("GET %q = %q, want 1", failedTotalKey, gotFailedTotal)
}
} }
} }
@ -1740,6 +1793,18 @@ func TestRetryWithNonFailureError(t *testing.T) {
if gotFailed != "" { if gotFailed != "" {
t.Errorf("GET %q = %q, want empty", failedKey, gotFailed) t.Errorf("GET %q = %q, want empty", failedKey, gotFailed)
} }
processedTotalKey := base.ProcessedTotalKey(tc.msg.Queue)
gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val()
if gotProcessedTotal != "" {
t.Errorf("GET %q = %q, want empty", processedTotalKey, gotProcessedTotal)
}
failedTotalKey := base.FailedTotalKey(tc.msg.Queue)
gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val()
if gotFailedTotal != "" {
t.Errorf("GET %q = %q, want empty", failedTotalKey, gotFailedTotal)
}
} }
} }
@ -1950,6 +2015,18 @@ func TestArchive(t *testing.T) {
if gotTTL > statsTTL { if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL) t.Errorf("TTL %q = %v, want less than or equal to %v", failedKey, gotTTL, statsTTL)
} }
processedTotalKey := base.ProcessedTotalKey(tc.target.Queue)
gotProcessedTotal := r.client.Get(context.Background(), processedTotalKey).Val()
if gotProcessedTotal != "1" {
t.Errorf("GET %q = %q, want 1", processedTotalKey, gotProcessedTotal)
}
failedTotalKey := base.FailedTotalKey(tc.target.Queue)
gotFailedTotal := r.client.Get(context.Background(), failedTotalKey).Val()
if gotFailedTotal != "1" {
t.Errorf("GET %q = %q, want 1", failedTotalKey, gotFailedTotal)
}
} }
} }

View File

@ -64,6 +64,18 @@ var (
[]string{"queue"}, nil, []string{"queue"}, nil,
) )
tasksProcessedTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "tasks_processed_total"),
"Number of tasks processed (both succeeded and failed); broken down by queue",
[]string{"queue"}, nil,
)
tasksFailedTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "tasks_failed_total"),
"Number of tasks failed; broken down by queue",
[]string{"queue"}, nil,
)
pausedQueues = prometheus.NewDesc( pausedQueues = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "queue_paused_total"), prometheus.BuildFQName(namespace, "", "queue_paused_total"),
"Number of queues paused", "Number of queues paused",
@ -145,6 +157,20 @@ func (qmc *QueueMetricsCollector) Collect(ch chan<- prometheus.Metric) {
info.Queue, info.Queue,
) )
ch <- prometheus.MustNewConstMetric(
tasksProcessedTotalDesc,
prometheus.CounterValue,
float64(info.ProcessedTotal),
info.Queue,
)
ch <- prometheus.MustNewConstMetric(
tasksFailedTotalDesc,
prometheus.CounterValue,
float64(info.FailedTotal),
info.Queue,
)
pausedValue := 0 // zero to indicate "not paused" pausedValue := 0 // zero to indicate "not paused"
if info.Paused { if info.Paused {
pausedValue = 1 pausedValue = 1