2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-18 23:00:20 +08:00

Merge 6dad2eb6d7c39a9e912c4914a24c67ee660b24c5 into c327bc40a28e4db45195cfe082d88faa808ce87d

This commit is contained in:
asdfhuang 2025-04-01 14:50:15 -03:00 committed by GitHub
commit 3294352a89
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 99 additions and 32 deletions

25
broker/broker.go Normal file
View File

@ -0,0 +1,25 @@
package broker
import (
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
// This package exports the same types as the internal package.
// This is a temporary solution until we can move the these types out of internal.
type (
TaskMessage = base.TaskMessage
WorkerInfo = base.WorkerInfo
ServerInfo = base.ServerInfo
Broker = base.Broker
CancellationSubscription = base.CancellationSubscription
RDB = rdb.RDB
)
var (
NewRDB = rdb.NewRDB
)

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/broker"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
@ -44,7 +45,13 @@ func NewClient(r RedisConnOpt) *Client {
// NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewClientFromRedisClient(c redis.UniversalClient) *Client {
return &Client{broker: rdb.NewRDB(c), sharedConnection: true}
return NewClientFromBroker(rdb.NewRDB(c))
}
// NewClientFromBroker returns a new instance of Client given a broker.
// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it.
func NewClientFromBroker(b broker.Broker) *Client {
return &Client{broker: b, sharedConnection: true}
}
type OptionType int

View File

@ -17,7 +17,6 @@ import (
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
@ -718,8 +717,14 @@ type Broker interface {
ClearServerState(host string, pid int, serverID string) error
// Cancelation related methods
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
SubscribeCancellation() (CancellationSubscription, error)
PublishCancelation(id string) error
WriteResult(qname, id string, data []byte) (n int, err error)
}
// CancellationSubscription is a subscription to cancellation messages.
type CancellationSubscription interface {
Channel() <-chan string // returns a channel to receive the id of tasks to be cancelled.
Close() error // closes the subscription.
}

View File

@ -32,6 +32,8 @@ type RDB struct {
queuesPublished sync.Map
}
var _ base.Broker = &RDB{}
// NewRDB returns a new instance of RDB.
func NewRDB(client redis.UniversalClient) *RDB {
return &RDB{
@ -1481,8 +1483,31 @@ func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
return nil
}
// CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
// cancelationSubscription is a wrapper for redis pubsub.
type cancellationSubscription struct {
pubsub *redis.PubSub
}
func (c *cancellationSubscription) Channel() <-chan string {
channelSize := 100 // same as redis defaults
ch := make(chan string, channelSize)
go func() {
for msg := range c.pubsub.Channel(redis.WithChannelSize(channelSize)) {
ch <- msg.Payload
}
close(ch)
}()
return ch
}
func (c *cancellationSubscription) Close() error {
return c.pubsub.Close()
}
// SubscribeCancellation returns a subscription for cancelation messages.
func (r *RDB) SubscribeCancellation() (base.CancellationSubscription, error) {
var op errors.Op = "rdb.CancelationPubSub"
ctx := context.Background()
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
@ -1490,7 +1515,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
if err != nil {
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
}
return pubsub, nil
return &cancellationSubscription{pubsub: pubsub}, nil
}
// PublishCancelation publish cancelation message to all subscribers.

View File

@ -3236,12 +3236,12 @@ func TestCancelationPubSub(t *testing.T) {
r := setup(t)
defer r.Close()
pubsub, err := r.CancelationPubSub()
sub, err := r.SubscribeCancellation()
if err != nil {
t.Fatalf("(*RDB).CancelationPubSub() returned an error: %v", err)
}
cancelCh := pubsub.Channel()
cancelCh := sub.Channel()
var (
mu sync.Mutex
@ -3249,9 +3249,9 @@ func TestCancelationPubSub(t *testing.T) {
)
go func() {
for msg := range cancelCh {
for id := range cancelCh {
mu.Lock()
received = append(received, msg.Payload)
received = append(received, id)
mu.Unlock()
}
}()
@ -3265,7 +3265,7 @@ func TestCancelationPubSub(t *testing.T) {
// allow for message to reach subscribers.
time.Sleep(time.Second)
pubsub.Close()
sub.Close()
mu.Lock()
if diff := cmp.Diff(publish, received, h.SortStringSliceOpt); diff != "" {

View File

@ -12,7 +12,6 @@ import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
)
var errRedisDown = errors.New("testutil: redis is down")
@ -190,13 +189,13 @@ func (tb *TestBroker) ClearServerState(host string, pid int, serverID string) er
return tb.real.ClearServerState(host, pid, serverID)
}
func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
func (tb *TestBroker) SubscribeCancellation() (base.CancellationSubscription, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
}
return tb.real.CancelationPubSub()
return tb.real.SubscribeCancellation()
}
func (tb *TestBroker) PublishCancelation(id string) error {

View File

@ -15,6 +15,7 @@ import (
"sync"
"time"
"github.com/hibiken/asynq/broker"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
@ -442,6 +443,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
// and server configuration
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
rdb := rdb.NewRDB(c)
return NewServerFromBroker(rdb, cfg)
}
// NewServerFromBroker returns a new instance of Server given a Broker and server configuration.
// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it.
func NewServerFromBroker(b broker.Broker, cfg Config) *Server {
baseCtxFn := cfg.BaseContext
if baseCtxFn == nil {
baseCtxFn = context.Background
@ -503,7 +511,6 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
logger.SetLevel(toInternalLogLevel(loglevel))
rdb := rdb.NewRDB(c)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
@ -517,7 +524,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
heartbeater := newHeartbeater(heartbeaterParams{
logger: logger,
broker: rdb,
broker: b,
interval: 5 * time.Second,
concurrency: n,
queues: queues,
@ -532,18 +539,18 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
forwarder := newForwarder(forwarderParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
interval: delayedTaskCheckInterval,
})
subscriber := newSubscriber(subscriberParams{
logger: logger,
broker: rdb,
broker: b,
cancelations: cancels,
})
processor := newProcessor(processorParams{
logger: logger,
broker: rdb,
broker: b,
retryDelayFunc: delayFunc,
taskCheckInterval: taskCheckInterval,
baseCtxFn: baseCtxFn,
@ -560,7 +567,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
recoverer := newRecoverer(recovererParams{
logger: logger,
broker: rdb,
broker: b,
retryDelayFunc: delayFunc,
isFailureFunc: isFailureFunc,
queues: qnames,
@ -568,7 +575,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
healthchecker := newHealthChecker(healthcheckerParams{
logger: logger,
broker: rdb,
broker: b,
interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc,
})
@ -588,14 +595,14 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
}
janitor := newJanitor(janitorParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
interval: janitorInterval,
batchSize: janitorBatchSize,
})
aggregator := newAggregator(aggregatorParams{
logger: logger,
broker: rdb,
broker: b,
queues: qnames,
gracePeriod: groupGracePeriod,
maxDelay: cfg.GroupMaxDelay,
@ -604,7 +611,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
})
return &Server{
logger: logger,
broker: rdb,
broker: b,
sharedConnection: true,
state: srvState,
forwarder: forwarder,

View File

@ -8,7 +8,6 @@ import (
"sync"
"time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
@ -54,12 +53,12 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
go func() {
defer wg.Done()
var (
pubsub *redis.PubSub
err error
sub base.CancellationSubscription
err error
)
// Try until successfully connect to Redis.
for {
pubsub, err = s.broker.CancelationPubSub()
sub, err = s.broker.SubscribeCancellation()
if err != nil {
s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
select {
@ -72,15 +71,15 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
}
break
}
cancelCh := pubsub.Channel()
cancelCh := sub.Channel()
for {
select {
case <-s.done:
pubsub.Close()
sub.Close()
s.logger.Debug("Subscriber done")
return
case msg := <-cancelCh:
cancel, ok := s.cancelations.Get(msg.Payload)
case id := <-cancelCh:
cancel, ok := s.cancelations.Get(id)
if ok {
cancel()
}