mirror of
https://github.com/hibiken/asynq.git
synced 2025-08-19 15:08:55 +08:00
Record processed and failure daily count
This commit is contained in:
@@ -19,6 +19,8 @@ var (
|
||||
ErrTaskNotFound = errors.New("could not find a task")
|
||||
)
|
||||
|
||||
const statsTTL = 90 * 24 * time.Hour // 90 days
|
||||
|
||||
// RDB is a client interface to query and mutate task queues.
|
||||
type RDB struct {
|
||||
client *redis.Client
|
||||
@@ -76,12 +78,26 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
// NOTE: count ZERO means "remove all elements equal to val"
|
||||
err = r.client.LRem(base.InProgressQueue, 0, string(bytes)).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", base.InProgressQueue, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
// Note: LREM count ZERO means "remove all elements equal to val"
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value
|
||||
// ARGV[2] -> stats expiration timestamp
|
||||
script := redis.NewScript(`
|
||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
local n = redis.call("INCR", KEYS[2])
|
||||
if tonumber(n) == 1 then
|
||||
redis.call("EXPIREAT", KEYS[2], ARGV[2])
|
||||
end
|
||||
return redis.status_reply("OK")
|
||||
`)
|
||||
now := time.Now()
|
||||
processedKey := base.ProcessedKey(now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
_, err = script.Run(r.client,
|
||||
[]string{base.InProgressQueue, processedKey},
|
||||
string(bytes), expireAt.Unix()).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// Requeue moves the task from in-progress queue to the default
|
||||
@@ -132,18 +148,34 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
||||
}
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:retry
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
||||
// ARGV[3] -> retry_at UNIX timestamp
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:retry
|
||||
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||
// KEYS[4] -> asynq:failure:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
||||
// ARGV[3] -> retry_at UNIX timestamp
|
||||
// ARGV[4] -> stats expiration timestamp
|
||||
script := redis.NewScript(`
|
||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||
local n = redis.call("INCR", KEYS[3])
|
||||
if tonumber(n) == 1 then
|
||||
redis.call("EXPIREAT", KEYS[3], ARGV[4])
|
||||
end
|
||||
local m = redis.call("INCR", KEYS[4])
|
||||
if tonumber(m) == 1 then
|
||||
redis.call("EXPIREAT", KEYS[4], ARGV[4])
|
||||
end
|
||||
return redis.status_reply("OK")
|
||||
`)
|
||||
_, err = script.Run(r.client, []string{base.InProgressQueue, base.RetryQueue},
|
||||
string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result()
|
||||
now := time.Now()
|
||||
processedKey := base.ProcessedKey(now)
|
||||
failureKey := base.FailureKey(now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
_, err = script.Run(r.client,
|
||||
[]string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey},
|
||||
string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -165,22 +197,37 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||
}
|
||||
now := time.Now()
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
processedKey := base.ProcessedKey(now)
|
||||
failureKey := base.FailureKey(now)
|
||||
expireAt := now.Add(statsTTL)
|
||||
// KEYS[1] -> asynq:in_progress
|
||||
// KEYS[2] -> asynq:dead
|
||||
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||
// KEYS[4] -> asynq.failure:<yyyy-mm-dd>
|
||||
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
|
||||
// ARGV[2] -> base.TaskMessage value to add to Dead queue
|
||||
// ARGV[3] -> died_at UNIX timestamp
|
||||
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||
// ARGV[6] -> stats expiration timestamp
|
||||
script := redis.NewScript(`
|
||||
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
||||
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
||||
local n = redis.call("INCR", KEYS[3])
|
||||
if tonumber(n) == 1 then
|
||||
redis.call("EXPIREAT", KEYS[3], ARGV[6])
|
||||
end
|
||||
local m = redis.call("INCR", KEYS[4])
|
||||
if tonumber(m) == 1 then
|
||||
redis.call("EXPIREAT", KEYS[4], ARGV[6])
|
||||
end
|
||||
return redis.status_reply("OK")
|
||||
`)
|
||||
_, err = script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue},
|
||||
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result()
|
||||
_, err = script.Run(r.client,
|
||||
[]string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey},
|
||||
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask, expireAt.Unix()).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@@ -110,6 +110,17 @@ func TestDone(t *testing.T) {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", base.InProgressQueue, diff)
|
||||
continue
|
||||
}
|
||||
|
||||
processedKey := base.ProcessedKey(time.Now())
|
||||
gotProcessed := r.client.Get(processedKey).Val()
|
||||
if gotProcessed != "1" {
|
||||
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||
}
|
||||
|
||||
gotTTL := r.client.TTL(processedKey).Val()
|
||||
if gotTTL > statsTTL {
|
||||
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -244,6 +255,26 @@ func TestKill(t *testing.T) {
|
||||
if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
|
||||
}
|
||||
|
||||
processedKey := base.ProcessedKey(time.Now())
|
||||
gotProcessed := r.client.Get(processedKey).Val()
|
||||
if gotProcessed != "1" {
|
||||
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||
}
|
||||
gotTTL := r.client.TTL(processedKey).Val()
|
||||
if gotTTL > statsTTL {
|
||||
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||
}
|
||||
|
||||
failureKey := base.FailureKey(time.Now())
|
||||
gotFailure := r.client.Get(failureKey).Val()
|
||||
if gotFailure != "1" {
|
||||
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
|
||||
}
|
||||
gotTTL = r.client.TTL(processedKey).Val()
|
||||
if gotTTL > statsTTL {
|
||||
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -489,5 +520,25 @@ func TestRetry(t *testing.T) {
|
||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
|
||||
}
|
||||
|
||||
processedKey := base.ProcessedKey(time.Now())
|
||||
gotProcessed := r.client.Get(processedKey).Val()
|
||||
if gotProcessed != "1" {
|
||||
t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed)
|
||||
}
|
||||
gotTTL := r.client.TTL(processedKey).Val()
|
||||
if gotTTL > statsTTL {
|
||||
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
|
||||
}
|
||||
|
||||
failureKey := base.FailureKey(time.Now())
|
||||
gotFailure := r.client.Get(failureKey).Val()
|
||||
if gotFailure != "1" {
|
||||
t.Errorf("GET %q = %q, want 1", failureKey, gotFailure)
|
||||
}
|
||||
gotTTL = r.client.TTL(processedKey).Val()
|
||||
if gotTTL > statsTTL {
|
||||
t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user