2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-20 21:26:14 +08:00

Compare commits

...

10 Commits

Author SHA1 Message Date
Mohammed Sohail
461d922616 docs: apply recommendaded updates
* additionally, we log an erro in the case the redis client cannot shutdown in the scheduler
2024-10-19 09:05:17 +03:00
Mohammed Sohail
5daa3c52ed Merge remote-tracking branch 'jerbob92-fork/feature/implement-reusing-redis-client' into develop 2024-10-19 08:58:39 +03:00
Tedja
d04888e748 feature: configurable janitor interval and deletion batch size (#715)
* feature: configurable janitor interval and deletion batch size

* warn user when they set a big number of janitor batch size

* Update CHANGELOG.md

---------

Co-authored-by: Agung Hariadi Tedja <agung.tedja@kumparan.com>
2024-05-06 14:11:52 +08:00
Trịnh Đức Bảo Linh(Kevin)
174008843d feat(*): correct panic error (#758)
* error panic handling

* updated CHANGELOG.md file

* correct msg panic error (#5)

* correct msg panic error
2024-05-06 13:46:19 +08:00
Mohamed Sohail 天命
2b632b93d5 chore: fix function names in comment (pull request #860 from camcui/master)
chore: fix function names in comment
2024-04-23 00:56:52 +08:00
camcui
b35b559d40 chore: fix function names in comment
Signed-off-by: camcui <cuishua@sina.cn>
2024-04-12 13:54:08 +08:00
Mohamed Sohail
8df0bfa583 Merge pull request #843 from mrusme/fix-bsd
Fix go:build for BSD
2024-03-15 13:32:19 +08:00
mrusme
b25d10b61d Fixed go:build for BSD 2024-03-14 20:26:33 +05:00
crazyoptimist
38f7499b71 fix(typo): delete-all to deleteall (#827)
* typo: delete-all to deleteall

* docs: update tools/asynq/README.md

* fix archiveall runall

---------

Co-authored-by: Mohamed Sohail <sohailsameja@gmail.com>
2024-02-23 09:17:12 +03:00
Jeroen Bobbeldijk
9e548fc097 Implement reusing redis client 2023-09-19 11:20:32 +02:00
19 changed files with 257 additions and 83 deletions

View File

@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.25.0] - 2023-01-02
### Added
- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715)
## [0.24.1] - 2023-05-01 ## [0.24.1] - 2023-05-01
### Changed ### Changed

View File

@@ -10,11 +10,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
) )
// A Client is responsible for scheduling tasks. // A Client is responsible for scheduling tasks.
@@ -25,15 +25,26 @@ import (
// Clients are safe for concurrent use by multiple goroutines. // Clients are safe for concurrent use by multiple goroutines.
type Client struct { type Client struct {
broker base.Broker broker base.Broker
// When a Client has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
} }
// NewClient returns a new Client instance given a redis connection option. // NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client { func NewClient(r RedisConnOpt) *Client {
c, ok := r.MakeRedisClient().(redis.UniversalClient) redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
} }
return &Client{broker: rdb.NewRDB(c)} client := NewClientFromRedisClient(redisClient)
client.sharedConnection = false
return client
}
// NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
} }
type OptionType int type OptionType int
@@ -150,9 +161,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second. // TTL duration must be greater than or equal to 1 second.
// //
// Uniqueness of a task is based on the following properties: // Uniqueness of a task is based on the following properties:
// - Task Type // - Task Type
// - Task Payload // - Task Payload
// - Queue Name // - Queue Name
func Unique(ttl time.Duration) Option { func Unique(ttl time.Duration) Option {
return uniqueOption(ttl) return uniqueOption(ttl)
} }
@@ -307,6 +318,9 @@ var (
// Close closes the connection with redis. // Close closes the connection with redis.
func (c *Client) Close() error { func (c *Client) Close() error {
if c.sharedConnection {
return fmt.Errorf("redis connection is shared so the Client can't be closed through asynq")
}
return c.broker.Close() return c.broker.Close()
} }

View File

@@ -14,6 +14,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
h "github.com/hibiken/asynq/internal/testutil" h "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
) )
func TestClientEnqueueWithProcessAtOption(t *testing.T) { func TestClientEnqueueWithProcessAtOption(t *testing.T) {
@@ -143,11 +144,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
} }
} }
func TestClientEnqueue(t *testing.T) { func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now() now := time.Now()
@@ -478,6 +475,24 @@ func TestClientEnqueue(t *testing.T) {
} }
} }
func TestClientEnqueue(t *testing.T) {
r := setup(t)
client := NewClient(getRedisConnOpt(t))
defer client.Close()
testClientEnqueue(t, client, r)
}
func TestClientFromRedisClientEnqueue(t *testing.T) {
r := setup(t)
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
client := NewClientFromRedisClient(redisClient)
testClientEnqueue(t, client, r)
err := client.Close()
if err == nil {
t.Error("client.Close() should have failed because of a shared client but it didn't")
}
}
func TestClientEnqueueWithGroupOption(t *testing.T) { func TestClientEnqueueWithGroupOption(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))

View File

@@ -10,16 +10,19 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
) )
// Inspector is a client interface to inspect and mutate the state of // Inspector is a client interface to inspect and mutate the state of
// queues and tasks. // queues and tasks.
type Inspector struct { type Inspector struct {
rdb *rdb.RDB rdb *rdb.RDB
// When an Inspector has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
} }
// New returns a new instance of Inspector. // New returns a new instance of Inspector.
@@ -28,13 +31,25 @@ func NewInspector(r RedisConnOpt) *Inspector {
if !ok { if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
} }
inspector := NewInspectorFromRedisClient(c)
inspector.sharedConnection = false
return inspector
}
// NewInspectorFromRedisClient returns a new instance of Inspector given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewInspectorFromRedisClient(c redis.UniversalClient) *Inspector {
return &Inspector{ return &Inspector{
rdb: rdb.NewRDB(c), rdb: rdb.NewRDB(c),
sharedConnection: true,
} }
} }
// Close closes the connection with redis. // Close closes the connection with redis.
func (i *Inspector) Close() error { func (i *Inspector) Close() error {
if i.sharedConnection {
return fmt.Errorf("redis connection is shared so the Inspector can't be closed through asynq")
}
return i.rdb.Close() return i.rdb.Close()
} }

View File

@@ -22,11 +22,7 @@ import (
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
) )
func TestInspectorQueues(t *testing.T) { func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalClient) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct { tests := []struct {
queues []string queues []string
}{ }{
@@ -52,7 +48,21 @@ func TestInspectorQueues(t *testing.T) {
t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff) t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
} }
} }
}
func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := NewInspector(getRedisConnOpt(t))
testInspectorQueues(t, inspector, r)
}
func TestInspectorFromRedisClientQueues(t *testing.T) {
r := setup(t)
defer r.Close()
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
inspector := NewInspectorFromRedisClient(redisClient)
testInspectorQueues(t, inspector, r)
} }
func TestInspectorDeleteQueue(t *testing.T) { func TestInspectorDeleteQueue(t *testing.T) {

View File

@@ -737,7 +737,7 @@ type Broker interface {
ReclaimStaleAggregationSets(qname string) error ReclaimStaleAggregationSets(qname string) error
// Task retention related method // Task retention related method
DeleteExpiredCompletedTasks(qname string) error DeleteExpiredCompletedTasks(qname string, batchSize int) error
// Lease related methods // Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)

View File

@@ -1217,7 +1217,7 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and // ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and
// reclaim tasks in the stale aggregation set by putting them back in the group. // reclaim tasks in the stale aggregation set by putting them back in the group.
func (r *RDB) ReclaimStaleAggregationSets(qname string) error { func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
var op errors.Op = "RDB.ReclaimStaleAggregationSets" var op errors.Op = "RDB.ReclaimStaleAggregationSets"
@@ -1241,9 +1241,7 @@ return table.getn(ids)`)
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, // DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and delete all expired tasks. // and delete all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
// Note: Do this operation in fix batches to prevent long running script.
const batchSize = 100
for { for {
n, err := r.deleteExpiredCompletedTasks(qname, batchSize) n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
if err != nil { if err != nil {

View File

@@ -2542,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
h.SeedAllCompletedQueues(t, r.client, tc.completed) h.SeedAllCompletedQueues(t, r.client, tc.completed)
if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil { if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err) t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err)
continue continue
} }

View File

@@ -11,8 +11,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
) )
var errRedisDown = errors.New("testutil: redis is down") var errRedisDown = errors.New("testutil: redis is down")
@@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
return tb.real.ForwardIfReady(qnames...) return tb.real.ForwardIfReady(qnames...)
} }
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error { func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return errRedisDown return errRedisDown
} }
return tb.real.DeleteExpiredCompletedTasks(qname) return tb.real.DeleteExpiredCompletedTasks(qname, batchSize)
} }
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {

View File

@@ -27,13 +27,17 @@ type janitor struct {
// average interval between checks. // average interval between checks.
avgInterval time.Duration avgInterval time.Duration
// number of tasks to be deleted when janitor runs to delete the expired completed tasks.
batchSize int
} }
type janitorParams struct { type janitorParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
queues []string queues []string
interval time.Duration interval time.Duration
batchSize int
} }
func newJanitor(params janitorParams) *janitor { func newJanitor(params janitorParams) *janitor {
@@ -43,6 +47,7 @@ func newJanitor(params janitorParams) *janitor {
done: make(chan struct{}), done: make(chan struct{}),
queues: params.queues, queues: params.queues,
avgInterval: params.interval, avgInterval: params.interval,
batchSize: params.batchSize,
} }
} }
@@ -73,7 +78,7 @@ func (j *janitor) start(wg *sync.WaitGroup) {
func (j *janitor) exec() { func (j *janitor) exec() {
for _, qname := range j.queues { for _, qname := range j.queues {
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil { if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil {
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v", j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
qname, err) qname, err)
} }

View File

@@ -26,11 +26,13 @@ func TestJanitor(t *testing.T) {
defer r.Close() defer r.Close()
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
const interval = 1 * time.Second const interval = 1 * time.Second
const batchSize = 100
janitor := newJanitor(janitorParams{ janitor := newJanitor(janitorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
queues: []string{"default", "custom"}, queues: []string{"default", "custom"},
interval: interval, interval: interval,
batchSize: batchSize,
}) })
now := time.Now() now := time.Now()

View File

@@ -415,21 +415,19 @@ func (p *processor) queues() []string {
func (p *processor) perform(ctx context.Context, task *Task) (err error) { func (p *processor) perform(ctx context.Context, task *Task) (err error) {
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
errMsg := string(debug.Stack()) p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", errMsg)
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself) _, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
if ok && strings.Contains(file, "runtime/") { if ok && strings.Contains(file, "runtime/") {
// The panic came from the runtime, most likely due to incorrect // The panic came from the runtime, most likely due to incorrect
// map/slice usage. The parent frame should have the real trigger. // map/slice usage. The parent frame should have the real trigger.
_, file, line, ok = runtime.Caller(2) _, file, line, ok = runtime.Caller(2)
} }
var errMsg string
// Include the file and line number info in the error, if runtime.Caller returned ok. // Include the file and line number info in the error, if runtime.Caller returned ok.
if ok { if ok {
err = fmt.Errorf("panic [%s:%d]: %v", file, line, x) errMsg = fmt.Sprintf("panic [%s:%d]: %v", file, line, x)
} else { } else {
err = fmt.Errorf("panic: %v", x) errMsg = fmt.Sprintf("panic: %v", x)
} }
err = &errors.PanicError{ err = &errors.PanicError{
ErrMsg: errMsg, ErrMsg: errMsg,

View File

@@ -10,11 +10,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
) )
@@ -43,15 +43,27 @@ type Scheduler struct {
// to avoid using cron.EntryID as the public API of // to avoid using cron.EntryID as the public API of
// the Scheduler. // the Scheduler.
idmap map[string]cron.EntryID idmap map[string]cron.EntryID
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
} }
// NewScheduler returns a new Scheduler instance given the redis connection option. // NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil // The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
c, ok := r.MakeRedisClient().(redis.UniversalClient) redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
} }
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
return scheduler
}
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
if opts == nil { if opts == nil {
opts = &SchedulerOpts{} opts = &SchedulerOpts{}
} }
@@ -72,7 +84,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
id: generateSchedulerID(), id: generateSchedulerID(),
state: &serverState{value: srvStateNew}, state: &serverState{value: srvStateNew},
logger: logger, logger: logger,
client: NewClient(r), client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c), rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)), cron: cron.New(cron.WithLocation(loc)),
location: loc, location: loc,
@@ -261,8 +273,12 @@ func (s *Scheduler) Shutdown() {
s.wg.Wait() s.wg.Wait()
s.clearHistory() s.clearHistory()
s.client.Close() if err := s.client.Close(); err != nil {
s.rdb.Close() s.logger.Errorf("Failed to close redis client connection: %v", err)
}
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped") s.logger.Info("Scheduler stopped")
} }

View File

@@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"github.com/redis/go-redis/v9"
"sync" "sync"
"testing" "testing"
"time" "time"
@@ -58,6 +59,7 @@ func TestSchedulerRegister(t *testing.T) {
r := setup(t) r := setup(t)
// Tests for new redis connection.
for _, tc := range tests { for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil) scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
@@ -75,6 +77,28 @@ func TestSchedulerRegister(t *testing.T) {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff) t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
} }
} }
r = setup(t)
// Tests for existing redis connection.
for _, tc := range tests {
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
scheduler := NewSchedulerFromRedisClient(redisClient, nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
scheduler.Shutdown()
got := testutil.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
} }
func TestSchedulerWhenRedisDown(t *testing.T) { func TestSchedulerWhenRedisDown(t *testing.T) {

View File

@@ -37,6 +37,9 @@ type Server struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
// When a Server has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
state *serverState state *serverState
@@ -103,7 +106,7 @@ type Config struct {
// If BaseContext is nil, the default is context.Background(). // If BaseContext is nil, the default is context.Background().
// If this is defined, then it MUST return a non-nil context // If this is defined, then it MUST return a non-nil context
BaseContext func() context.Context BaseContext func() context.Context
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty. // TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
// //
// If unset, zero or a negative value, the interval is set to 1 second. // If unset, zero or a negative value, the interval is set to 1 second.
@@ -239,6 +242,17 @@ type Config struct {
// //
// If unset or nil, the group aggregation feature will be disabled on the server. // If unset or nil, the group aggregation feature will be disabled on the server.
GroupAggregator GroupAggregator GroupAggregator GroupAggregator
// JanitorInterval specifies the average interval of janitor checks for expired completed tasks.
//
// If unset or zero, default interval of 8 seconds is used.
JanitorInterval time.Duration
// JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run.
//
// If unset or zero, default batch size of 100 is used.
// Make sure to not put a big number as the batch size to prevent a long-running script.
JanitorBatchSize int
} }
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
@@ -408,15 +422,28 @@ const (
defaultDelayedTaskCheckInterval = 5 * time.Second defaultDelayedTaskCheckInterval = 5 * time.Second
defaultGroupGracePeriod = 1 * time.Minute defaultGroupGracePeriod = 1 * time.Minute
defaultJanitorInterval = 8 * time.Second
defaultJanitorBatchSize = 100
) )
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
// and server configuration. // and server configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server { func NewServer(r RedisConnOpt, cfg Config) *Server {
c, ok := r.MakeRedisClient().(redis.UniversalClient) redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
} }
server := NewServerFromRedisClient(redisClient, cfg)
server.sharedConnection = false
return server
}
// NewServerFromRedisClient returns a new instance of Server given a redis.UniversalClient
// and server configuration
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
baseCtxFn := cfg.BaseContext baseCtxFn := cfg.BaseContext
if baseCtxFn == nil { if baseCtxFn == nil {
baseCtxFn = context.Background baseCtxFn = context.Background
@@ -547,11 +574,26 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
interval: healthcheckInterval, interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc, healthcheckFunc: cfg.HealthCheckFunc,
}) })
janitorInterval := cfg.JanitorInterval
if janitorInterval == 0 {
janitorInterval = defaultJanitorInterval
}
janitorBatchSize := cfg.JanitorBatchSize
if janitorBatchSize == 0 {
janitorBatchSize = defaultJanitorBatchSize
}
if janitorBatchSize > defaultJanitorBatchSize {
logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+
"This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize)
}
janitor := newJanitor(janitorParams{ janitor := newJanitor(janitorParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
queues: qnames, queues: qnames,
interval: 8 * time.Second, interval: janitorInterval,
batchSize: janitorBatchSize,
}) })
aggregator := newAggregator(aggregatorParams{ aggregator := newAggregator(aggregatorParams{
logger: logger, logger: logger,
@@ -563,18 +605,19 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
groupAggregator: cfg.GroupAggregator, groupAggregator: cfg.GroupAggregator,
}) })
return &Server{ return &Server{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
state: srvState, sharedConnection: true,
forwarder: forwarder, state: srvState,
processor: processor, forwarder: forwarder,
syncer: syncer, processor: processor,
heartbeater: heartbeater, syncer: syncer,
subscriber: subscriber, heartbeater: heartbeater,
recoverer: recoverer, subscriber: subscriber,
healthchecker: healthchecker, recoverer: recoverer,
janitor: janitor, healthchecker: healthchecker,
aggregator: aggregator, janitor: janitor,
aggregator: aggregator,
} }
} }
@@ -702,7 +745,9 @@ func (srv *Server) Shutdown() {
srv.heartbeater.shutdown() srv.heartbeater.shutdown()
srv.wg.Wait() srv.wg.Wait()
srv.broker.Close() if !srv.sharedConnection {
srv.broker.Close()
}
srv.logger.Info("Exiting") srv.logger.Info("Exiting")
} }

View File

@@ -14,22 +14,12 @@ import (
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker" "github.com/hibiken/asynq/internal/testbroker"
"github.com/hibiken/asynq/internal/testutil" "github.com/hibiken/asynq/internal/testutil"
"github.com/redis/go-redis/v9"
"go.uber.org/goleak" "go.uber.org/goleak"
) )
func TestServer(t *testing.T) { func testServer(t *testing.T, c *Client, srv *Server) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt)
defer c.Close()
srv := NewServer(redisConnOpt, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
// no-op handler // no-op handler
h := func(ctx context.Context, task *Task) error { h := func(ctx context.Context, task *Task) error {
return nil return nil
@@ -53,6 +43,43 @@ func TestServer(t *testing.T) {
srv.Shutdown() srv.Shutdown()
} }
func TestServer(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt)
defer c.Close()
srv := NewServer(redisConnOpt, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
testServer(t, c, srv)
}
func TestServerFromRedisClient(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
redisClient := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
c := NewClientFromRedisClient(redisClient)
srv := NewServerFromRedisClient(redisClient, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
testServer(t, c, srv)
err := c.Close()
if err == nil {
t.Error("client.Close() should have failed because of a shared client but it didn't")
}
}
func TestServerRun(t *testing.T) { func TestServerRun(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029 // https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper") ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")

View File

@@ -1,4 +1,4 @@
//go:build linux || bsd || darwin //go:build linux || dragonfly || freebsd || netbsd || openbsd || darwin
package asynq package asynq

View File

@@ -25,7 +25,7 @@ To view details on any command, use `asynq help <command> <subcommand>`.
- `asynq dash` - `asynq dash`
- `asynq stats` - `asynq stats`
- `asynq queue [ls inspect history rm pause unpause]` - `asynq queue [ls inspect history rm pause unpause]`
- `asynq task [ls cancel delete archive run delete-all archive-all run-all]` - `asynq task [ls cancel delete archive run deleteall archiveall runall]`
- `asynq server [ls]` - `asynq server [ls]`
### Global flags ### Global flags

View File

@@ -369,7 +369,7 @@ func createRDB() *rdb.RDB {
return rdb.NewRDB(c) return rdb.NewRDB(c)
} }
// createRDB creates a Inspector instance using flag values and returns it. // createInspector creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector { func createInspector() *asynq.Inspector {
return asynq.NewInspector(getRedisConnOpt()) return asynq.NewInspector(getRedisConnOpt())
} }