From 9261e4318cae9550a88325115a884882762756d9 Mon Sep 17 00:00:00 2001 From: fajrifernanda Date: Wed, 4 Dec 2024 13:23:04 +0700 Subject: [PATCH 1/4] feature: configurable archive ttl and max archived size --- client.go | 34 +++++++++++++++++++++++++++ inspector.go | 40 ++++++++++++++++++++++++++++---- internal/base/base.go | 5 ++++ internal/rdb/inspect.go | 16 ++++++------- internal/rdb/rdb.go | 50 +++++++++++++++++++++++++++++++--------- internal/rdb/rdb_test.go | 4 ++-- scheduler.go | 29 ++++++++++++++++------- server.go | 29 ++++++++++++++++++++++- 8 files changed, 172 insertions(+), 35 deletions(-) diff --git a/client.go b/client.go index 4a0ba39..055dc8c 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,40 @@ type Client struct { sharedConnection bool } +type ClientConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateClientConfig(cfg *ClientConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewClientWithConfig returns a new Client instance given a redis connection option. +func NewClientWithConfig(r RedisConnOpt, cfg ClientConfig) *Client { + validateClientConfig(&cfg) + + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) + } + return &Client{broker: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + })} +} + // NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) diff --git a/inspector.go b/inspector.go index e361d22..8222b8d 100644 --- a/inspector.go +++ b/inspector.go @@ -25,15 +25,45 @@ type Inspector struct { sharedConnection bool } -// New returns a new instance of Inspector. -func NewInspector(r RedisConnOpt) *Inspector { +type InspectorConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateInspectorConfig(cfg *InspectorConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewInspectorWithConfig returns a new instance of Inspector. +func NewInspectorWithConfig(r RedisConnOpt, cfg InspectorConfig) *Inspector { + validateInspectorConfig(&cfg) + c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } - inspector := NewInspectorFromRedisClient(c) - inspector.sharedConnection = false - return inspector + return &Inspector{ + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }), + } +} + +// NewInspector returns a new instance of Inspector. +func NewInspector(r RedisConnOpt) *Inspector { + return NewInspectorWithConfig(r, InspectorConfig{}) } // NewInspectorFromRedisClient returns a new instance of Inspector given a redis.UniversalClient diff --git a/internal/base/base.go b/internal/base/base.go index 505e1ba..12c11b2 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -31,6 +31,11 @@ const DefaultQueueName = "default" // DefaultQueue is the redis key for the default queue. var DefaultQueue = PendingKey(DefaultQueueName) +const ( + DefaultMaxArchiveSize = 10000 // maximum number of tasks in archive + DefaultArchivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently +) + // Global Redis keys. const ( AllServers = "asynq:servers" // ZSET diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a18c4e2..879a142 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1181,8 +1181,8 @@ func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), gname, } @@ -1237,8 +1237,8 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), } res, err := archiveAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -1328,8 +1328,8 @@ func (r *RDB) ArchiveTask(qname, id string) error { argv := []interface{}{ id, now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.QueueKeyPrefix(qname), base.GroupKeyPrefix(qname), } @@ -1393,8 +1393,8 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), qname, } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df506..5e48751 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -30,14 +30,42 @@ type RDB struct { client redis.UniversalClient clock timeutil.Clock queuesPublished sync.Map + config RDBConfig +} +type RDBConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateRDBConfig(cfg *RDBConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewRDBWithConfig returns a new instance of RDB. +func NewRDBWithConfig(client redis.UniversalClient, cfg RDBConfig) *RDB { + validateRDBConfig(&cfg) + + return &RDB{ + client: client, + clock: timeutil.NewRealClock(), + config: cfg, + } } // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { - return &RDB{ - client: client, - clock: timeutil.NewRealClock(), - } + return NewRDBWithConfig(client, RDBConfig{}) } // Close closes the connection with redis server. @@ -836,11 +864,6 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T return r.runScript(ctx, op, retryCmd, keys, argv...) } -const ( - maxArchiveSize = 10000 // maximum number of tasks in archive - archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently -) - // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:lease @@ -904,6 +927,10 @@ return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error { + if *(r.config.MaxArchiveSize) <= 0 { + return nil + } + var op errors.Op = "rdb.Archive" now := r.clock.Now() modified := *msg @@ -913,7 +940,8 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } - cutoff := now.AddDate(0, 0, -archivedExpirationInDays) + cutoff := now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)) + expireAt := now.Add(statsTTL) keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -931,7 +959,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) encoded, now.Unix(), cutoff.Unix(), - maxArchiveSize, + *r.config.MaxArchiveSize, expireAt.Unix(), int64(math.MaxInt64), } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5249a29..0cbb283 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2250,7 +2250,7 @@ func TestArchiveTrim(t *testing.T) { errMsg := "SMTP server not responding" maxArchiveSet := make([]base.Z, 0) - for i := 0; i < maxArchiveSize-1; i++ { + for i := 0; i < base.DefaultMaxArchiveSize-1; i++ { maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{ ID: uuid.NewString(), Type: "generate_csv", @@ -2306,7 +2306,7 @@ func TestArchiveTrim(t *testing.T) { }, archived: map[string][]base.Z{ "default": { - {Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()}, + {Message: t2, Score: now.Add(-time.Hour * 24 * (base.DefaultArchivedExpirationInDays + 1)).Unix()}, }, }, wantArchived: map[string][]base.Z{ diff --git a/scheduler.go b/scheduler.go index 0026185..8c52cf2 100644 --- a/scheduler.go +++ b/scheduler.go @@ -94,14 +94,17 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * heartbeatInterval: heartbeatInterval, logger: logger, client: NewClientFromRedisClient(c), - rdb: rdb.NewRDB(c), - cron: cron.New(cron.WithLocation(loc)), - location: loc, - done: make(chan struct{}), - preEnqueueFunc: opts.PreEnqueueFunc, - postEnqueueFunc: opts.PostEnqueueFunc, - errHandler: opts.EnqueueErrorHandler, - idmap: make(map[string]cron.EntryID), + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: opts.MaxArchiveSize, + ArchivedExpirationInDays: opts.ArchivedExpirationInDays, + }), + cron: cron.New(cron.WithLocation(loc)), + location: loc, + done: make(chan struct{}), + preEnqueueFunc: opts.PreEnqueueFunc, + postEnqueueFunc: opts.PostEnqueueFunc, + errHandler: opts.EnqueueErrorHandler, + idmap: make(map[string]cron.EntryID), } } @@ -151,6 +154,16 @@ type SchedulerOpts struct { // EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task // due to an error. EnqueueErrorHandler func(task *Task, opts []Option, err error) + + // MaxArchiveSize specifies the maximum size of the archive that can be created by the server. + // + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // enqueueJob encapsulates the job of enqueuing a task and recording the event. diff --git a/server.go b/server.go index 0cc4f38..1928297 100644 --- a/server.go +++ b/server.go @@ -253,6 +253,14 @@ type Config struct { // If unset or zero, default batch size of 100 is used. // Make sure to not put a big number as the batch size to prevent a long-running script. JanitorBatchSize int + + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. @@ -427,9 +435,25 @@ const ( defaultJanitorBatchSize = 100 ) +func validateConfig(cfg *Config) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + // NewServer returns a new Server given a redis connection option // and server configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { + validateConfig(&cfg) redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) @@ -504,7 +528,10 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) - rdb := rdb.NewRDB(c) + rdb := rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) From b689bb42e3f88818c332c21b3711a93ce093e6e2 Mon Sep 17 00:00:00 2001 From: fajrifernanda Date: Wed, 4 Dec 2024 13:36:27 +0700 Subject: [PATCH 2/4] fix client --- client.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/client.go b/client.go index 055dc8c..dcfe3b8 100644 --- a/client.go +++ b/client.go @@ -58,21 +58,18 @@ func NewClientWithConfig(r RedisConnOpt, cfg ClientConfig) *Client { if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - return &Client{broker: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ - MaxArchiveSize: cfg.MaxArchiveSize, - ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, - })} + return &Client{ + broker: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }), + sharedConnection: false, + } } // NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { - redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) - if !ok { - panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) - } - client := NewClientFromRedisClient(redisClient) - client.sharedConnection = false - return client + return NewClientWithConfig(r, ClientConfig{}) } // NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient From 3a771f1117825f54e9800f46cde22f01d366cabc Mon Sep 17 00:00:00 2001 From: fajrifernanda Date: Wed, 4 Dec 2024 14:32:49 +0700 Subject: [PATCH 3/4] fix logic --- client.go | 4 ++++ inspector.go | 4 ++++ internal/rdb/rdb.go | 8 ++++---- scheduler.go | 2 +- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/client.go b/client.go index dcfe3b8..62e560a 100644 --- a/client.go +++ b/client.go @@ -40,6 +40,10 @@ func validateClientConfig(cfg *ClientConfig) { value := base.DefaultMaxArchiveSize cfg.MaxArchiveSize = &value } + if *(cfg.MaxArchiveSize) <= 0 { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } if cfg.ArchivedExpirationInDays == nil { value := base.DefaultArchivedExpirationInDays cfg.ArchivedExpirationInDays = &value diff --git a/inspector.go b/inspector.go index 8222b8d..7bc4c00 100644 --- a/inspector.go +++ b/inspector.go @@ -35,6 +35,10 @@ func validateInspectorConfig(cfg *InspectorConfig) { value := base.DefaultMaxArchiveSize cfg.MaxArchiveSize = &value } + if *(cfg.MaxArchiveSize) <= 0 { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } if cfg.ArchivedExpirationInDays == nil { value := base.DefaultArchivedExpirationInDays cfg.ArchivedExpirationInDays = &value diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5e48751..2a2850d 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -42,6 +42,10 @@ func validateRDBConfig(cfg *RDBConfig) { value := base.DefaultMaxArchiveSize cfg.MaxArchiveSize = &value } + if *(cfg.MaxArchiveSize) <= 0 { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } if cfg.ArchivedExpirationInDays == nil { value := base.DefaultArchivedExpirationInDays cfg.ArchivedExpirationInDays = &value @@ -927,10 +931,6 @@ return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error { - if *(r.config.MaxArchiveSize) <= 0 { - return nil - } - var op errors.Op = "rdb.Archive" now := r.clock.Now() modified := *msg diff --git a/scheduler.go b/scheduler.go index 8c52cf2..005422b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -157,7 +157,7 @@ type SchedulerOpts struct { // MaxArchiveSize specifies the maximum size of the archive that can be created by the server. // - // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + // If unset or below 1, the DefaultMaxArchiveSize is used MaxArchiveSize *int // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. From 8213e981a540b01b0bd70d8b8e7ea70274f809fc Mon Sep 17 00:00:00 2001 From: fajrifernanda Date: Wed, 4 Dec 2024 14:34:11 +0700 Subject: [PATCH 4/4] fix logic --- server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 1928297..c22b970 100644 --- a/server.go +++ b/server.go @@ -254,7 +254,7 @@ type Config struct { // Make sure to not put a big number as the batch size to prevent a long-running script. JanitorBatchSize int - // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + // If unset or below 1, the DefaultMaxArchiveSize is used. MaxArchiveSize *int // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. @@ -440,6 +440,10 @@ func validateConfig(cfg *Config) { value := base.DefaultMaxArchiveSize cfg.MaxArchiveSize = &value } + if *(cfg.MaxArchiveSize) < 0 { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } if cfg.ArchivedExpirationInDays == nil { value := base.DefaultArchivedExpirationInDays cfg.ArchivedExpirationInDays = &value