mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Add janitor goroutine
This commit is contained in:
parent
79c17b9ee9
commit
902a34a308
@ -670,6 +670,7 @@ type Broker interface {
|
||||
Retry(msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
|
||||
Archive(msg *TaskMessage, errMsg string) error
|
||||
ForwardIfReady(qnames ...string) error
|
||||
DeleteExpiredCompletedTasks(qname string) error
|
||||
ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
||||
ClearServerState(host string, pid int, serverID string) error
|
||||
|
@ -144,6 +144,15 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
|
||||
return tb.real.ForwardIfReady(qnames...)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
if tb.sleeping {
|
||||
return errRedisDown
|
||||
}
|
||||
return tb.real.DeleteExpiredCompletedTasks(qname)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
|
81
janitor.go
Normal file
81
janitor.go
Normal file
@ -0,0 +1,81 @@
|
||||
// Copyright 2021 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/log"
|
||||
)
|
||||
|
||||
// A janitor is responsible for deleting expired completed tasks from the specified
|
||||
// queues. It periodically checks for any expired tasks in the completed set, and
|
||||
// deletes them.
|
||||
type janitor struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
|
||||
// channel to communicate back to the long running "janitor" goroutine.
|
||||
done chan struct{}
|
||||
|
||||
// list of queue names to check.
|
||||
queues []string
|
||||
|
||||
// average interval between checks.
|
||||
avgInterval time.Duration
|
||||
}
|
||||
|
||||
type janitorParams struct {
|
||||
logger *log.Logger
|
||||
broker base.Broker
|
||||
queues []string
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func newJanitor(params janitorParams) *janitor {
|
||||
return &janitor{
|
||||
logger: params.logger,
|
||||
broker: params.broker,
|
||||
done: make(chan struct{}),
|
||||
queues: params.queues,
|
||||
avgInterval: params.interval,
|
||||
}
|
||||
}
|
||||
|
||||
func (j *janitor) shutdown() {
|
||||
j.logger.Debug("Janitor shutting down...")
|
||||
// Signal the janitor goroutine to stop.
|
||||
j.done <- struct{}{}
|
||||
}
|
||||
|
||||
// start starts the "janitor" goroutine.
|
||||
func (j *janitor) start(wg *sync.WaitGroup) {
|
||||
wg.Add(1)
|
||||
timer := time.NewTimer(j.avgInterval) // randomize this interval with margin of 1s
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-j.done:
|
||||
j.logger.Debug("Janitor done")
|
||||
return
|
||||
case <-timer.C:
|
||||
j.exec()
|
||||
timer.Reset(j.avgInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (j *janitor) exec() {
|
||||
for _, qname := range j.queues {
|
||||
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil {
|
||||
j.logger.Errorf("Could not delete expired completed tasks from queue %q: %v",
|
||||
qname, err)
|
||||
}
|
||||
}
|
||||
}
|
89
janitor_test.go
Normal file
89
janitor_test.go
Normal file
@ -0,0 +1,89 @@
|
||||
// Copyright 2021 Kentaro Hibino. All rights reserved.
|
||||
// Use of this source code is governed by a MIT license
|
||||
// that can be found in the LICENSE file.
|
||||
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
func newCompletedTask(qname, tasktype string, payload []byte, completedAt time.Time) *base.TaskMessage {
|
||||
msg := h.NewTaskMessageWithQueue(tasktype, payload, qname)
|
||||
msg.CompletedAt = completedAt.Unix()
|
||||
return msg
|
||||
}
|
||||
|
||||
func TestJanitor(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
const interval = 1 * time.Second
|
||||
janitor := newJanitor(janitorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
queues: []string{"default", "custom"},
|
||||
interval: interval,
|
||||
})
|
||||
|
||||
now := time.Now()
|
||||
hourAgo := now.Add(-1 * time.Hour)
|
||||
minuteAgo := now.Add(-1 * time.Minute)
|
||||
halfHourAgo := now.Add(-30 * time.Minute)
|
||||
halfHourFromNow := now.Add(30 * time.Minute)
|
||||
fiveMinFromNow := now.Add(5 * time.Minute)
|
||||
msg1 := newCompletedTask("default", "task1", nil, hourAgo)
|
||||
msg2 := newCompletedTask("default", "task2", nil, minuteAgo)
|
||||
msg3 := newCompletedTask("custom", "task3", nil, hourAgo)
|
||||
msg4 := newCompletedTask("custom", "task4", nil, minuteAgo)
|
||||
|
||||
tests := []struct {
|
||||
completed map[string][]base.Z // initial completed sets
|
||||
wantCompleted map[string][]base.Z // expected completed sets after janitor runs
|
||||
}{
|
||||
{
|
||||
completed: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: msg1, Score: halfHourAgo.Unix()},
|
||||
{Message: msg2, Score: fiveMinFromNow.Unix()},
|
||||
},
|
||||
"custom": {
|
||||
{Message: msg3, Score: halfHourFromNow.Unix()},
|
||||
{Message: msg4, Score: minuteAgo.Unix()},
|
||||
},
|
||||
},
|
||||
wantCompleted: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: msg2, Score: fiveMinFromNow.Unix()},
|
||||
},
|
||||
"custom": {
|
||||
{Message: msg3, Score: halfHourFromNow.Unix()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllCompletedQueues(t, r, tc.completed)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
janitor.start(&wg)
|
||||
time.Sleep(2 * interval) // make sure to let janitor run at least one time
|
||||
janitor.shutdown()
|
||||
|
||||
for qname, want := range tc.wantCompleted {
|
||||
got := h.GetCompletedEntries(t, r, qname)
|
||||
if diff := cmp.Diff(want, got, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("diff found in %q after running janitor: (-want, +got)\n%s", base.CompletedKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
10
server.go
10
server.go
@ -49,6 +49,7 @@ type Server struct {
|
||||
subscriber *subscriber
|
||||
recoverer *recoverer
|
||||
healthchecker *healthchecker
|
||||
janitor *janitor
|
||||
}
|
||||
|
||||
// Config specifies the server's background-task processing behavior.
|
||||
@ -401,6 +402,12 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
interval: healthcheckInterval,
|
||||
healthcheckFunc: cfg.HealthCheckFunc,
|
||||
})
|
||||
janitor := newJanitor(janitorParams{
|
||||
logger: logger,
|
||||
broker: rdb,
|
||||
queues: qnames,
|
||||
interval: 8 * time.Second,
|
||||
})
|
||||
return &Server{
|
||||
logger: logger,
|
||||
broker: rdb,
|
||||
@ -412,6 +419,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
subscriber: subscriber,
|
||||
recoverer: recoverer,
|
||||
healthchecker: healthchecker,
|
||||
janitor: janitor,
|
||||
}
|
||||
}
|
||||
|
||||
@ -493,6 +501,7 @@ func (srv *Server) Start(handler Handler) error {
|
||||
srv.recoverer.start(&srv.wg)
|
||||
srv.forwarder.start(&srv.wg)
|
||||
srv.processor.start(&srv.wg)
|
||||
srv.janitor.start(&srv.wg)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -517,6 +526,7 @@ func (srv *Server) Shutdown() {
|
||||
srv.recoverer.shutdown()
|
||||
srv.syncer.shutdown()
|
||||
srv.subscriber.shutdown()
|
||||
srv.janitor.shutdown()
|
||||
srv.healthchecker.shutdown()
|
||||
srv.heartbeater.shutdown()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user