mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-22 22:06:12 +08:00
feat: allow custom broker
This commit is contained in:
@@ -17,7 +17,6 @@ import (
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
pb "github.com/hibiken/asynq/internal/proto"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
@@ -718,8 +717,14 @@ type Broker interface {
|
||||
ClearServerState(host string, pid int, serverID string) error
|
||||
|
||||
// Cancelation related methods
|
||||
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
|
||||
SubscribeCancellation() (CancellationSubscription, error)
|
||||
PublishCancelation(id string) error
|
||||
|
||||
WriteResult(qname, id string, data []byte) (n int, err error)
|
||||
}
|
||||
|
||||
// CancellationSubscription is a subscription to cancellation messages.
|
||||
type CancellationSubscription interface {
|
||||
Channel() <-chan string // returns a channel to receive the id of tasks to be cancelled.
|
||||
Close() error // closes the subscription.
|
||||
}
|
||||
|
@@ -32,6 +32,8 @@ type RDB struct {
|
||||
queuesPublished sync.Map
|
||||
}
|
||||
|
||||
var _ base.Broker = &RDB{}
|
||||
|
||||
// NewRDB returns a new instance of RDB.
|
||||
func NewRDB(client redis.UniversalClient) *RDB {
|
||||
return &RDB{
|
||||
@@ -1481,8 +1483,31 @@ func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelationPubSub returns a pubsub for cancelation messages.
|
||||
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
||||
// cancelationSubscription is a wrapper for redis pubsub.
|
||||
type cancellationSubscription struct {
|
||||
pubsub *redis.PubSub
|
||||
}
|
||||
|
||||
func (c *cancellationSubscription) Channel() <-chan string {
|
||||
channelSize := 100 // same as redis defaults
|
||||
ch := make(chan string, channelSize)
|
||||
|
||||
go func() {
|
||||
for msg := range c.pubsub.Channel(redis.WithChannelSize(channelSize)) {
|
||||
ch <- msg.Payload
|
||||
}
|
||||
close(ch)
|
||||
}()
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
func (c *cancellationSubscription) Close() error {
|
||||
return c.pubsub.Close()
|
||||
}
|
||||
|
||||
// SubscribeCancellation returns a subscription for cancelation messages.
|
||||
func (r *RDB) SubscribeCancellation() (base.CancellationSubscription, error) {
|
||||
var op errors.Op = "rdb.CancelationPubSub"
|
||||
ctx := context.Background()
|
||||
pubsub := r.client.Subscribe(ctx, base.CancelChannel)
|
||||
@@ -1490,7 +1515,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||
}
|
||||
return pubsub, nil
|
||||
return &cancellationSubscription{pubsub: pubsub}, nil
|
||||
}
|
||||
|
||||
// PublishCancelation publish cancelation message to all subscribers.
|
||||
|
@@ -3236,12 +3236,12 @@ func TestCancelationPubSub(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
pubsub, err := r.CancelationPubSub()
|
||||
sub, err := r.SubscribeCancellation()
|
||||
if err != nil {
|
||||
t.Fatalf("(*RDB).CancelationPubSub() returned an error: %v", err)
|
||||
}
|
||||
|
||||
cancelCh := pubsub.Channel()
|
||||
cancelCh := sub.Channel()
|
||||
|
||||
var (
|
||||
mu sync.Mutex
|
||||
@@ -3249,9 +3249,9 @@ func TestCancelationPubSub(t *testing.T) {
|
||||
)
|
||||
|
||||
go func() {
|
||||
for msg := range cancelCh {
|
||||
for id := range cancelCh {
|
||||
mu.Lock()
|
||||
received = append(received, msg.Payload)
|
||||
received = append(received, id)
|
||||
mu.Unlock()
|
||||
}
|
||||
}()
|
||||
@@ -3265,7 +3265,7 @@ func TestCancelationPubSub(t *testing.T) {
|
||||
// allow for message to reach subscribers.
|
||||
time.Sleep(time.Second)
|
||||
|
||||
pubsub.Close()
|
||||
sub.Close()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(publish, received, h.SortStringSliceOpt); diff != "" {
|
||||
|
@@ -12,7 +12,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var errRedisDown = errors.New("testutil: redis is down")
|
||||
@@ -190,13 +189,13 @@ func (tb *TestBroker) ClearServerState(host string, pid int, serverID string) er
|
||||
return tb.real.ClearServerState(host, pid, serverID)
|
||||
}
|
||||
|
||||
func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
|
||||
func (tb *TestBroker) SubscribeCancellation() (base.CancellationSubscription, error) {
|
||||
tb.mu.Lock()
|
||||
defer tb.mu.Unlock()
|
||||
if tb.sleeping {
|
||||
return nil, errRedisDown
|
||||
}
|
||||
return tb.real.CancelationPubSub()
|
||||
return tb.real.SubscribeCancellation()
|
||||
}
|
||||
|
||||
func (tb *TestBroker) PublishCancelation(id string) error {
|
||||
|
Reference in New Issue
Block a user