mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-20 21:26:14 +08:00
Compare commits
10 Commits
sohail/cha
...
develop
Author | SHA1 | Date | |
---|---|---|---|
|
461d922616 | ||
|
5daa3c52ed | ||
|
d04888e748 | ||
|
174008843d | ||
|
2b632b93d5 | ||
|
b35b559d40 | ||
|
8df0bfa583 | ||
|
b25d10b61d | ||
|
38f7499b71 | ||
|
9e548fc097 |
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.25.0] - 2023-01-02
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Added configuration for Janitor's Interval and Deletion Batch Size (PR: https://github.com/hibiken/asynq/pull/715)
|
||||||
|
|
||||||
## [0.24.1] - 2023-05-01
|
## [0.24.1] - 2023-05-01
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
26
client.go
26
client.go
@@ -10,11 +10,11 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Client is responsible for scheduling tasks.
|
// A Client is responsible for scheduling tasks.
|
||||||
@@ -25,15 +25,26 @@ import (
|
|||||||
// Clients are safe for concurrent use by multiple goroutines.
|
// Clients are safe for concurrent use by multiple goroutines.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
broker base.Broker
|
broker base.Broker
|
||||||
|
// When a Client has been created with an existing Redis connection, we do
|
||||||
|
// not want to close it.
|
||||||
|
sharedConnection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient returns a new Client instance given a redis connection option.
|
// NewClient returns a new Client instance given a redis connection option.
|
||||||
func NewClient(r RedisConnOpt) *Client {
|
func NewClient(r RedisConnOpt) *Client {
|
||||||
c, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
return &Client{broker: rdb.NewRDB(c)}
|
client := NewClientFromRedisClient(redisClient)
|
||||||
|
client.sharedConnection = false
|
||||||
|
return client
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient
|
||||||
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
|
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
|
||||||
|
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
|
||||||
}
|
}
|
||||||
|
|
||||||
type OptionType int
|
type OptionType int
|
||||||
@@ -150,9 +161,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
|
|||||||
// TTL duration must be greater than or equal to 1 second.
|
// TTL duration must be greater than or equal to 1 second.
|
||||||
//
|
//
|
||||||
// Uniqueness of a task is based on the following properties:
|
// Uniqueness of a task is based on the following properties:
|
||||||
// - Task Type
|
// - Task Type
|
||||||
// - Task Payload
|
// - Task Payload
|
||||||
// - Queue Name
|
// - Queue Name
|
||||||
func Unique(ttl time.Duration) Option {
|
func Unique(ttl time.Duration) Option {
|
||||||
return uniqueOption(ttl)
|
return uniqueOption(ttl)
|
||||||
}
|
}
|
||||||
@@ -307,6 +318,9 @@ var (
|
|||||||
|
|
||||||
// Close closes the connection with redis.
|
// Close closes the connection with redis.
|
||||||
func (c *Client) Close() error {
|
func (c *Client) Close() error {
|
||||||
|
if c.sharedConnection {
|
||||||
|
return fmt.Errorf("redis connection is shared so the Client can't be closed through asynq")
|
||||||
|
}
|
||||||
return c.broker.Close()
|
return c.broker.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/google/go-cmp/cmp/cmpopts"
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
h "github.com/hibiken/asynq/internal/testutil"
|
h "github.com/hibiken/asynq/internal/testutil"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||||
@@ -143,11 +144,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClientEnqueue(t *testing.T) {
|
func testClientEnqueue(t *testing.T, client *Client, r redis.UniversalClient) {
|
||||||
r := setup(t)
|
|
||||||
client := NewClient(getRedisConnOpt(t))
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
|
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
@@ -478,6 +475,24 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientEnqueue(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
defer client.Close()
|
||||||
|
testClientEnqueue(t, client, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientFromRedisClientEnqueue(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
|
||||||
|
client := NewClientFromRedisClient(redisClient)
|
||||||
|
testClientEnqueue(t, client, r)
|
||||||
|
err := client.Close()
|
||||||
|
if err == nil {
|
||||||
|
t.Error("client.Close() should have failed because of a shared client but it didn't")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClientEnqueueWithGroupOption(t *testing.T) {
|
func TestClientEnqueueWithGroupOption(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
client := NewClient(getRedisConnOpt(t))
|
client := NewClient(getRedisConnOpt(t))
|
||||||
|
19
inspector.go
19
inspector.go
@@ -10,16 +10,19 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Inspector is a client interface to inspect and mutate the state of
|
// Inspector is a client interface to inspect and mutate the state of
|
||||||
// queues and tasks.
|
// queues and tasks.
|
||||||
type Inspector struct {
|
type Inspector struct {
|
||||||
rdb *rdb.RDB
|
rdb *rdb.RDB
|
||||||
|
// When an Inspector has been created with an existing Redis connection, we do
|
||||||
|
// not want to close it.
|
||||||
|
sharedConnection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// New returns a new instance of Inspector.
|
// New returns a new instance of Inspector.
|
||||||
@@ -28,13 +31,25 @@ func NewInspector(r RedisConnOpt) *Inspector {
|
|||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
|
inspector := NewInspectorFromRedisClient(c)
|
||||||
|
inspector.sharedConnection = false
|
||||||
|
return inspector
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewInspectorFromRedisClient returns a new instance of Inspector given a redis.UniversalClient
|
||||||
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
|
func NewInspectorFromRedisClient(c redis.UniversalClient) *Inspector {
|
||||||
return &Inspector{
|
return &Inspector{
|
||||||
rdb: rdb.NewRDB(c),
|
rdb: rdb.NewRDB(c),
|
||||||
|
sharedConnection: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the connection with redis.
|
// Close closes the connection with redis.
|
||||||
func (i *Inspector) Close() error {
|
func (i *Inspector) Close() error {
|
||||||
|
if i.sharedConnection {
|
||||||
|
return fmt.Errorf("redis connection is shared so the Inspector can't be closed through asynq")
|
||||||
|
}
|
||||||
return i.rdb.Close()
|
return i.rdb.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -22,11 +22,7 @@ import (
|
|||||||
"github.com/redis/go-redis/v9"
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestInspectorQueues(t *testing.T) {
|
func testInspectorQueues(t *testing.T, inspector *Inspector, r redis.UniversalClient) {
|
||||||
r := setup(t)
|
|
||||||
defer r.Close()
|
|
||||||
inspector := NewInspector(getRedisConnOpt(t))
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
queues []string
|
queues []string
|
||||||
}{
|
}{
|
||||||
@@ -52,7 +48,21 @@ func TestInspectorQueues(t *testing.T) {
|
|||||||
t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
|
t.Errorf("Queues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInspectorQueues(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
inspector := NewInspector(getRedisConnOpt(t))
|
||||||
|
testInspectorQueues(t, inspector, r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInspectorFromRedisClientQueues(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
|
||||||
|
inspector := NewInspectorFromRedisClient(redisClient)
|
||||||
|
testInspectorQueues(t, inspector, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInspectorDeleteQueue(t *testing.T) {
|
func TestInspectorDeleteQueue(t *testing.T) {
|
||||||
|
@@ -737,7 +737,7 @@ type Broker interface {
|
|||||||
ReclaimStaleAggregationSets(qname string) error
|
ReclaimStaleAggregationSets(qname string) error
|
||||||
|
|
||||||
// Task retention related method
|
// Task retention related method
|
||||||
DeleteExpiredCompletedTasks(qname string) error
|
DeleteExpiredCompletedTasks(qname string, batchSize int) error
|
||||||
|
|
||||||
// Lease related methods
|
// Lease related methods
|
||||||
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
|
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||||
|
@@ -1217,7 +1217,7 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
|||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
|
|
||||||
// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and
|
// ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and
|
||||||
// reclaim tasks in the stale aggregation set by putting them back in the group.
|
// reclaim tasks in the stale aggregation set by putting them back in the group.
|
||||||
func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
|
func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
|
||||||
var op errors.Op = "RDB.ReclaimStaleAggregationSets"
|
var op errors.Op = "RDB.ReclaimStaleAggregationSets"
|
||||||
@@ -1241,9 +1241,7 @@ return table.getn(ids)`)
|
|||||||
|
|
||||||
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
|
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
|
||||||
// and delete all expired tasks.
|
// and delete all expired tasks.
|
||||||
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
|
func (r *RDB) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
|
||||||
// Note: Do this operation in fix batches to prevent long running script.
|
|
||||||
const batchSize = 100
|
|
||||||
for {
|
for {
|
||||||
n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
|
n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -2542,8 +2542,8 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
h.SeedAllCompletedQueues(t, r.client, tc.completed)
|
h.SeedAllCompletedQueues(t, r.client, tc.completed)
|
||||||
|
|
||||||
if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil {
|
if err := r.DeleteExpiredCompletedTasks(tc.qname, 100); err != nil {
|
||||||
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err)
|
t.Errorf("DeleteExpiredCompletedTasks(%q, 100) failed: %v", tc.qname, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -11,8 +11,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errRedisDown = errors.New("testutil: redis is down")
|
var errRedisDown = errors.New("testutil: redis is down")
|
||||||
@@ -145,13 +145,13 @@ func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
|
|||||||
return tb.real.ForwardIfReady(qnames...)
|
return tb.real.ForwardIfReady(qnames...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
|
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string, batchSize int) error {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
if tb.sleeping {
|
if tb.sleeping {
|
||||||
return errRedisDown
|
return errRedisDown
|
||||||
}
|
}
|
||||||
return tb.real.DeleteExpiredCompletedTasks(qname)
|
return tb.real.DeleteExpiredCompletedTasks(qname, batchSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
||||||
|
15
janitor.go
15
janitor.go
@@ -27,13 +27,17 @@ type janitor struct {
|
|||||||
|
|
||||||
// average interval between checks.
|
// average interval between checks.
|
||||||
avgInterval time.Duration
|
avgInterval time.Duration
|
||||||
|
|
||||||
|
// number of tasks to be deleted when janitor runs to delete the expired completed tasks.
|
||||||
|
batchSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
type janitorParams struct {
|
type janitorParams struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
broker base.Broker
|
broker base.Broker
|
||||||
queues []string
|
queues []string
|
||||||
interval time.Duration
|
interval time.Duration
|
||||||
|
batchSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newJanitor(params janitorParams) *janitor {
|
func newJanitor(params janitorParams) *janitor {
|
||||||
@@ -43,6 +47,7 @@ func newJanitor(params janitorParams) *janitor {
|
|||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
queues: params.queues,
|
queues: params.queues,
|
||||||
avgInterval: params.interval,
|
avgInterval: params.interval,
|
||||||
|
batchSize: params.batchSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -73,7 +78,7 @@ func (j *janitor) start(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
func (j *janitor) exec() {
|
func (j *janitor) exec() {
|
||||||
for _, qname := range j.queues {
|
for _, qname := range j.queues {
|
||||||
if err := j.broker.DeleteExpiredCompletedTasks(qname); err != nil {
|
if err := j.broker.DeleteExpiredCompletedTasks(qname, j.batchSize); err != nil {
|
||||||
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
|
j.logger.Errorf("Failed to delete expired completed tasks from queue %q: %v",
|
||||||
qname, err)
|
qname, err)
|
||||||
}
|
}
|
||||||
|
@@ -26,11 +26,13 @@ func TestJanitor(t *testing.T) {
|
|||||||
defer r.Close()
|
defer r.Close()
|
||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
const interval = 1 * time.Second
|
const interval = 1 * time.Second
|
||||||
|
const batchSize = 100
|
||||||
janitor := newJanitor(janitorParams{
|
janitor := newJanitor(janitorParams{
|
||||||
logger: testLogger,
|
logger: testLogger,
|
||||||
broker: rdbClient,
|
broker: rdbClient,
|
||||||
queues: []string{"default", "custom"},
|
queues: []string{"default", "custom"},
|
||||||
interval: interval,
|
interval: interval,
|
||||||
|
batchSize: batchSize,
|
||||||
})
|
})
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
10
processor.go
10
processor.go
@@ -415,21 +415,19 @@ func (p *processor) queues() []string {
|
|||||||
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if x := recover(); x != nil {
|
if x := recover(); x != nil {
|
||||||
errMsg := string(debug.Stack())
|
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
|
||||||
|
|
||||||
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", errMsg)
|
|
||||||
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
|
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
|
||||||
if ok && strings.Contains(file, "runtime/") {
|
if ok && strings.Contains(file, "runtime/") {
|
||||||
// The panic came from the runtime, most likely due to incorrect
|
// The panic came from the runtime, most likely due to incorrect
|
||||||
// map/slice usage. The parent frame should have the real trigger.
|
// map/slice usage. The parent frame should have the real trigger.
|
||||||
_, file, line, ok = runtime.Caller(2)
|
_, file, line, ok = runtime.Caller(2)
|
||||||
}
|
}
|
||||||
|
var errMsg string
|
||||||
// Include the file and line number info in the error, if runtime.Caller returned ok.
|
// Include the file and line number info in the error, if runtime.Caller returned ok.
|
||||||
if ok {
|
if ok {
|
||||||
err = fmt.Errorf("panic [%s:%d]: %v", file, line, x)
|
errMsg = fmt.Sprintf("panic [%s:%d]: %v", file, line, x)
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("panic: %v", x)
|
errMsg = fmt.Sprintf("panic: %v", x)
|
||||||
}
|
}
|
||||||
err = &errors.PanicError{
|
err = &errors.PanicError{
|
||||||
ErrMsg: errMsg,
|
ErrMsg: errMsg,
|
||||||
|
26
scheduler.go
26
scheduler.go
@@ -10,11 +10,11 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/log"
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"github.com/robfig/cron/v3"
|
"github.com/robfig/cron/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -43,15 +43,27 @@ type Scheduler struct {
|
|||||||
// to avoid using cron.EntryID as the public API of
|
// to avoid using cron.EntryID as the public API of
|
||||||
// the Scheduler.
|
// the Scheduler.
|
||||||
idmap map[string]cron.EntryID
|
idmap map[string]cron.EntryID
|
||||||
|
// When a Scheduler has been created with an existing Redis connection, we do
|
||||||
|
// not want to close it.
|
||||||
|
sharedConnection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||||
c, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
|
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
||||||
|
scheduler.sharedConnection = false
|
||||||
|
return scheduler
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
|
||||||
|
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||||
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
|
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = &SchedulerOpts{}
|
opts = &SchedulerOpts{}
|
||||||
}
|
}
|
||||||
@@ -72,7 +84,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
|||||||
id: generateSchedulerID(),
|
id: generateSchedulerID(),
|
||||||
state: &serverState{value: srvStateNew},
|
state: &serverState{value: srvStateNew},
|
||||||
logger: logger,
|
logger: logger,
|
||||||
client: NewClient(r),
|
client: NewClientFromRedisClient(c),
|
||||||
rdb: rdb.NewRDB(c),
|
rdb: rdb.NewRDB(c),
|
||||||
cron: cron.New(cron.WithLocation(loc)),
|
cron: cron.New(cron.WithLocation(loc)),
|
||||||
location: loc,
|
location: loc,
|
||||||
@@ -261,8 +273,12 @@ func (s *Scheduler) Shutdown() {
|
|||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
|
||||||
s.clearHistory()
|
s.clearHistory()
|
||||||
s.client.Close()
|
if err := s.client.Close(); err != nil {
|
||||||
s.rdb.Close()
|
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||||
|
}
|
||||||
|
if !s.sharedConnection {
|
||||||
|
s.rdb.Close()
|
||||||
|
}
|
||||||
s.logger.Info("Scheduler stopped")
|
s.logger.Info("Scheduler stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -5,6 +5,7 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -58,6 +59,7 @@ func TestSchedulerRegister(t *testing.T) {
|
|||||||
|
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
|
// Tests for new redis connection.
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
scheduler := NewScheduler(getRedisConnOpt(t), nil)
|
scheduler := NewScheduler(getRedisConnOpt(t), nil)
|
||||||
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
|
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
|
||||||
@@ -75,6 +77,28 @@ func TestSchedulerRegister(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
|
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r = setup(t)
|
||||||
|
|
||||||
|
// Tests for existing redis connection.
|
||||||
|
for _, tc := range tests {
|
||||||
|
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
|
||||||
|
scheduler := NewSchedulerFromRedisClient(redisClient, nil)
|
||||||
|
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scheduler.Start(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(tc.wait)
|
||||||
|
scheduler.Shutdown()
|
||||||
|
|
||||||
|
got := testutil.GetPendingMessages(t, r, tc.queue)
|
||||||
|
if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSchedulerWhenRedisDown(t *testing.T) {
|
func TestSchedulerWhenRedisDown(t *testing.T) {
|
||||||
|
83
server.go
83
server.go
@@ -37,6 +37,9 @@ type Server struct {
|
|||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
|
||||||
broker base.Broker
|
broker base.Broker
|
||||||
|
// When a Server has been created with an existing Redis connection, we do
|
||||||
|
// not want to close it.
|
||||||
|
sharedConnection bool
|
||||||
|
|
||||||
state *serverState
|
state *serverState
|
||||||
|
|
||||||
@@ -103,7 +106,7 @@ type Config struct {
|
|||||||
// If BaseContext is nil, the default is context.Background().
|
// If BaseContext is nil, the default is context.Background().
|
||||||
// If this is defined, then it MUST return a non-nil context
|
// If this is defined, then it MUST return a non-nil context
|
||||||
BaseContext func() context.Context
|
BaseContext func() context.Context
|
||||||
|
|
||||||
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
|
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
|
||||||
//
|
//
|
||||||
// If unset, zero or a negative value, the interval is set to 1 second.
|
// If unset, zero or a negative value, the interval is set to 1 second.
|
||||||
@@ -239,6 +242,17 @@ type Config struct {
|
|||||||
//
|
//
|
||||||
// If unset or nil, the group aggregation feature will be disabled on the server.
|
// If unset or nil, the group aggregation feature will be disabled on the server.
|
||||||
GroupAggregator GroupAggregator
|
GroupAggregator GroupAggregator
|
||||||
|
|
||||||
|
// JanitorInterval specifies the average interval of janitor checks for expired completed tasks.
|
||||||
|
//
|
||||||
|
// If unset or zero, default interval of 8 seconds is used.
|
||||||
|
JanitorInterval time.Duration
|
||||||
|
|
||||||
|
// JanitorBatchSize specifies the number of expired completed tasks to be deleted in one run.
|
||||||
|
//
|
||||||
|
// If unset or zero, default batch size of 100 is used.
|
||||||
|
// Make sure to not put a big number as the batch size to prevent a long-running script.
|
||||||
|
JanitorBatchSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
|
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
|
||||||
@@ -408,15 +422,28 @@ const (
|
|||||||
defaultDelayedTaskCheckInterval = 5 * time.Second
|
defaultDelayedTaskCheckInterval = 5 * time.Second
|
||||||
|
|
||||||
defaultGroupGracePeriod = 1 * time.Minute
|
defaultGroupGracePeriod = 1 * time.Minute
|
||||||
|
|
||||||
|
defaultJanitorInterval = 8 * time.Second
|
||||||
|
|
||||||
|
defaultJanitorBatchSize = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewServer returns a new Server given a redis connection option
|
// NewServer returns a new Server given a redis connection option
|
||||||
// and server configuration.
|
// and server configuration.
|
||||||
func NewServer(r RedisConnOpt, cfg Config) *Server {
|
func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||||
c, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
|
server := NewServerFromRedisClient(redisClient, cfg)
|
||||||
|
server.sharedConnection = false
|
||||||
|
return server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServerFromRedisClient returns a new instance of Server given a redis.UniversalClient
|
||||||
|
// and server configuration
|
||||||
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
|
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
|
||||||
baseCtxFn := cfg.BaseContext
|
baseCtxFn := cfg.BaseContext
|
||||||
if baseCtxFn == nil {
|
if baseCtxFn == nil {
|
||||||
baseCtxFn = context.Background
|
baseCtxFn = context.Background
|
||||||
@@ -547,11 +574,26 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
interval: healthcheckInterval,
|
interval: healthcheckInterval,
|
||||||
healthcheckFunc: cfg.HealthCheckFunc,
|
healthcheckFunc: cfg.HealthCheckFunc,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
janitorInterval := cfg.JanitorInterval
|
||||||
|
if janitorInterval == 0 {
|
||||||
|
janitorInterval = defaultJanitorInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
janitorBatchSize := cfg.JanitorBatchSize
|
||||||
|
if janitorBatchSize == 0 {
|
||||||
|
janitorBatchSize = defaultJanitorBatchSize
|
||||||
|
}
|
||||||
|
if janitorBatchSize > defaultJanitorBatchSize {
|
||||||
|
logger.Warnf("Janitor batch size of %d is greater than the recommended batch size of %d. "+
|
||||||
|
"This might cause a long-running script", janitorBatchSize, defaultJanitorBatchSize)
|
||||||
|
}
|
||||||
janitor := newJanitor(janitorParams{
|
janitor := newJanitor(janitorParams{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
broker: rdb,
|
broker: rdb,
|
||||||
queues: qnames,
|
queues: qnames,
|
||||||
interval: 8 * time.Second,
|
interval: janitorInterval,
|
||||||
|
batchSize: janitorBatchSize,
|
||||||
})
|
})
|
||||||
aggregator := newAggregator(aggregatorParams{
|
aggregator := newAggregator(aggregatorParams{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
@@ -563,18 +605,19 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
groupAggregator: cfg.GroupAggregator,
|
groupAggregator: cfg.GroupAggregator,
|
||||||
})
|
})
|
||||||
return &Server{
|
return &Server{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
broker: rdb,
|
broker: rdb,
|
||||||
state: srvState,
|
sharedConnection: true,
|
||||||
forwarder: forwarder,
|
state: srvState,
|
||||||
processor: processor,
|
forwarder: forwarder,
|
||||||
syncer: syncer,
|
processor: processor,
|
||||||
heartbeater: heartbeater,
|
syncer: syncer,
|
||||||
subscriber: subscriber,
|
heartbeater: heartbeater,
|
||||||
recoverer: recoverer,
|
subscriber: subscriber,
|
||||||
healthchecker: healthchecker,
|
recoverer: recoverer,
|
||||||
janitor: janitor,
|
healthchecker: healthchecker,
|
||||||
aggregator: aggregator,
|
janitor: janitor,
|
||||||
|
aggregator: aggregator,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -702,7 +745,9 @@ func (srv *Server) Shutdown() {
|
|||||||
srv.heartbeater.shutdown()
|
srv.heartbeater.shutdown()
|
||||||
srv.wg.Wait()
|
srv.wg.Wait()
|
||||||
|
|
||||||
srv.broker.Close()
|
if !srv.sharedConnection {
|
||||||
|
srv.broker.Close()
|
||||||
|
}
|
||||||
srv.logger.Info("Exiting")
|
srv.logger.Info("Exiting")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -14,22 +14,12 @@ import (
|
|||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
"github.com/hibiken/asynq/internal/testbroker"
|
"github.com/hibiken/asynq/internal/testbroker"
|
||||||
"github.com/hibiken/asynq/internal/testutil"
|
"github.com/hibiken/asynq/internal/testutil"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestServer(t *testing.T) {
|
func testServer(t *testing.T, c *Client, srv *Server) {
|
||||||
// https://github.com/go-redis/redis/issues/1029
|
|
||||||
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
|
|
||||||
defer goleak.VerifyNone(t, ignoreOpt)
|
|
||||||
|
|
||||||
redisConnOpt := getRedisConnOpt(t)
|
|
||||||
c := NewClient(redisConnOpt)
|
|
||||||
defer c.Close()
|
|
||||||
srv := NewServer(redisConnOpt, Config{
|
|
||||||
Concurrency: 10,
|
|
||||||
LogLevel: testLogLevel,
|
|
||||||
})
|
|
||||||
|
|
||||||
// no-op handler
|
// no-op handler
|
||||||
h := func(ctx context.Context, task *Task) error {
|
h := func(ctx context.Context, task *Task) error {
|
||||||
return nil
|
return nil
|
||||||
@@ -53,6 +43,43 @@ func TestServer(t *testing.T) {
|
|||||||
srv.Shutdown()
|
srv.Shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServer(t *testing.T) {
|
||||||
|
// https://github.com/go-redis/redis/issues/1029
|
||||||
|
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
|
||||||
|
defer goleak.VerifyNone(t, ignoreOpt)
|
||||||
|
|
||||||
|
redisConnOpt := getRedisConnOpt(t)
|
||||||
|
c := NewClient(redisConnOpt)
|
||||||
|
defer c.Close()
|
||||||
|
srv := NewServer(redisConnOpt, Config{
|
||||||
|
Concurrency: 10,
|
||||||
|
LogLevel: testLogLevel,
|
||||||
|
})
|
||||||
|
|
||||||
|
testServer(t, c, srv)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerFromRedisClient(t *testing.T) {
|
||||||
|
// https://github.com/go-redis/redis/issues/1029
|
||||||
|
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
|
||||||
|
defer goleak.VerifyNone(t, ignoreOpt)
|
||||||
|
|
||||||
|
redisConnOpt := getRedisConnOpt(t)
|
||||||
|
redisClient := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
|
||||||
|
c := NewClientFromRedisClient(redisClient)
|
||||||
|
srv := NewServerFromRedisClient(redisClient, Config{
|
||||||
|
Concurrency: 10,
|
||||||
|
LogLevel: testLogLevel,
|
||||||
|
})
|
||||||
|
|
||||||
|
testServer(t, c, srv)
|
||||||
|
|
||||||
|
err := c.Close()
|
||||||
|
if err == nil {
|
||||||
|
t.Error("client.Close() should have failed because of a shared client but it didn't")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestServerRun(t *testing.T) {
|
func TestServerRun(t *testing.T) {
|
||||||
// https://github.com/go-redis/redis/issues/1029
|
// https://github.com/go-redis/redis/issues/1029
|
||||||
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
|
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
//go:build linux || bsd || darwin
|
//go:build linux || dragonfly || freebsd || netbsd || openbsd || darwin
|
||||||
|
|
||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
|
@@ -25,7 +25,7 @@ To view details on any command, use `asynq help <command> <subcommand>`.
|
|||||||
- `asynq dash`
|
- `asynq dash`
|
||||||
- `asynq stats`
|
- `asynq stats`
|
||||||
- `asynq queue [ls inspect history rm pause unpause]`
|
- `asynq queue [ls inspect history rm pause unpause]`
|
||||||
- `asynq task [ls cancel delete archive run delete-all archive-all run-all]`
|
- `asynq task [ls cancel delete archive run deleteall archiveall runall]`
|
||||||
- `asynq server [ls]`
|
- `asynq server [ls]`
|
||||||
|
|
||||||
### Global flags
|
### Global flags
|
||||||
|
@@ -369,7 +369,7 @@ func createRDB() *rdb.RDB {
|
|||||||
return rdb.NewRDB(c)
|
return rdb.NewRDB(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// createRDB creates a Inspector instance using flag values and returns it.
|
// createInspector creates a Inspector instance using flag values and returns it.
|
||||||
func createInspector() *asynq.Inspector {
|
func createInspector() *asynq.Inspector {
|
||||||
return asynq.NewInspector(getRedisConnOpt())
|
return asynq.NewInspector(getRedisConnOpt())
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user