diff --git a/internal/base/base.go b/internal/base/base.go index 679849e..5339976 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -1,11 +1,17 @@ // Package base defines foundational types and constants used in asynq package. package base -import "github.com/rs/xid" +import ( + "time" + + "github.com/rs/xid" +) // Redis keys const ( - QueuePrefix = "asynq:queues:" // LIST - asynq:queues: + processedPrefix = "asynq:processed:" // STRING - asynq:processed: + failurePrefix = "asynq:failure:" // STRING - asynq:failure: + QueuePrefix = "asynq:queues:" // LIST - asynq:queues: DefaultQueue = QueuePrefix + "default" // LIST ScheduledQueue = "asynq:scheduled" // ZSET RetryQueue = "asynq:retry" // ZSET @@ -13,6 +19,18 @@ const ( InProgressQueue = "asynq:in_progress" // LIST ) +// ProcessedKey returns a redis key string for procesed count +// for the given day. +func ProcessedKey(t time.Time) string { + return processedPrefix + t.UTC().Format("2006-01-02") +} + +// FailureKey returns a redis key string for failure count +// for the given day. +func FailureKey(t time.Time) string { + return failurePrefix + t.UTC().Format("2006-01-02") +} + // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written in redis. type TaskMessage struct { diff --git a/internal/base/base_test.go b/internal/base/base_test.go new file mode 100644 index 0000000..2f375cc --- /dev/null +++ b/internal/base/base_test.go @@ -0,0 +1,42 @@ +package base + +import ( + "testing" + "time" +) + +func TestProcessedKey(t *testing.T) { + tests := []struct { + input time.Time + want string + }{ + {time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:processed:2019-11-14"}, + {time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:processed:2020-12-01"}, + {time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:processed:2020-01-06"}, + } + + for _, tc := range tests { + got := ProcessedKey(tc.input) + if got != tc.want { + t.Errorf("ProcessedKey(%v) = %q, want %q", tc.input, got, tc.want) + } + } +} + +func TestFailureKey(t *testing.T) { + tests := []struct { + input time.Time + want string + }{ + {time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:failure:2019-11-14"}, + {time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:failure:2020-12-01"}, + {time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:failure:2020-01-06"}, + } + + for _, tc := range tests { + got := FailureKey(tc.input) + if got != tc.want { + t.Errorf("FailureKey(%v) = %q, want %q", tc.input, got, tc.want) + } + } +} diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ef89700..12834d7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -19,6 +19,8 @@ var ( ErrTaskNotFound = errors.New("could not find a task") ) +const statsTTL = 90 * 24 * time.Hour // 90 days + // RDB is a client interface to query and mutate task queues. type RDB struct { client *redis.Client @@ -76,12 +78,26 @@ func (r *RDB) Done(msg *base.TaskMessage) error { if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } - // NOTE: count ZERO means "remove all elements equal to val" - err = r.client.LRem(base.InProgressQueue, 0, string(bytes)).Err() - if err != nil { - return fmt.Errorf("command `LREM %s 0 %s` failed: %v", base.InProgressQueue, string(bytes), err) - } - return nil + // Note: LREM count ZERO means "remove all elements equal to val" + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:processed: + // ARGV[1] -> base.TaskMessage value + // ARGV[2] -> stats expiration timestamp + script := redis.NewScript(` + redis.call("LREM", KEYS[1], 0, ARGV[1]) + local n = redis.call("INCR", KEYS[2]) + if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[2], ARGV[2]) + end + return redis.status_reply("OK") + `) + now := time.Now() + processedKey := base.ProcessedKey(now) + expireAt := now.Add(statsTTL) + _, err = script.Run(r.client, + []string{base.InProgressQueue, processedKey}, + string(bytes), expireAt.Unix()).Result() + return err } // Requeue moves the task from in-progress queue to the default @@ -132,18 +148,34 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", modified, err) } - // KEYS[1] -> asynq:in_progress - // KEYS[2] -> asynq:retry - // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue - // ARGV[2] -> base.TaskMessage value to add to Retry queue - // ARGV[3] -> retry_at UNIX timestamp + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:retry + // KEYS[3] -> asynq:processed: + // KEYS[4] -> asynq:failure: + // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue + // ARGV[2] -> base.TaskMessage value to add to Retry queue + // ARGV[3] -> retry_at UNIX timestamp + // ARGV[4] -> stats expiration timestamp script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) + local n = redis.call("INCR", KEYS[3]) + if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[4]) + end + local m = redis.call("INCR", KEYS[4]) + if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[4], ARGV[4]) + end return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{base.InProgressQueue, base.RetryQueue}, - string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result() + now := time.Now() + processedKey := base.ProcessedKey(now) + failureKey := base.FailureKey(now) + expireAt := now.Add(statsTTL) + _, err = script.Run(r.client, + []string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey}, + string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Result() return err } @@ -165,22 +197,37 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { } now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + processedKey := base.ProcessedKey(now) + failureKey := base.FailureKey(now) + expireAt := now.Add(statsTTL) // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:dead + // KEYS[3] -> asynq:processed: + // KEYS[4] -> asynq.failure: // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue // ARGV[2] -> base.TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in dead queue (e.g., 100) + // ARGV[6] -> stats expiration timestamp script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) + local n = redis.call("INCR", KEYS[3]) + if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[6]) + end + local m = redis.call("INCR", KEYS[4]) + if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[4], ARGV[6]) + end return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue}, - string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result() + _, err = script.Run(r.client, + []string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey}, + string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask, expireAt.Unix()).Result() return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f263104..586942a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -110,6 +110,17 @@ func TestDone(t *testing.T) { t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", base.InProgressQueue, diff) continue } + + processedKey := base.ProcessedKey(time.Now()) + gotProcessed := r.client.Get(processedKey).Val() + if gotProcessed != "1" { + t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) + } + + gotTTL := r.client.TTL(processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) + } } } @@ -244,6 +255,26 @@ func TestKill(t *testing.T) { if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) } + + processedKey := base.ProcessedKey(time.Now()) + gotProcessed := r.client.Get(processedKey).Val() + if gotProcessed != "1" { + t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) + } + gotTTL := r.client.TTL(processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) + } + + failureKey := base.FailureKey(time.Now()) + gotFailure := r.client.Get(failureKey).Val() + if gotFailure != "1" { + t.Errorf("GET %q = %q, want 1", failureKey, gotFailure) + } + gotTTL = r.client.TTL(processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL) + } } } @@ -489,5 +520,25 @@ func TestRetry(t *testing.T) { if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } + + processedKey := base.ProcessedKey(time.Now()) + gotProcessed := r.client.Get(processedKey).Val() + if gotProcessed != "1" { + t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) + } + gotTTL := r.client.TTL(processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) + } + + failureKey := base.FailureKey(time.Now()) + gotFailure := r.client.Get(failureKey).Val() + if gotFailure != "1" { + t.Errorf("GET %q = %q, want 1", failureKey, gotFailure) + } + gotTTL = r.client.TTL(processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL) + } } }