2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add test to simulate situation where redis is down

This commit is contained in:
Ken Hibino 2020-04-17 07:52:12 -07:00
parent f8a94fb839
commit 46ab4417dd
3 changed files with 105 additions and 4 deletions

View File

@ -0,0 +1,50 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
// Package testbroker exports a broker implementation that should be used in package testing.
package testbroker
import (
"errors"
"sync"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
)
var errRedisDown = errors.New("asynqtest: redis is down")
// TestBroker is a broker implementation which enables
// to simulate Redis failure in tests.
type TestBroker struct {
mu sync.Mutex
sleeping bool
*rdb.RDB
}
func NewTestBroker(r *rdb.RDB) *TestBroker {
return &TestBroker{RDB: r}
}
func (tb *TestBroker) Sleep() {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.sleeping = true
}
func (tb *TestBroker) Wakeup() {
tb.mu.Lock()
defer tb.mu.Unlock()
tb.sleeping = false
}
func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
}
return tb.RDB.CancelationPubSub()
}

View File

@ -21,6 +21,9 @@ type subscriber struct {
// cancelations hold cancel functions for all in-progress tasks. // cancelations hold cancel functions for all in-progress tasks.
cancelations *base.Cancelations cancelations *base.Cancelations
// time to wait before retrying to connect to redis.
retryTimeout time.Duration
} }
func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber {
@ -29,6 +32,7 @@ func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscri
broker: b, broker: b,
done: make(chan struct{}), done: make(chan struct{}),
cancelations: cancelations, cancelations: cancelations,
retryTimeout: 5 * time.Second,
} }
} }
@ -52,7 +56,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
if err != nil { if err != nil {
s.logger.Error("cannot subscribe to cancelation channel: %v", err) s.logger.Error("cannot subscribe to cancelation channel: %v", err)
select { select {
case <-time.After(5 * time.Second): // retry in 5s case <-time.After(s.retryTimeout):
continue continue
case <-s.done: case <-s.done:
s.logger.Info("Subscriber done") s.logger.Info("Subscriber done")

View File

@ -11,6 +11,7 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
) )
func TestSubscriber(t *testing.T) { func TestSubscriber(t *testing.T) {
@ -40,12 +41,12 @@ func TestSubscriber(t *testing.T) {
subscriber := newSubscriber(testLogger, rdbClient, cancelations) subscriber := newSubscriber(testLogger, rdbClient, cancelations)
var wg sync.WaitGroup var wg sync.WaitGroup
subscriber.start(&wg) subscriber.start(&wg)
defer subscriber.terminate()
// wait for subscriber to establish connection to pubsub channel // wait for subscriber to establish connection to pubsub channel
time.Sleep(time.Second) time.Sleep(time.Second)
if err := rdbClient.PublishCancelation(tc.publishID); err != nil { if err := rdbClient.PublishCancelation(tc.publishID); err != nil {
subscriber.terminate()
t.Fatalf("could not publish cancelation message: %v", err) t.Fatalf("could not publish cancelation message: %v", err)
} }
@ -61,7 +62,53 @@ func TestSubscriber(t *testing.T) {
} }
} }
mu.Unlock() mu.Unlock()
subscriber.terminate()
} }
} }
func TestSubscriberWithRedisDown(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
cancelations := base.NewCancelations()
subscriber := newSubscriber(testLogger, testBroker, cancelations)
subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose.
testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
var wg sync.WaitGroup
subscriber.start(&wg)
defer subscriber.terminate()
time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis.
testBroker.Wakeup() // simulate a situation where redis server is back online.
time.Sleep(2 * time.Second) // allow subscriber to establish pubsub channel.
const id = "test"
var (
mu sync.Mutex
called bool
)
cancelations.Add(id, func() {
mu.Lock()
defer mu.Unlock()
called = true
})
if err := r.PublishCancelation(id); err != nil {
t.Fatalf("could not publish cancelation message: %v", err)
}
time.Sleep(time.Second) // wait for redis to publish message.
mu.Lock()
if !called {
t.Errorf("cancel function was not called")
}
mu.Unlock()
}