diff --git a/subscriber.go b/subscriber.go index d401013..4b47993 100644 --- a/subscriber.go +++ b/subscriber.go @@ -6,7 +6,9 @@ package asynq import ( "sync" + "time" + "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -38,15 +40,29 @@ func (s *subscriber) terminate() { } func (s *subscriber) start(wg *sync.WaitGroup) { - pubsub, err := s.rdb.CancelationPubSub() - cancelCh := pubsub.Channel() - if err != nil { - s.logger.Error("cannot subscribe to cancelation channel: %v", err) - return - } wg.Add(1) go func() { defer wg.Done() + var ( + pubsub *redis.PubSub + err error + ) + // Try until successfully connect to Redis. + for { + pubsub, err = s.rdb.CancelationPubSub() + if err != nil { + s.logger.Error("cannot subscribe to cancelation channel: %v", err) + select { + case <-time.After(5 * time.Second): // retry in 5s + continue + case <-s.done: + s.logger.Info("Subscriber done") + return + } + } + break + } + cancelCh := pubsub.Channel() for { select { case <-s.done: diff --git a/subscriber_test.go b/subscriber_test.go index ba8cd5e..bc315c6 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -41,12 +41,15 @@ func TestSubscriber(t *testing.T) { var wg sync.WaitGroup subscriber.start(&wg) + // wait for subscriber to establish connection to pubsub channel + time.Sleep(time.Second) + if err := rdbClient.PublishCancelation(tc.publishID); err != nil { subscriber.terminate() t.Fatalf("could not publish cancelation message: %v", err) } - // allow for redis to publish message + // wait for redis to publish message time.Sleep(time.Second) mu.Lock()