mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 23:06:12 +08:00 
			
		
		
		
	Add test to simulate situation where redis is down
This commit is contained in:
		
							
								
								
									
										50
									
								
								internal/testbroker/testbroker.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										50
									
								
								internal/testbroker/testbroker.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,50 @@ | ||||
| // Copyright 2020 Kentaro Hibino. All rights reserved. | ||||
| // Use of this source code is governed by a MIT license | ||||
| // that can be found in the LICENSE file. | ||||
|  | ||||
| // Package testbroker exports a broker implementation that should be used in package testing. | ||||
| package testbroker | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"sync" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| ) | ||||
|  | ||||
| var errRedisDown = errors.New("asynqtest: redis is down") | ||||
|  | ||||
| // TestBroker is a broker implementation which enables | ||||
| // to simulate Redis failure in tests. | ||||
| type TestBroker struct { | ||||
| 	mu       sync.Mutex | ||||
| 	sleeping bool | ||||
|  | ||||
| 	*rdb.RDB | ||||
| } | ||||
|  | ||||
| func NewTestBroker(r *rdb.RDB) *TestBroker { | ||||
| 	return &TestBroker{RDB: r} | ||||
| } | ||||
|  | ||||
| func (tb *TestBroker) Sleep() { | ||||
| 	tb.mu.Lock() | ||||
| 	defer tb.mu.Unlock() | ||||
| 	tb.sleeping = true | ||||
| } | ||||
|  | ||||
| func (tb *TestBroker) Wakeup() { | ||||
| 	tb.mu.Lock() | ||||
| 	defer tb.mu.Unlock() | ||||
| 	tb.sleeping = false | ||||
| } | ||||
|  | ||||
| func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { | ||||
| 	tb.mu.Lock() | ||||
| 	defer tb.mu.Unlock() | ||||
| 	if tb.sleeping { | ||||
| 		return nil, errRedisDown | ||||
| 	} | ||||
| 	return tb.RDB.CancelationPubSub() | ||||
| } | ||||
| @@ -21,6 +21,9 @@ type subscriber struct { | ||||
|  | ||||
| 	// cancelations hold cancel functions for all in-progress tasks. | ||||
| 	cancelations *base.Cancelations | ||||
|  | ||||
| 	// time to wait before retrying to connect to redis. | ||||
| 	retryTimeout time.Duration | ||||
| } | ||||
|  | ||||
| func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { | ||||
| @@ -29,6 +32,7 @@ func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscri | ||||
| 		broker:       b, | ||||
| 		done:         make(chan struct{}), | ||||
| 		cancelations: cancelations, | ||||
| 		retryTimeout: 5 * time.Second, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -52,7 +56,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { | ||||
| 			if err != nil { | ||||
| 				s.logger.Error("cannot subscribe to cancelation channel: %v", err) | ||||
| 				select { | ||||
| 				case <-time.After(5 * time.Second): // retry in 5s | ||||
| 				case <-time.After(s.retryTimeout): | ||||
| 					continue | ||||
| 				case <-s.done: | ||||
| 					s.logger.Info("Subscriber done") | ||||
|   | ||||
| @@ -11,6 +11,7 @@ import ( | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| 	"github.com/hibiken/asynq/internal/testbroker" | ||||
| ) | ||||
|  | ||||
| func TestSubscriber(t *testing.T) { | ||||
| @@ -40,12 +41,12 @@ func TestSubscriber(t *testing.T) { | ||||
| 		subscriber := newSubscriber(testLogger, rdbClient, cancelations) | ||||
| 		var wg sync.WaitGroup | ||||
| 		subscriber.start(&wg) | ||||
| 		defer subscriber.terminate() | ||||
|  | ||||
| 		// wait for subscriber to establish connection to pubsub channel | ||||
| 		time.Sleep(time.Second) | ||||
|  | ||||
| 		if err := rdbClient.PublishCancelation(tc.publishID); err != nil { | ||||
| 			subscriber.terminate() | ||||
| 			t.Fatalf("could not publish cancelation message: %v", err) | ||||
| 		} | ||||
|  | ||||
| @@ -61,7 +62,53 @@ func TestSubscriber(t *testing.T) { | ||||
| 			} | ||||
| 		} | ||||
| 		mu.Unlock() | ||||
|  | ||||
| 		subscriber.terminate() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestSubscriberWithRedisDown(t *testing.T) { | ||||
| 	defer func() { | ||||
| 		if r := recover(); r != nil { | ||||
| 			t.Errorf("panic occurred: %v", r) | ||||
| 		} | ||||
| 	}() | ||||
| 	r := rdb.NewRDB(setup(t)) | ||||
| 	testBroker := testbroker.NewTestBroker(r) | ||||
|  | ||||
| 	cancelations := base.NewCancelations() | ||||
| 	subscriber := newSubscriber(testLogger, testBroker, cancelations) | ||||
| 	subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose. | ||||
|  | ||||
| 	testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis. | ||||
| 	var wg sync.WaitGroup | ||||
| 	subscriber.start(&wg) | ||||
| 	defer subscriber.terminate() | ||||
|  | ||||
| 	time.Sleep(2 * time.Second) // subscriber should wait and retry connecting to redis. | ||||
|  | ||||
| 	testBroker.Wakeup() // simulate a situation where redis server is back online. | ||||
|  | ||||
| 	time.Sleep(2 * time.Second) // allow subscriber to establish pubsub channel. | ||||
|  | ||||
| 	const id = "test" | ||||
| 	var ( | ||||
| 		mu     sync.Mutex | ||||
| 		called bool | ||||
| 	) | ||||
| 	cancelations.Add(id, func() { | ||||
| 		mu.Lock() | ||||
| 		defer mu.Unlock() | ||||
| 		called = true | ||||
| 	}) | ||||
|  | ||||
| 	if err := r.PublishCancelation(id); err != nil { | ||||
| 		t.Fatalf("could not publish cancelation message: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	time.Sleep(time.Second) // wait for redis to publish message. | ||||
|  | ||||
| 	mu.Lock() | ||||
| 	if !called { | ||||
| 		t.Errorf("cancel function was not called") | ||||
| 	} | ||||
| 	mu.Unlock() | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user