diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go new file mode 100644 index 0000000..d06cd6a --- /dev/null +++ b/internal/testbroker/testbroker.go @@ -0,0 +1,50 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +// Package testbroker exports a broker implementation that should be used in package testing. +package testbroker + +import ( + "errors" + "sync" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" +) + +var errRedisDown = errors.New("asynqtest: redis is down") + +// TestBroker is a broker implementation which enables +// to simulate Redis failure in tests. +type TestBroker struct { + mu sync.Mutex + sleeping bool + + *rdb.RDB +} + +func NewTestBroker(r *rdb.RDB) *TestBroker { + return &TestBroker{RDB: r} +} + +func (tb *TestBroker) Sleep() { + tb.mu.Lock() + defer tb.mu.Unlock() + tb.sleeping = true +} + +func (tb *TestBroker) Wakeup() { + tb.mu.Lock() + defer tb.mu.Unlock() + tb.sleeping = false +} + +func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return nil, errRedisDown + } + return tb.RDB.CancelationPubSub() +} diff --git a/subscriber.go b/subscriber.go index f6fa7f6..f21bae5 100644 --- a/subscriber.go +++ b/subscriber.go @@ -21,6 +21,9 @@ type subscriber struct { // cancelations hold cancel functions for all in-progress tasks. cancelations *base.Cancelations + + // time to wait before retrying to connect to redis. + retryTimeout time.Duration } func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { @@ -29,6 +32,7 @@ func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscri broker: b, done: make(chan struct{}), cancelations: cancelations, + retryTimeout: 5 * time.Second, } } @@ -52,7 +56,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { if err != nil { s.logger.Error("cannot subscribe to cancelation channel: %v", err) select { - case <-time.After(5 * time.Second): // retry in 5s + case <-time.After(s.retryTimeout): continue case <-s.done: s.logger.Info("Subscriber done") diff --git a/subscriber_test.go b/subscriber_test.go index bc315c6..cc24986 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -11,6 +11,7 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/testbroker" ) func TestSubscriber(t *testing.T) { @@ -40,12 +41,12 @@ func TestSubscriber(t *testing.T) { subscriber := newSubscriber(testLogger, rdbClient, cancelations) var wg sync.WaitGroup subscriber.start(&wg) + defer subscriber.terminate() // wait for subscriber to establish connection to pubsub channel time.Sleep(time.Second) if err := rdbClient.PublishCancelation(tc.publishID); err != nil { - subscriber.terminate() t.Fatalf("could not publish cancelation message: %v", err) } @@ -61,7 +62,53 @@ func TestSubscriber(t *testing.T) { } } mu.Unlock() - - subscriber.terminate() } } + +func TestSubscriberWithRedisDown(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Errorf("panic occurred: %v", r) + } + }() + r := rdb.NewRDB(setup(t)) + testBroker := testbroker.NewTestBroker(r) + + cancelations := base.NewCancelations() + subscriber := newSubscriber(testLogger, testBroker, cancelations) + subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose. + + testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis. + var wg sync.WaitGroup + subscriber.start(&wg) + defer subscriber.terminate() + + time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis. + + testBroker.Wakeup() // simulate a situation where redis server is back online. + + time.Sleep(2 * time.Second) // allow subscriber to establish pubsub channel. + + const id = "test" + var ( + mu sync.Mutex + called bool + ) + cancelations.Add(id, func() { + mu.Lock() + defer mu.Unlock() + called = true + }) + + if err := r.PublishCancelation(id); err != nil { + t.Fatalf("could not publish cancelation message: %v", err) + } + + time.Sleep(time.Second) // wait for redis to publish message. + + mu.Lock() + if !called { + t.Errorf("cancel function was not called") + } + mu.Unlock() +}