diff --git a/broker/broker.go b/broker/broker.go new file mode 100644 index 0000000..32d9ed3 --- /dev/null +++ b/broker/broker.go @@ -0,0 +1,25 @@ +package broker + +import ( + "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" +) + +// This package exports the same types as the internal package. +// This is a temporary solution until we can move the these types out of internal. + +type ( + TaskMessage = base.TaskMessage + WorkerInfo = base.WorkerInfo + ServerInfo = base.ServerInfo + + Broker = base.Broker + + CancellationSubscription = base.CancellationSubscription + + RDB = rdb.RDB +) + +var ( + NewRDB = rdb.NewRDB +) diff --git a/client.go b/client.go index 4a0ba39..fcf776e 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "time" "github.com/google/uuid" + "github.com/hibiken/asynq/broker" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" @@ -44,7 +45,13 @@ func NewClient(r RedisConnOpt) *Client { // NewClientFromRedisClient returns a new instance of Client given a redis.UniversalClient // Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it. func NewClientFromRedisClient(c redis.UniversalClient) *Client { - return &Client{broker: rdb.NewRDB(c), sharedConnection: true} + return NewClientFromBroker(rdb.NewRDB(c)) +} + +// NewClientFromBroker returns a new instance of Client given a broker. +// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it. +func NewClientFromBroker(b broker.Broker) *Client { + return &Client{broker: b, sharedConnection: true} } type OptionType int diff --git a/internal/base/base.go b/internal/base/base.go index 90e635e..b88f18a 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -17,7 +17,6 @@ import ( "github.com/hibiken/asynq/internal/errors" pb "github.com/hibiken/asynq/internal/proto" "github.com/hibiken/asynq/internal/timeutil" - "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -718,8 +717,14 @@ type Broker interface { ClearServerState(host string, pid int, serverID string) error // Cancelation related methods - CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers + SubscribeCancellation() (CancellationSubscription, error) PublishCancelation(id string) error WriteResult(qname, id string, data []byte) (n int, err error) } + +// CancellationSubscription is a subscription to cancellation messages. +type CancellationSubscription interface { + Channel() <-chan string // returns a channel to receive the id of tasks to be cancelled. + Close() error // closes the subscription. +} diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 22df506..90e47e4 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -32,6 +32,8 @@ type RDB struct { queuesPublished sync.Map } +var _ base.Broker = &RDB{} + // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { return &RDB{ @@ -1481,8 +1483,31 @@ func (r *RDB) ClearSchedulerEntries(schedulerID string) error { return nil } -// CancelationPubSub returns a pubsub for cancelation messages. -func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { +// cancelationSubscription is a wrapper for redis pubsub. +type cancellationSubscription struct { + pubsub *redis.PubSub +} + +func (c *cancellationSubscription) Channel() <-chan string { + channelSize := 100 // same as redis defaults + ch := make(chan string, channelSize) + + go func() { + for msg := range c.pubsub.Channel(redis.WithChannelSize(channelSize)) { + ch <- msg.Payload + } + close(ch) + }() + + return ch +} + +func (c *cancellationSubscription) Close() error { + return c.pubsub.Close() +} + +// SubscribeCancellation returns a subscription for cancelation messages. +func (r *RDB) SubscribeCancellation() (base.CancellationSubscription, error) { var op errors.Op = "rdb.CancelationPubSub" ctx := context.Background() pubsub := r.client.Subscribe(ctx, base.CancelChannel) @@ -1490,7 +1515,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { if err != nil { return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err)) } - return pubsub, nil + return &cancellationSubscription{pubsub: pubsub}, nil } // PublishCancelation publish cancelation message to all subscribers. diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5249a29..2c194bd 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3236,12 +3236,12 @@ func TestCancelationPubSub(t *testing.T) { r := setup(t) defer r.Close() - pubsub, err := r.CancelationPubSub() + sub, err := r.SubscribeCancellation() if err != nil { t.Fatalf("(*RDB).CancelationPubSub() returned an error: %v", err) } - cancelCh := pubsub.Channel() + cancelCh := sub.Channel() var ( mu sync.Mutex @@ -3249,9 +3249,9 @@ func TestCancelationPubSub(t *testing.T) { ) go func() { - for msg := range cancelCh { + for id := range cancelCh { mu.Lock() - received = append(received, msg.Payload) + received = append(received, id) mu.Unlock() } }() @@ -3265,7 +3265,7 @@ func TestCancelationPubSub(t *testing.T) { // allow for message to reach subscribers. time.Sleep(time.Second) - pubsub.Close() + sub.Close() mu.Lock() if diff := cmp.Diff(publish, received, h.SortStringSliceOpt); diff != "" { diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index ffab6fe..48f657b 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -12,7 +12,6 @@ import ( "time" "github.com/hibiken/asynq/internal/base" - "github.com/redis/go-redis/v9" ) var errRedisDown = errors.New("testutil: redis is down") @@ -190,13 +189,13 @@ func (tb *TestBroker) ClearServerState(host string, pid int, serverID string) er return tb.real.ClearServerState(host, pid, serverID) } -func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { +func (tb *TestBroker) SubscribeCancellation() (base.CancellationSubscription, error) { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return nil, errRedisDown } - return tb.real.CancelationPubSub() + return tb.real.SubscribeCancellation() } func (tb *TestBroker) PublishCancelation(id string) error { diff --git a/server.go b/server.go index cde6516..4d5305f 100644 --- a/server.go +++ b/server.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/hibiken/asynq/broker" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" @@ -442,6 +443,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { // and server configuration // Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it. func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { + rdb := rdb.NewRDB(c) + return NewServerFromBroker(rdb, cfg) +} + +// NewServerFromBroker returns a new instance of Server given a Broker and server configuration. +// Warning: The underlying broker will not be closed by Asynq, you are responsible for closing it. +func NewServerFromBroker(b broker.Broker, cfg Config) *Server { baseCtxFn := cfg.BaseContext if baseCtxFn == nil { baseCtxFn = context.Background @@ -503,7 +511,6 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) - rdb := rdb.NewRDB(c) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) @@ -517,7 +524,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { }) heartbeater := newHeartbeater(heartbeaterParams{ logger: logger, - broker: rdb, + broker: b, interval: 5 * time.Second, concurrency: n, queues: queues, @@ -532,18 +539,18 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } forwarder := newForwarder(forwarderParams{ logger: logger, - broker: rdb, + broker: b, queues: qnames, interval: delayedTaskCheckInterval, }) subscriber := newSubscriber(subscriberParams{ logger: logger, - broker: rdb, + broker: b, cancelations: cancels, }) processor := newProcessor(processorParams{ logger: logger, - broker: rdb, + broker: b, retryDelayFunc: delayFunc, taskCheckInterval: taskCheckInterval, baseCtxFn: baseCtxFn, @@ -560,7 +567,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { }) recoverer := newRecoverer(recovererParams{ logger: logger, - broker: rdb, + broker: b, retryDelayFunc: delayFunc, isFailureFunc: isFailureFunc, queues: qnames, @@ -568,7 +575,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { }) healthchecker := newHealthChecker(healthcheckerParams{ logger: logger, - broker: rdb, + broker: b, interval: healthcheckInterval, healthcheckFunc: cfg.HealthCheckFunc, }) @@ -588,14 +595,14 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { } janitor := newJanitor(janitorParams{ logger: logger, - broker: rdb, + broker: b, queues: qnames, interval: janitorInterval, batchSize: janitorBatchSize, }) aggregator := newAggregator(aggregatorParams{ logger: logger, - broker: rdb, + broker: b, queues: qnames, gracePeriod: groupGracePeriod, maxDelay: cfg.GroupMaxDelay, @@ -604,7 +611,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server { }) return &Server{ logger: logger, - broker: rdb, + broker: b, sharedConnection: true, state: srvState, forwarder: forwarder, diff --git a/subscriber.go b/subscriber.go index 8fc4eac..e303b6a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" ) @@ -54,12 +53,12 @@ func (s *subscriber) start(wg *sync.WaitGroup) { go func() { defer wg.Done() var ( - pubsub *redis.PubSub - err error + sub base.CancellationSubscription + err error ) // Try until successfully connect to Redis. for { - pubsub, err = s.broker.CancelationPubSub() + sub, err = s.broker.SubscribeCancellation() if err != nil { s.logger.Errorf("cannot subscribe to cancelation channel: %v", err) select { @@ -72,15 +71,15 @@ func (s *subscriber) start(wg *sync.WaitGroup) { } break } - cancelCh := pubsub.Channel() + cancelCh := sub.Channel() for { select { case <-s.done: - pubsub.Close() + sub.Close() s.logger.Debug("Subscriber done") return - case msg := <-cancelCh: - cancel, ok := s.cancelations.Get(msg.Payload) + case id := <-cancelCh: + cancel, ok := s.cancelations.Get(id) if ok { cancel() }