2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

feature: configurable janitor interval and deletion batch size (#715)

* feature: configurable janitor interval and deletion batch size

* warn user when they set a big number of janitor batch size

* Update CHANGELOG.md

---------

Co-authored-by: Agung Hariadi Tedja <agung.tedja@kumparan.com>
This commit is contained in:
Tedja 2024-05-06 13:11:52 +07:00 committed by GitHub
parent 174008843d
commit d04888e748
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 62 additions and 22 deletions

View File

@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.25.0] - 2023-01-02
### Added
- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715)
## [0.24.1] - 2023-05-01 ## [0.24.1] - 2023-05-01
### Changed ### Changed

View File

@ -737,7 +737,7 @@ type Broker interface {
ReclaimStaleAggregationSets(qname string) error ReclaimStaleAggregationSets(qname string) error
// Task retention related method // Task retention related method
DeleteExpiredCompletedTasks(qname string) error DeleteExpiredCompletedTasks(qname string, batchSize int) error
// Lease related methods // Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)

View File

@ -1241,9 +1241,7 @@ return table.getn(ids)`)
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, // DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and delete all expired tasks. // and delete all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
// Note: Do this operation in fix batches to prevent long running script.
const batchSize = 100
for { for {
n, err := r.deleteExpiredCompletedTasks(qname, batchSize) n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
if err != nil { if err != nil {

View File

@ -2542,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
h.SeedAllCompletedQueues(t, r.client, tc.completed) h.SeedAllCompletedQueues(t, r.client, tc.completed)
if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil { if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err) t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err)
continue continue
} }

View File

@ -11,8 +11,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
) )
var errRedisDown = errors.New("testutil: redis is down") var errRedisDown = errors.New("testutil: redis is down")
@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
return tb.real.ForwardIfReady(qnames...) return tb.real.ForwardIfReady(qnames...)
} }
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error { func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return errRedisDown return errRedisDown
} }
return tb.real.DeleteExpiredCompletedTasks(qname) return tb.real.DeleteExpiredCompletedTasks(qname, batchSize)
} }
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {

View File

@ -27,6 +27,9 @@ type janitor struct {
// average interval between checks. // average interval between checks.
avgInterval time.Duration avgInterval time.Duration
// number of tasks to be deleted when janitor runs to delete the expired completed tasks.
batchSize int
} }
type janitorParams struct { type janitorParams struct {
@ -34,6 +37,7 @@ type janitorParams struct {
broker base.Broker broker base.Broker
queues []string queues []string
interval time.Duration interval time.Duration
batchSize int
} }
func newJanitor(params janitorParams) *janitor { func newJanitor(params janitorParams) *janitor {
@ -43,6 +47,7 @@ func newJanitor(params janitorParams) *janitor {
done: make(chan struct{}), done: make(chan struct{}),
queues: params.queues, queues: params.queues,
avgInterval: params.interval, avgInterval: params.interval,
batchSize: params.batchSize,
} }
} }
@ -73,7 +78,7 @@ func (j *janitor) start(wg *sync.WaitGroup) {
func (j *janitor) exec() { func (j *janitor) exec() {
for _, qname := range j.queues { for _, qname := range j.queues {
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil { if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil {
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v", j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
qname, err) qname, err)
} }

View File

@ -26,11 +26,13 @@ func TestJanitor(t *testing.T) {
defer r.Close() defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
const interval = 1 * time.Second const interval = 1 * time.Second
const batchSize = 100
janitor := newJanitor(janitorParams{ janitor := newJanitor(janitorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
queues: []string{"default", "custom"}, queues: []string{"default", "custom"},
interval: interval, interval: interval,
batchSize: batchSize,
}) })
now := time.Now() now := time.Now()

View File

@ -239,6 +239,17 @@ type Config struct {
// //
// If unset or nil, the group aggregation feature will be disabled on the server. // If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregator GroupAggregator GroupAggregator GroupAggregator
// JanitorInterval specifies the average interval of janitor checks for expired completed tasks.
//
// If unset or zero, default interval of 8 seconds is used.
JanitorInterval time.Duration
// JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run.
//
// If unset or zero, default batch size of 100 is used.
// Make sure to not put a big number as the batch size to prevent a long-running script.
JanitorBatchSize int
} }
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
@ -408,6 +419,10 @@ const (
defaultDelayedTaskCheckInterval = 5 * time.Second defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute defaultGroupGracePeriod = 1 * time.Minute
defaultJanitorInterval = 8 * time.Second
defaultJanitorBatchSize = 100
) )
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
@ -547,11 +562,26 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
interval: healthcheckInterval, interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc, healthcheckFunc: cfg.HealthCheckFunc,
}) })
janitorInterval := cfg.JanitorInterval
if janitorInterval == 0 {
janitorInterval = defaultJanitorInterval
}
janitorBatchSize := cfg.JanitorBatchSize
if janitorBatchSize == 0 {
janitorBatchSize = defaultJanitorBatchSize
}
if janitorBatchSize > defaultJanitorBatchSize {
logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+
"This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize)
}
janitor := newJanitor(janitorParams{ janitor := newJanitor(janitorParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
queues: qnames, queues: qnames,
interval: 8 * time.Second, interval: janitorInterval,
batchSize: janitorBatchSize,
}) })
aggregator := newAggregator(aggregatorParams{ aggregator := newAggregator(aggregatorParams{
logger: logger, logger: logger,