diff --git a/client.go b/client.go index 4a0ba39..055dc8c 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,40 @@ type Client struct { sharedConnection bool } +type ClientConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateClientConfig(cfg *ClientConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewClientWithConfig returns a new Client instance given a redis connection option. +func NewClientWithConfig(r RedisConnOpt, cfg ClientConfig) *Client { + validateClientConfig(&cfg) + + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) + } + return &Client{broker: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + })} +} + // NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) diff --git a/inspector.go b/inspector.go index e361d22..8222b8d 100644 --- a/inspector.go +++ b/inspector.go @@ -25,15 +25,45 @@ type Inspector struct { sharedConnection bool } -// New returns a new instance of Inspector. -func NewInspector(r RedisConnOpt) *Inspector { +type InspectorConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateInspectorConfig(cfg *InspectorConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewInspectorWithConfig returns a new instance of Inspector. +func NewInspectorWithConfig(r RedisConnOpt, cfg InspectorConfig) *Inspector { + validateInspectorConfig(&cfg) + c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } - inspector := NewInspectorFromRedisClient(c) - inspector.sharedConnection = false - return inspector + return &Inspector{ + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }), + } +} + +// NewInspector returns a new instance of Inspector. +func NewInspector(r RedisConnOpt) *Inspector { + return NewInspectorWithConfig(r, InspectorConfig{}) } // NewInspectorFromRedisClient returns a new instance of Inspector given a redis.UniversalClient diff --git a/internal/base/base.go b/internal/base/base.go index 505e1ba..12c11b2 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -31,6 +31,11 @@ const DefaultQueueName = "default" // DefaultQueue is the redis key for the default queue. var DefaultQueue = PendingKey(DefaultQueueName) +const ( + DefaultMaxArchiveSize = 10000 // maximum number of tasks in archive + DefaultArchivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently +) + // Global Redis keys. const ( AllServers = "asynq:servers" // ZSET diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a18c4e2..879a142 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1181,8 +1181,8 @@ func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), gname, } @@ -1237,8 +1237,8 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), } res, err := archiveAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -1328,8 +1328,8 @@ func (r *RDB) ArchiveTask(qname, id string) error { argv := []interface{}{ id, now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.QueueKeyPrefix(qname), base.GroupKeyPrefix(qname), } @@ -1393,8 +1393,8 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), qname, } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df506..5e48751 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -30,14 +30,42 @@ type RDB struct { client redis.UniversalClient clock timeutil.Clock queuesPublished sync.Map + config RDBConfig +} +type RDBConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateRDBConfig(cfg *RDBConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewRDBWithConfig returns a new instance of RDB. +func NewRDBWithConfig(client redis.UniversalClient, cfg RDBConfig) *RDB { + validateRDBConfig(&cfg) + + return &RDB{ + client: client, + clock: timeutil.NewRealClock(), + config: cfg, + } } // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { - return &RDB{ - client: client, - clock: timeutil.NewRealClock(), - } + return NewRDBWithConfig(client, RDBConfig{}) } // Close closes the connection with redis server. @@ -836,11 +864,6 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T return r.runScript(ctx, op, retryCmd, keys, argv...) } -const ( - maxArchiveSize = 10000 // maximum number of tasks in archive - archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently -) - // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:lease @@ -904,6 +927,10 @@ return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error { + if *(r.config.MaxArchiveSize) <= 0 { + return nil + } + var op errors.Op = "rdb.Archive" now := r.clock.Now() modified := *msg @@ -913,7 +940,8 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } - cutoff := now.AddDate(0, 0, -archivedExpirationInDays) + cutoff := now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)) + expireAt := now.Add(statsTTL) keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -931,7 +959,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) encoded, now.Unix(), cutoff.Unix(), - maxArchiveSize, + *r.config.MaxArchiveSize, expireAt.Unix(), int64(math.MaxInt64), } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5249a29..0cbb283 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2250,7 +2250,7 @@ func TestArchiveTrim(t *testing.T) { errMsg := "SMTP server not responding" maxArchiveSet := make([]base.Z, 0) - for i := 0; i < maxArchiveSize-1; i++ { + for i := 0; i < base.DefaultMaxArchiveSize-1; i++ { maxArchiveSet = append(maxArchiveSet, base.Z{Message: &base.TaskMessage{ ID: uuid.NewString(), Type: "generate_csv", @@ -2306,7 +2306,7 @@ func TestArchiveTrim(t *testing.T) { }, archived: map[string][]base.Z{ "default": { - {Message: t2, Score: now.Add(-time.Hour * 24 * (archivedExpirationInDays + 1)).Unix()}, + {Message: t2, Score: now.Add(-time.Hour * 24 * (base.DefaultArchivedExpirationInDays + 1)).Unix()}, }, }, wantArchived: map[string][]base.Z{ diff --git a/scheduler.go b/scheduler.go index 0026185..8c52cf2 100644 --- a/scheduler.go +++ b/scheduler.go @@ -94,14 +94,17 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * heartbeatInterval: heartbeatInterval, logger: logger, client: NewClientFromRedisClient(c), - rdb: rdb.NewRDB(c), - cron: cron.New(cron.WithLocation(loc)), - location: loc, - done: make(chan struct{}), - preEnqueueFunc: opts.PreEnqueueFunc, - postEnqueueFunc: opts.PostEnqueueFunc, - errHandler: opts.EnqueueErrorHandler, - idmap: make(map[string]cron.EntryID), + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: opts.MaxArchiveSize, + ArchivedExpirationInDays: opts.ArchivedExpirationInDays, + }), + cron: cron.New(cron.WithLocation(loc)), + location: loc, + done: make(chan struct{}), + preEnqueueFunc: opts.PreEnqueueFunc, + postEnqueueFunc: opts.PostEnqueueFunc, + errHandler: opts.EnqueueErrorHandler, + idmap: make(map[string]cron.EntryID), } } @@ -151,6 +154,16 @@ type SchedulerOpts struct { // EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task // due to an error. EnqueueErrorHandler func(task *Task, opts []Option, err error) + + // MaxArchiveSize specifies the maximum size of the archive that can be created by the server. + // + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // enqueueJob encapsulates the job of enqueuing a task and recording the event. diff --git a/server.go b/server.go index 0cc4f38..1928297 100644 --- a/server.go +++ b/server.go @@ -253,6 +253,14 @@ type Config struct { // If unset or zero, default batch size of 100 is used. // Make sure to not put a big number as the batch size to prevent a long-running script. JanitorBatchSize int + + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. @@ -427,9 +435,25 @@ const ( defaultJanitorBatchSize = 100 ) +func validateConfig(cfg *Config) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + // NewServer returns a new Server given a redis connection option // and server configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { + validateConfig(&cfg) redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) @@ -504,7 +528,10 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) - rdb := rdb.NewRDB(c) + rdb := rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest)