From f27a72c2f26768c640c8f0d4ef94867265e66afb Mon Sep 17 00:00:00 2001 From: youngxu Date: Wed, 13 Jul 2022 14:48:04 +0800 Subject: [PATCH] chore(): set scheduler_history TTL --- internal/rdb/rdb.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 7b25f15..964a081 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -13,10 +13,11 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" + "github.com/spf13/cast" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/timeutil" - "github.com/spf13/cast" ) const statsTTL = 90 * 24 * time.Hour // 90 days @@ -1471,14 +1472,18 @@ func (r *RDB) PublishCancelation(id string) error { // ARGV[1] -> enqueued_at timestamp // ARGV[2] -> serialized SchedulerEnqueueEvent data // ARGV[3] -> max number of events to be persisted +// ARGV[4] -> history expiration timestamp var recordSchedulerEnqueueEventCmd = redis.NewScript(` redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3]) redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) +redis.call("EXPIREAT", KEYS[1], ARGV[4]) return redis.status_reply("OK")`) // Maximum number of enqueue events to store per entry. const maxEvents = 1000 +const schedulerHistoryExpirationInDays = 90 + // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { var op errors.Op = "rdb.RecordSchedulerEnqueueEvent" @@ -1487,6 +1492,8 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode scheduler enqueue event: %v", err)) } + now := r.clock.Now() + expireAt := now.AddDate(0, 0, schedulerHistoryExpirationInDays) keys := []string{ base.SchedulerHistoryKey(entryID), } @@ -1494,6 +1501,7 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE event.EnqueuedAt.Unix(), data, maxEvents, + expireAt.Unix(), } return r.runScript(ctx, op, recordSchedulerEnqueueEventCmd, keys, argv...) }