asynq — mirror of https://github.com/hibiken/asynq.git

Commit a1515888a8: Merge a05cbd2e387186d3e12b93d939a9e6b7be8a6b5c into c327bc40a28e4db45195cfe082d88faa808ce87d

The diff below adds per-queue concurrency limits (Config.QueueConcurrency on the server, rdb.WithQueueConcurrency on the low-level client, enforced inside the dequeue Lua script) and lets a running server register additional queues dynamically via Server.AddQueue.
aggregator.go

@@ -81,6 +81,11 @@ func newAggregator(params aggregatorParams) *aggregator {
 	}
 }
 
+func (a *aggregator) resetState() {
+	a.done = make(chan struct{})
+	a.sema = make(chan struct{}, maxConcurrentAggregationChecks)
+}
+
 func (a *aggregator) shutdown() {
 	if a.ga == nil {
 		return
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/log"
 	"github.com/hibiken/asynq/internal/timeutil"
internal/base/base.go

@@ -14,12 +14,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/hibiken/asynq/internal/errors"
-	pb "github.com/hibiken/asynq/internal/proto"
-	"github.com/hibiken/asynq/internal/timeutil"
 	"github.com/redis/go-redis/v9"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/hibiken/asynq/internal/errors"
+	pb "github.com/hibiken/asynq/internal/proto"
+	"github.com/hibiken/asynq/internal/timeutil"
 )
 
 // Version of asynq library and CLI.
@@ -722,4 +723,5 @@ type Broker interface {
 	PublishCancelation(id string) error
 
 	WriteResult(qname, id string, data []byte) (n int, err error)
+	SetQueueConcurrency(qname string, concurrency int)
 }
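SetQueueConcurrency becomes part of the Broker interface, so every Broker implementation now has to provide it; the real RDB stores the value, and the TestBroker further down in this diff simply forwards to the wrapped RDB. A minimal sketch of satisfying the new method in a custom fake broker (FakeBroker is hypothetical, not a type that ships with asynq):

type FakeBroker struct {
	// the remaining Broker methods would be stubbed out as needed for the fake
}

// SetQueueConcurrency can be a no-op in a fake that never enforces limits.
func (b *FakeBroker) SetQueueConcurrency(qname string, concurrency int) {}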
internal/rdb/rdb.go

@@ -13,11 +13,12 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/redis/go-redis/v9"
+	"github.com/spf13/cast"
+
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/errors"
 	"github.com/hibiken/asynq/internal/timeutil"
-	"github.com/redis/go-redis/v9"
-	"github.com/spf13/cast"
 )
 
 const statsTTL = 90 * 24 * time.Hour // 90 days
@@ -25,19 +26,34 @@ const statsTTL = 90 * 24 * time.Hour // 90 days
 // LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
 const LeaseDuration = 30 * time.Second
 
+type Option func(r *RDB)
+
+func WithQueueConcurrency(queueConcurrency map[string]int) Option {
+	return func(r *RDB) {
+		for qname, concurrency := range queueConcurrency {
+			r.queueConcurrency.Store(qname, concurrency)
+		}
+	}
+}
+
 // RDB is a client interface to query and mutate task queues.
 type RDB struct {
 	client          redis.UniversalClient
 	clock           timeutil.Clock
 	queuesPublished sync.Map
+	queueConcurrency sync.Map
 }
 
 // NewRDB returns a new instance of RDB.
-func NewRDB(client redis.UniversalClient) *RDB {
-	return &RDB{
+func NewRDB(client redis.UniversalClient, opts ...Option) *RDB {
+	r := &RDB{
 		client: client,
 		clock:  timeutil.NewRealClock(),
 	}
+	for _, opt := range opts {
+		opt(r)
+	}
+	return r
 }
 
 // Close closes the connection with redis server.
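A minimal sketch of constructing the low-level client with the new option; the Redis address is a placeholder, and note that internal/rdb is an internal package, so outside the asynq module this path is reached through asynq.NewServer and Config.QueueConcurrency (shown later in this diff) rather than directly:

package main

import (
	"github.com/redis/go-redis/v9"

	"github.com/hibiken/asynq/internal/rdb"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// Cap "critical" at 5 and "default" at 10 active tasks, shared by every server.
	r := rdb.NewRDB(client, rdb.WithQueueConcurrency(map[string]int{
		"critical": 5,
		"default":  10,
	}))
	defer r.Close()
}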
@@ -217,6 +233,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
 // --
 // ARGV[1] -> initial lease expiration Unix time
 // ARGV[2] -> task key prefix
+// ARGV[3] -> queue concurrency
 //
 // Output:
 // Returns nil if no processable task is found in the given queue.
@@ -225,15 +242,20 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
 // Note: dequeueCmd checks whether a queue is paused first, before
 // calling RPOPLPUSH to pop a task from the queue.
 var dequeueCmd = redis.NewScript(`
-if redis.call("EXISTS", KEYS[2]) == 0 then
-	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
-	if id then
-		local key = ARGV[2] .. id
-		redis.call("HSET", key, "state", "active")
-		redis.call("HDEL", key, "pending_since")
-		redis.call("ZADD", KEYS[4], ARGV[1], id)
-		return redis.call("HGET", key, "msg")
-	end
+if redis.call("EXISTS", KEYS[2]) > 0 then
+	return nil
+end
+local count = redis.call("ZCARD", KEYS[4])
+if (count >= tonumber(ARGV[3])) then
+	return nil
+end
+local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
+if id then
+	local key = ARGV[2] .. id
+	redis.call("HSET", key, "state", "active")
+	redis.call("HDEL", key, "pending_since")
+	redis.call("ZADD", KEYS[4], ARGV[1], id)
+	return redis.call("HGET", key, "msg")
 end
 return nil`)
 
@@ -251,9 +273,14 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT
 			base.LeaseKey(qname),
 		}
 		leaseExpirationTime = r.clock.Now().Add(LeaseDuration)
+		queueConcurrency, ok := r.queueConcurrency.Load(qname)
+		if !ok || queueConcurrency.(int) <= 0 {
+			queueConcurrency = math.MaxInt
+		}
 		argv := []interface{}{
 			leaseExpirationTime.Unix(),
 			base.TaskKeyPrefix(qname),
+			queueConcurrency,
 		}
 		res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
 		if err == redis.Nil {
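Two details of this wiring are easy to miss: the limit is compared against ZCARD of KEYS[4], which is the queue's lease set (base.LeaseKey), so "active" here means tasks currently holding a lease across all servers; and a queue with no entry, or a non-positive entry, is mapped to math.MaxInt, i.e. effectively unlimited. The lookup, restated as a standalone helper purely for illustration (the diff inlines it in Dequeue):

package sketch

import (
	"math"
	"sync"
)

// effectiveLimit restates the inlined lookup in Dequeue: a missing or
// non-positive entry means the queue is not limited.
func effectiveLimit(m *sync.Map, qname string) int {
	v, ok := m.Load(qname)
	if !ok || v.(int) <= 0 { // || short-circuits, so v.(int) is only evaluated when ok
		return math.MaxInt
	}
	return v.(int)
}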
@@ -1556,3 +1583,7 @@ func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
 	}
 	return len(data), nil
 }
+
+func (r *RDB) SetQueueConcurrency(qname string, concurrency int) {
+	r.queueConcurrency.Store(qname, concurrency)
+}
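Because the limit lives in a sync.Map that Dequeue consults on every poll, the setter above takes effect on the next dequeue attempt without restarting anything. Continuing the construction sketch earlier in this file (r is the *rdb.RDB from that example; the values are placeholders):

// Tighten the "default" queue to 3 concurrent tasks...
r.SetQueueConcurrency("default", 3)
// ...and lift the limit again later; zero (or a negative value) means unlimited.
r.SetQueueConcurrency("default", 0)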
internal/rdb/rdb_test.go

@@ -18,11 +18,12 @@ import (
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/google/uuid"
+	"github.com/redis/go-redis/v9"
+
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/errors"
 	h "github.com/hibiken/asynq/internal/testutil"
 	"github.com/hibiken/asynq/internal/timeutil"
-	"github.com/redis/go-redis/v9"
 )
 
 // variables used for package testing.
@@ -384,6 +385,7 @@ func TestDequeue(t *testing.T) {
 		wantPending map[string][]*base.TaskMessage
 		wantActive  map[string][]*base.TaskMessage
 		wantLease   map[string][]base.Z
+		queueConcurrency map[string]int
 	}{
 		{
 			pending: map[string][]*base.TaskMessage{
@@ -494,6 +496,92 @@ func TestDequeue(t *testing.T) {
 	}
 }
 
+func TestDequeueWithQueueConcurrency(t *testing.T) {
+	r := setup(t)
+	defer r.Close()
+	now := time.Now()
+	r.SetClock(timeutil.NewSimulatedClock(now))
+	const taskNum = 3
+	msgs := make([]*base.TaskMessage, 0, taskNum)
+	for i := 0; i < taskNum; i++ {
+		msg := &base.TaskMessage{
+			ID:       uuid.NewString(),
+			Type:     "send_email",
+			Payload:  h.JSON(map[string]interface{}{"subject": "hello!"}),
+			Queue:    "default",
+			Timeout:  1800,
+			Deadline: 0,
+		}
+		msgs = append(msgs, msg)
+	}
+
+	tests := []struct {
+		name             string
+		pending          map[string][]*base.TaskMessage
+		qnames           []string // list of queues to query
+		queueConcurrency map[string]int
+		wantMsgs         []*base.TaskMessage
+	}{
+		{
+			name: "without queue concurrency control",
+			pending: map[string][]*base.TaskMessage{
+				"default": msgs,
+			},
+			qnames:   []string{"default"},
+			wantMsgs: msgs,
+		},
+		{
+			name: "with queue concurrency control",
+			pending: map[string][]*base.TaskMessage{
+				"default": msgs,
+			},
+			qnames:           []string{"default"},
+			queueConcurrency: map[string]int{"default": 2},
+			wantMsgs:         msgs[:2],
+		},
+		{
+			name: "with queue concurrency zero",
+			pending: map[string][]*base.TaskMessage{
+				"default": msgs,
+			},
+			qnames:           []string{"default"},
+			queueConcurrency: map[string]int{"default": 0},
+			wantMsgs:         msgs,
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			h.FlushDB(t, r.client) // clean up db before each test case
+			h.SeedAllPendingQueues(t, r.client, tc.pending)
+			r.queueConcurrency.Range(func(key, value interface{}) bool {
+				r.queueConcurrency.Delete(key)
+				return true
+			})
+			for queue, n := range tc.queueConcurrency {
+				r.queueConcurrency.Store(queue, n)
+			}
+
+			gotMsgs := make([]*base.TaskMessage, 0, len(msgs))
+			for i := 0; i < len(msgs); i++ {
+				msg, _, err := r.Dequeue(tc.qnames...)
+				if errors.Is(err, errors.ErrNoProcessableTask) {
+					break
+				}
+				if err != nil {
+					t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.qnames, err)
+					continue
+				}
+				gotMsgs = append(gotMsgs, msg)
+			}
+			if diff := cmp.Diff(tc.wantMsgs, gotMsgs, h.SortZSetEntryOpt); diff != "" {
+				t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
+					tc.qnames, gotMsgs, tc.wantMsgs)
+			}
+		})
+	}
+}
+
 func TestDequeueError(t *testing.T) {
 	r := setup(t)
 	defer r.Close()
internal/testbroker/testbroker.go

@@ -11,8 +11,9 @@ import (
 	"sync"
 	"time"
 
-	"github.com/hibiken/asynq/internal/base"
 	"github.com/redis/go-redis/v9"
+
+	"github.com/hibiken/asynq/internal/base"
 )
 
 var errRedisDown = errors.New("testutil: redis is down")
@@ -297,3 +298,7 @@ func (tb *TestBroker) ReclaimStaleAggregationSets(qname string) error {
 	}
 	return tb.real.ReclaimStaleAggregationSets(qname)
 }
+
+func (tb *TestBroker) SetQueueConcurrency(qname string, concurrency int) {
+	tb.real.SetQueueConcurrency(qname, concurrency)
+}
processor.go (18 changed lines)
@@ -16,12 +16,13 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/time/rate"
+
 	"github.com/hibiken/asynq/internal/base"
 	asynqcontext "github.com/hibiken/asynq/internal/context"
 	"github.com/hibiken/asynq/internal/errors"
 	"github.com/hibiken/asynq/internal/log"
 	"github.com/hibiken/asynq/internal/timeutil"
-	"golang.org/x/time/rate"
 )
 
 type processor struct {
@@ -57,7 +58,7 @@ type processor struct {
 	// channel to communicate back to the long running "processor" goroutine.
 	// once is used to send value to the channel only once.
 	done chan struct{}
-	once sync.Once
+	once *sync.Once
 
 	// quit channel is closed when the shutdown of the "processor" goroutine starts.
 	quit chan struct{}
@@ -112,6 +113,7 @@ func newProcessor(params processorParams) *processor {
 		errLogLimiter:   rate.NewLimiter(rate.Every(3*time.Second), 1),
 		sema:            make(chan struct{}, params.concurrency),
 		done:            make(chan struct{}),
+		once:            &sync.Once{},
 		quit:            make(chan struct{}),
 		abort:           make(chan struct{}),
 		errHandler:      params.errHandler,
@@ -139,7 +141,9 @@ func (p *processor) stop() {
 func (p *processor) shutdown() {
 	p.stop()
 
-	time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
+	go func(abort chan struct{}) {
+		time.AfterFunc(p.shutdownTimeout, func() { close(abort) })
+	}(p.abort)
 
 	p.logger.Info("Waiting for all workers to finish...")
 	// block until all workers have released the token
@@ -149,6 +153,14 @@ func (p *processor) shutdown() {
 	p.logger.Info("All workers have finished")
 }
 
+func (p *processor) resetState() {
+	p.sema = make(chan struct{}, cap(p.sema))
+	p.done = make(chan struct{})
+	p.quit = make(chan struct{})
+	p.abort = make(chan struct{})
+	p.once = &sync.Once{}
+}
+
 func (p *processor) start(wg *sync.WaitGroup) {
 	wg.Add(1)
 	go func() {
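resetState exists so that Server.AddQueue (later in this diff) can start a processor again after shutting it down: the semaphore, the done/quit/abort channels, and the Once are all single-use, so fresh values are installed before start is called a second time. Making once a pointer lets resetState swap in a new one without copying a value that contains a lock, which is also why newProcessor now initializes it with &sync.Once{}. The Once behavior this relies on, shown in isolation:

package main

import (
	"fmt"
	"sync"
)

func main() {
	once := &sync.Once{}
	once.Do(func() { fmt.Println("runs once") })
	once.Do(func() { fmt.Println("never runs") }) // suppressed: the Once has fired

	once = &sync.Once{} // a fresh Once is armed again, as in resetState
	once.Do(func() { fmt.Println("runs again after the swap") })
}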
server.go (96 changed lines)
@@ -15,10 +15,11 @@ import (
 	"sync"
 	"time"
 
+	"github.com/redis/go-redis/v9"
+
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/log"
 	"github.com/hibiken/asynq/internal/rdb"
-	"github.com/redis/go-redis/v9"
 )
 
 // Server is responsible for task processing and task lifecycle management.
@@ -43,6 +44,10 @@ type Server struct {
 
 	state *serverState
 
+	mu             sync.RWMutex
+	queues         map[string]int
+	strictPriority bool
+
 	// wait group to wait for all goroutines to finish.
 	wg        sync.WaitGroup
 	forwarder *forwarder
@@ -252,6 +257,11 @@ type Config struct {
 	// If unset or zero, default batch size of 100 is used.
 	// Make sure to not put a big number as the batch size to prevent a long-running script.
 	JanitorBatchSize int
+
+	// Maximum number of concurrent tasks of a queue.
+	//
+	// If set to a zero or not set, NewServer will not limit concurrency of the queue.
+	QueueConcurrency map[string]int
 }
 
 // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
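A minimal sketch of the new knob from the user's side (queue names and numbers are placeholders). Concurrency still bounds the worker goroutines of this one process, while QueueConcurrency bounds how many tasks of a queue may be active at once across every server polling it, because the dequeue script stops handing out tasks once the queue's lease set reaches the limit:

package main

import "github.com/hibiken/asynq"

func main() {
	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{
		Concurrency: 10, // workers in this process
		Queues:      map[string]int{"critical": 6, "default": 3},
		// At most 2 "critical" tasks active at any one time, across all servers;
		// "default" has no entry, so it stays unlimited (unset or zero means no cap).
		QueueConcurrency: map[string]int{"critical": 2},
	})
	_ = srv // start it with srv.Run(mux) as usual
}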
@@ -474,7 +484,9 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
 		}
 	}
 	if len(queues) == 0 {
-		queues = defaultQueueConfig
+		for qname, p := range defaultQueueConfig {
+			queues[qname] = p
+		}
 	}
 	var qnames []string
 	for q := range queues {
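The switch from assigning defaultQueueConfig to copying it entry by entry matters because Go maps are references: the old line made srv.queues an alias of the package-level default map, and the new AddQueue below writes into srv.queues, so without the copy those writes would leak into defaultQueueConfig for every later server. The aliasing in isolation:

package main

import "fmt"

func main() {
	defaults := map[string]int{"default": 1}

	aliased := defaults    // same underlying map, not a copy
	aliased["reports"] = 2 // also visible through defaults
	fmt.Println(defaults)  // map[default:1 reports:2]

	copied := make(map[string]int, len(defaults))
	for name, prio := range defaults {
		copied[name] = prio
	}
	copied["audit"] = 3
	fmt.Println(defaults["audit"]) // 0: writes to the copy stay in the copy
}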
@@ -503,7 +515,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
 	}
 	logger.SetLevel(toInternalLogLevel(loglevel))
 
-	rdb := rdb.NewRDB(c)
+	rdb := rdb.NewRDB(c, rdb.WithQueueConcurrency(cfg.QueueConcurrency))
 	starting := make(chan *workerInfo)
 	finished := make(chan *base.TaskMessage)
 	syncCh := make(chan *syncRequest)
@@ -603,6 +615,8 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
 		groupAggregator: cfg.GroupAggregator,
 	})
 	return &Server{
+		queues:           queues,
+		strictPriority:   cfg.StrictPriority,
 		logger:           logger,
 		broker:           rdb,
 		sharedConnection: true,
@@ -785,3 +799,79 @@ func (srv *Server) Ping() error {
 
 	return srv.broker.Ping()
 }
+
+func (srv *Server) AddQueue(qname string, priority, concurrency int) {
+	srv.mu.Lock()
+	defer srv.mu.Unlock()
+
+	if _, ok := srv.queues[qname]; ok {
+		srv.logger.Warnf("queue %s already exists, skipping", qname)
+		return
+	}
+
+	srv.state.mu.Lock()
+	state := srv.state.value
+	if state == srvStateNew || state == srvStateClosed {
+		srv.queues[qname] = priority
+		srv.state.mu.Unlock()
+		return
+	}
+	srv.state.mu.Unlock()
+
+	srv.logger.Info("restart server...")
+	srv.forwarder.shutdown()
+	srv.processor.shutdown()
+	srv.recoverer.shutdown()
+	srv.syncer.shutdown()
+	srv.subscriber.shutdown()
+	srv.janitor.shutdown()
+	srv.aggregator.shutdown()
+	srv.healthchecker.shutdown()
+	srv.heartbeater.shutdown()
+	srv.wg.Wait()
+
+	srv.queues[qname] = priority
+
+	qnames := make([]string, 0, len(srv.queues))
+	for q := range srv.queues {
+		qnames = append(qnames, q)
+	}
+	srv.broker.SetQueueConcurrency(qname, concurrency)
+	srv.heartbeater.queues = srv.queues
+	srv.recoverer.queues = qnames
+	srv.forwarder.queues = qnames
+	srv.processor.resetState()
+	queues := normalizeQueues(srv.queues)
+	orderedQueues := []string(nil)
+	if srv.strictPriority {
+		orderedQueues = sortByPriority(queues)
+	}
+	srv.processor.queueConfig = srv.queues
+	srv.processor.orderedQueues = orderedQueues
+	srv.janitor.queues = qnames
+	srv.aggregator.resetState()
+	srv.aggregator.queues = qnames
+
+	srv.heartbeater.start(&srv.wg)
+	srv.healthchecker.start(&srv.wg)
+	srv.subscriber.start(&srv.wg)
+	srv.syncer.start(&srv.wg)
+	srv.recoverer.start(&srv.wg)
+	srv.forwarder.start(&srv.wg)
+	srv.processor.start(&srv.wg)
+	srv.janitor.start(&srv.wg)
+	srv.aggregator.start(&srv.wg)
+
+	srv.logger.Info("server restarted")
+}
+
+func (srv *Server) HasQueue(qname string) bool {
+	srv.mu.RLock()
+	defer srv.mu.RUnlock()
+	_, ok := srv.queues[qname]
+	return ok
+}
+
+func (srv *Server) SetQueueConcurrency(queue string, concurrency int) {
+	srv.broker.SetQueueConcurrency(queue, concurrency)
+}
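Taken together, AddQueue, HasQueue, and SetQueueConcurrency let a running server pick up queues it was not configured with and retune their limits later. A minimal sketch (queue name, priority, and limit are placeholders; as in the test further down, AddQueue is called after Start, and on a running server it briefly stops and restarts the internal components to apply the new queue set):

package main

import (
	"context"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{Concurrency: 4})

	mux := asynq.NewServeMux()
	mux.HandleFunc("send_email", func(ctx context.Context, t *asynq.Task) error { return nil })

	if err := srv.Start(mux); err != nil {
		log.Fatal(err)
	}

	if !srv.HasQueue("reports") {
		srv.AddQueue("reports", 1, 2) // priority 1, at most 2 active "reports" tasks
	}
	srv.SetQueueConcurrency("reports", 5) // relax the cap later without a restart

	srv.Shutdown()
}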
server_test.go (193 changed lines)
@@ -11,6 +11,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/rdb"
 	"github.com/hibiken/asynq/internal/testbroker"
 	"github.com/hibiken/asynq/internal/testutil"
@@ -80,6 +81,198 @@ func TestServerFromRedisClient(t *testing.T) {
 	}
 }
 
+func TestServerWithQueueConcurrency(t *testing.T) {
+	// https://github.com/go-redis/redis/issues/1029
+	ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
+	defer goleak.VerifyNone(t, ignoreOpt)
+
+	redisConnOpt := getRedisConnOpt(t)
+	r, ok := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
+	if !ok {
+		t.Fatalf("asynq: unsupported RedisConnOpt type %T", r)
+	}
+
+	const taskNum = 8
+	const serverNum = 2
+	tests := []struct {
+		name             string
+		concurrency      int
+		queueConcurrency int
+		wantActiveNum    int
+	}{
+		{
+			name:             "based on client concurrency control",
+			concurrency:      2,
+			queueConcurrency: 6,
+			wantActiveNum:    2 * serverNum,
+		},
+		{
+			name:             "no queue concurrency control",
+			concurrency:      2,
+			queueConcurrency: 0,
+			wantActiveNum:    2 * serverNum,
+		},
+		{
+			name:             "based on queue concurrency control",
+			concurrency:      6,
+			queueConcurrency: 2,
+			wantActiveNum:    2,
+		},
+	}
+
+	// no-op handler
+	handle := func(ctx context.Context, task *Task) error {
+		time.Sleep(time.Second * 2)
+		return nil
+	}
+
+	var servers [serverNum]*Server
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			var err error
+			testutil.FlushDB(t, r)
+			c := NewClient(redisConnOpt)
+			defer c.Close()
+			for i := 0; i < taskNum; i++ {
+				_, err = c.Enqueue(NewTask("send_email",
+					testutil.JSON(map[string]interface{}{"recipient_id": i + 123})))
+				if err != nil {
+					t.Fatalf("could not enqueue a task: %v", err)
+				}
+			}
+
+			for i := 0; i < serverNum; i++ {
+				srv := NewServer(redisConnOpt, Config{
+					Concurrency:      tc.concurrency,
+					LogLevel:         testLogLevel,
+					QueueConcurrency: map[string]int{base.DefaultQueueName: tc.queueConcurrency},
+				})
+				err = srv.Start(HandlerFunc(handle))
+				if err != nil {
+					t.Fatal(err)
+				}
+				servers[i] = srv
+			}
+			defer func() {
+				for _, srv := range servers {
+					srv.Shutdown()
+				}
+			}()
+
+			time.Sleep(time.Second)
+			inspector := NewInspector(redisConnOpt)
+			tasks, err := inspector.ListActiveTasks(base.DefaultQueueName)
+			if err != nil {
+				t.Fatalf("could not list active tasks: %v", err)
+			}
+			if len(tasks) != tc.wantActiveNum {
+				t.Errorf("default queue has %d active tasks, want %d", len(tasks), tc.wantActiveNum)
+			}
+		})
+	}
+}
+
+func TestServerWithDynamicQueue(t *testing.T) {
+	// https://github.com/go-redis/redis/issues/1029
+	ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
+	defer goleak.VerifyNone(t, ignoreOpt)
+
+	redisConnOpt := getRedisConnOpt(t)
+	r, ok := redisConnOpt.MakeRedisClient().(redis.UniversalClient)
+	if !ok {
+		t.Fatalf("asynq: unsupported RedisConnOpt type %T", r)
+	}
+
+	const taskNum = 8
+	const serverNum = 2
+	tests := []struct {
+		name             string
+		concurrency      int
+		queueConcurrency int
+		wantActiveNum    int
+	}{
+		{
+			name:             "based on client concurrency control",
+			concurrency:      2,
+			queueConcurrency: 6,
+			wantActiveNum:    2 * serverNum,
+		},
+		{
+			name:             "no queue concurrency control",
+			concurrency:      2,
+			queueConcurrency: 0,
+			wantActiveNum:    2 * serverNum,
+		},
+		{
+			name:             "based on queue concurrency control",
+			concurrency:      6,
+			queueConcurrency: 2,
+			wantActiveNum:    2 * serverNum,
+		},
+	}
+
+	// no-op handler
+	handle := func(ctx context.Context, task *Task) error {
+		time.Sleep(time.Second * 2)
+		return nil
+	}
+
+	var DynamicQueueNameFmt = "dynamic:%d:%d"
+	var servers [serverNum]*Server
+	for tcn, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			var err error
+			testutil.FlushDB(t, r)
+			c := NewClient(redisConnOpt)
+			defer c.Close()
+			for i := 0; i < taskNum; i++ {
+				_, err = c.Enqueue(NewTask("send_email",
+					testutil.JSON(map[string]interface{}{"recipient_id": i + 123})),
+					Queue(fmt.Sprintf(DynamicQueueNameFmt, tcn, i%2)))
+				if err != nil {
+					t.Fatalf("could not enqueue a task: %v", err)
+				}
+			}
+
+			for i := 0; i < serverNum; i++ {
+				srv := NewServer(redisConnOpt, Config{
+					Concurrency:      tc.concurrency,
+					LogLevel:         testLogLevel,
+					QueueConcurrency: map[string]int{base.DefaultQueueName: tc.queueConcurrency},
+				})
+				err = srv.Start(HandlerFunc(handle))
+				if err != nil {
+					t.Fatal(err)
+				}
+				srv.AddQueue(fmt.Sprintf(DynamicQueueNameFmt, tcn, i), 1, tc.queueConcurrency)
+				servers[i] = srv
+			}
+			defer func() {
+				for _, srv := range servers {
+					srv.Shutdown()
+				}
+			}()
+
+			time.Sleep(time.Second)
+			inspector := NewInspector(redisConnOpt)
+
+			var tasks []*TaskInfo
+
+			for i := range servers {
+				qtasks, err := inspector.ListActiveTasks(fmt.Sprintf(DynamicQueueNameFmt, tcn, i))
+				if err != nil {
+					t.Fatalf("could not list active tasks: %v", err)
+				}
+				tasks = append(tasks, qtasks...)
+			}
+
+			if len(tasks) != tc.wantActiveNum {
+				t.Errorf("dynamic queue has %d active tasks, want %d", len(tasks), tc.wantActiveNum)
+			}
+		})
+	}
+}
+
 func TestServerRun(t *testing.T) {
 	// https://github.com/go-redis/redis/issues/1029
 	ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
subscriber.go

@@ -80,6 +80,9 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
 				s.logger.Debug("Subscriber done")
 				return
 			case msg := <-cancelCh:
+				if msg == nil {
+					return
+				}
 				cancel, ok := s.cancelations.Get(msg.Payload)
 				if ok {
 					cancel()
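The added guard covers the case where the cancelation pub/sub channel is closed under the subscriber, for example while the connection is being torn down: receiving from a closed channel of a pointer type yields the zero value, nil, and the following s.cancelations.Get(msg.Payload) would then panic. The channel behavior the guard relies on, in isolation:

package main

import "fmt"

type message struct{ Payload string }

func main() {
	ch := make(chan *message)
	close(ch)

	msg := <-ch             // receive from a closed channel: zero value, i.e. nil
	fmt.Println(msg == nil) // true; msg.Payload would panic here
}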