diff --git a/CHANGELOG.md b/CHANGELOG.md index 69414c3..c167e8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.25.0] - 2023-01-02 + +### Added +- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715) + ## [0.24.1] - 2023-05-01 ### Changed diff --git a/internal/base/base.go b/internal/base/base.go index a63c548..c0eb5bb 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -737,7 +737,7 @@ type Broker interface { ReclaimStaleAggregationSets(qname string) error // Task retention related method - DeleteExpiredCompletedTasks(qname string) error + DeleteExpiredCompletedTasks(qname string, batchSize int) error // Lease related methods ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 68de386..f856dc6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1241,9 +1241,7 @@ return table.getn(ids)`) // DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, // and delete all expired tasks. -func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { - // Note: Do this operation in fix batches to prevent long running script. - const batchSize = 100 +func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error { for { n, err := r.deleteExpiredCompletedTasks(qname, batchSize) if err != nil { diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3bd9eda..9df831f 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2542,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) { h.FlushDB(t, r.client) h.SeedAllCompletedQueues(t, r.client, tc.completed) - if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil { - t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err) + if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil { + t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err) continue } diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 368ae3e..ffab6fe 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" + "github.com/redis/go-redis/v9" ) var errRedisDown = errors.New("testutil: redis is down") @@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error { return tb.real.ForwardIfReady(qnames...) } -func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error { +func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return errRedisDown } - return tb.real.DeleteExpiredCompletedTasks(qname) + return tb.real.DeleteExpiredCompletedTasks(qname, batchSize) } func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { diff --git a/janitor.go b/janitor.go index 40f5ea1..612dbf0 100644 --- a/janitor.go +++ b/janitor.go @@ -27,13 +27,17 @@ type janitor struct { // average interval between checks. avgInterval time.Duration + + // number of tasks to be deleted when janitor runs to delete the expired completed tasks. + batchSize int } type janitorParams struct { - logger *log.Logger - broker base.Broker - queues []string - interval time.Duration + logger *log.Logger + broker base.Broker + queues []string + interval time.Duration + batchSize int } func newJanitor(params janitorParams) *janitor { @@ -43,6 +47,7 @@ func newJanitor(params janitorParams) *janitor { done: make(chan struct{}), queues: params.queues, avgInterval: params.interval, + batchSize: params.batchSize, } } @@ -73,7 +78,7 @@ func (j *janitor) start(wg *sync.WaitGroup) { func (j *janitor) exec() { for _, qname := range j.queues { - if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil { + if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil { j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v", qname, err) } diff --git a/janitor_test.go b/janitor_test.go index 7f61278..a8bd02a 100644 --- a/janitor_test.go +++ b/janitor_test.go @@ -26,11 +26,13 @@ func TestJanitor(t *testing.T) { defer r.Close() rdbClient := rdb.NewRDB(r) const interval = 1 * time.Second + const batchSize = 100 janitor := newJanitor(janitorParams{ - logger: testLogger, - broker: rdbClient, - queues: []string{"default", "custom"}, - interval: interval, + logger: testLogger, + broker: rdbClient, + queues: []string{"default", "custom"}, + interval: interval, + batchSize: batchSize, }) now := time.Now() diff --git a/server.go b/server.go index cc6f418..6211b8a 100644 --- a/server.go +++ b/server.go @@ -239,6 +239,17 @@ type Config struct { // // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregator GroupAggregator + + // JanitorInterval specifies the average interval of janitor checks for expired completed tasks. + // + // If unset or zero, default interval of 8 seconds is used. + JanitorInterval time.Duration + + // JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run. + // + // If unset or zero, default batch size of 100 is used. + // Make sure to not put a big number as the batch size to prevent a long-running script. + JanitorBatchSize int } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. @@ -408,6 +419,10 @@ const ( defaultDelayedTaskCheckInterval = 5 * time.Second defaultGroupGracePeriod = 1 * time.Minute + + defaultJanitorInterval = 8 * time.Second + + defaultJanitorBatchSize = 100 ) // NewServer returns a new Server given a redis connection option @@ -547,11 +562,26 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { interval: healthcheckInterval, healthcheckFunc: cfg.HealthCheckFunc, }) + + janitorInterval := cfg.JanitorInterval + if janitorInterval == 0 { + janitorInterval = defaultJanitorInterval + } + + janitorBatchSize := cfg.JanitorBatchSize + if janitorBatchSize == 0 { + janitorBatchSize = defaultJanitorBatchSize + } + if janitorBatchSize > defaultJanitorBatchSize { + logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+ + "This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize) + } janitor := newJanitor(janitorParams{ - logger: logger, - broker: rdb, - queues: qnames, - interval: 8 * time.Second, + logger: logger, + broker: rdb, + queues: qnames, + interval: janitorInterval, + batchSize: janitorBatchSize, }) aggregator := newAggregator(aggregatorParams{ logger: logger,