mirror of
https://github.com/soheilhy/cmux.git
synced 2024-11-10 11:41:52 +08:00
Merge pull request #8 from tamird/fix-leaks
fix test-only goroutine leaks
This commit is contained in:
commit
4796322bac
202
cmux_test.go
202
cmux_test.go
@ -6,7 +6,12 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/rpc"
|
"net/rpc"
|
||||||
|
"runtime"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -14,6 +19,24 @@ const (
|
|||||||
rpcVal = 1234
|
rpcVal = 1234
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func safeServe(errCh chan<- error, muxl CMux) {
|
||||||
|
if err := muxl.Serve(); !strings.Contains(err.Error(), "use of closed network connection") {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func safeDial(t *testing.T, addr net.Addr) (*rpc.Client, func()) {
|
||||||
|
c, err := rpc.Dial(addr.Network(), addr.String())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return c, func() {
|
||||||
|
if err := c.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testListener(t *testing.T) (net.Listener, func()) {
|
func testListener(t *testing.T) (net.Listener, func()) {
|
||||||
l, err := net.Listen("tcp", ":0")
|
l, err := net.Listen("tcp", ":0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -21,7 +44,7 @@ func testListener(t *testing.T) (net.Listener, func()) {
|
|||||||
}
|
}
|
||||||
return l, func() {
|
return l, func() {
|
||||||
if err := l.Close(); err != nil {
|
if err := l.Close(); err != nil {
|
||||||
t.Error(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -32,12 +55,35 @@ func (h *testHTTP1Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
fmt.Fprintf(w, testHTTP1Resp)
|
fmt.Fprintf(w, testHTTP1Resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTestHTTPServer(t *testing.T, l net.Listener) {
|
func runTestHTTPServer(errCh chan<- error, l net.Listener) {
|
||||||
|
var mu sync.Mutex
|
||||||
|
conns := make(map[net.Conn]struct{})
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
mu.Lock()
|
||||||
|
for c := range conns {
|
||||||
|
if err := c.Close(); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
s := &http.Server{
|
s := &http.Server{
|
||||||
Handler: &testHTTP1Handler{},
|
Handler: &testHTTP1Handler{},
|
||||||
|
ConnState: func(c net.Conn, state http.ConnState) {
|
||||||
|
mu.Lock()
|
||||||
|
switch state {
|
||||||
|
case http.StateNew:
|
||||||
|
conns[c] = struct{}{}
|
||||||
|
case http.StateClosed:
|
||||||
|
delete(conns, c)
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if err := s.Serve(l); err != nil && err != ErrListenerClosed {
|
if err := s.Serve(l); err != ErrListenerClosed {
|
||||||
t.Log(err)
|
errCh <- err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -49,12 +95,12 @@ func runTestHTTP1Client(t *testing.T, addr net.Addr) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := r.Body.Close(); err != nil {
|
if err := r.Body.Close(); err != nil {
|
||||||
t.Log(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
b, err := ioutil.ReadAll(r.Body)
|
b, err := ioutil.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if string(b) != testHTTP1Resp {
|
if string(b) != testHTTP1Resp {
|
||||||
@ -69,15 +115,17 @@ func (r TestRPCRcvr) Test(i int, j *int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func runTestRPCServer(t *testing.T, l net.Listener) {
|
func runTestRPCServer(errCh chan<- error, l net.Listener) {
|
||||||
s := rpc.NewServer()
|
s := rpc.NewServer()
|
||||||
if err := s.Register(TestRPCRcvr{}); err != nil {
|
if err := s.Register(TestRPCRcvr{}); err != nil {
|
||||||
t.Fatal(err)
|
errCh <- err
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
c, err := l.Accept()
|
c, err := l.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Log(err)
|
if err != ErrListenerClosed {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go s.ServeConn(c)
|
go s.ServeConn(c)
|
||||||
@ -85,16 +133,12 @@ func runTestRPCServer(t *testing.T, l net.Listener) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runTestRPCClient(t *testing.T, addr net.Addr) {
|
func runTestRPCClient(t *testing.T, addr net.Addr) {
|
||||||
c, err := rpc.Dial(addr.Network(), addr.String())
|
c, cleanup := safeDial(t, addr)
|
||||||
if err != nil {
|
defer cleanup()
|
||||||
t.Error(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var num int
|
var num int
|
||||||
if err := c.Call("TestRPCRcvr.Test", rpcVal, &num); err != nil {
|
if err := c.Call("TestRPCRcvr.Test", rpcVal, &num); err != nil {
|
||||||
t.Error(err)
|
t.Fatal(err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if num != rpcVal {
|
if num != rpcVal {
|
||||||
@ -103,23 +147,37 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestAny(t *testing.T) {
|
func TestAny(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
errCh := make(chan error)
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
l, cleanup := testListener(t)
|
l, cleanup := testListener(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
muxl := New(l)
|
muxl := New(l)
|
||||||
httpl := muxl.Match(Any())
|
httpl := muxl.Match(Any())
|
||||||
|
|
||||||
go runTestHTTPServer(t, httpl)
|
go runTestHTTPServer(errCh, httpl)
|
||||||
go func() {
|
go safeServe(errCh, muxl)
|
||||||
if err := muxl.Serve(); err != nil {
|
|
||||||
t.Log(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
runTestHTTP1Client(t, l.Addr())
|
runTestHTTP1Client(t, l.Addr())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHTTPGoRPC(t *testing.T) {
|
func TestHTTPGoRPC(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
errCh := make(chan error)
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
l, cleanup := testListener(t)
|
l, cleanup := testListener(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
@ -127,31 +185,32 @@ func TestHTTPGoRPC(t *testing.T) {
|
|||||||
httpl := muxl.Match(HTTP2(), HTTP1Fast())
|
httpl := muxl.Match(HTTP2(), HTTP1Fast())
|
||||||
rpcl := muxl.Match(Any())
|
rpcl := muxl.Match(Any())
|
||||||
|
|
||||||
go runTestHTTPServer(t, httpl)
|
go runTestHTTPServer(errCh, httpl)
|
||||||
go runTestRPCServer(t, rpcl)
|
go runTestRPCServer(errCh, rpcl)
|
||||||
go func() {
|
go safeServe(errCh, muxl)
|
||||||
if err := muxl.Serve(); err != nil {
|
|
||||||
t.Log(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
runTestHTTP1Client(t, l.Addr())
|
runTestHTTP1Client(t, l.Addr())
|
||||||
runTestRPCClient(t, l.Addr())
|
runTestRPCClient(t, l.Addr())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestErrorHandler(t *testing.T) {
|
func TestErrorHandler(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
errCh := make(chan error)
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
l, cleanup := testListener(t)
|
l, cleanup := testListener(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
|
||||||
muxl := New(l)
|
muxl := New(l)
|
||||||
httpl := muxl.Match(HTTP2(), HTTP1Fast())
|
httpl := muxl.Match(HTTP2(), HTTP1Fast())
|
||||||
|
|
||||||
go runTestHTTPServer(t, httpl)
|
go runTestHTTPServer(errCh, httpl)
|
||||||
go func() {
|
go safeServe(errCh, muxl)
|
||||||
if err := muxl.Serve(); err != nil {
|
|
||||||
t.Log(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
firstErr := true
|
firstErr := true
|
||||||
muxl.HandleError(func(err error) bool {
|
muxl.HandleError(func(err error) bool {
|
||||||
@ -165,11 +224,8 @@ func TestErrorHandler(t *testing.T) {
|
|||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
addr := l.Addr()
|
c, cleanup := safeDial(t, l.Addr())
|
||||||
c, err := rpc.Dial(addr.Network(), addr.String())
|
defer cleanup()
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var num int
|
var num int
|
||||||
if err := c.Call("TestRPCRcvr.Test", rpcVal, &num); err == nil {
|
if err := c.Call("TestRPCRcvr.Test", rpcVal, &num); err == nil {
|
||||||
@ -184,6 +240,7 @@ type closerConn struct {
|
|||||||
func (c closerConn) Close() error { return nil }
|
func (c closerConn) Close() error { return nil }
|
||||||
|
|
||||||
func TestClosed(t *testing.T) {
|
func TestClosed(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
mux := &cMux{}
|
mux := &cMux{}
|
||||||
lis := mux.Match(Any()).(muxListener)
|
lis := mux.Match(Any()).(muxListener)
|
||||||
close(lis.donec)
|
close(lis.donec)
|
||||||
@ -193,3 +250,68 @@ func TestClosed(t *testing.T) {
|
|||||||
t.Errorf("expected errListenerClosed got %v", err)
|
t.Errorf("expected errListenerClosed got %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cribbed from google.golang.org/grpc/test/end2end_test.go.
|
||||||
|
|
||||||
|
// interestingGoroutines returns all goroutines we care about for the purpose
|
||||||
|
// of leak checking. It excludes testing or runtime ones.
|
||||||
|
func interestingGoroutines() (gs []string) {
|
||||||
|
buf := make([]byte, 2<<20)
|
||||||
|
buf = buf[:runtime.Stack(buf, true)]
|
||||||
|
for _, g := range strings.Split(string(buf), "\n\n") {
|
||||||
|
sl := strings.SplitN(g, "\n", 2)
|
||||||
|
if len(sl) != 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stack := strings.TrimSpace(sl[1])
|
||||||
|
if strings.HasPrefix(stack, "testing.RunTests") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if stack == "" ||
|
||||||
|
strings.Contains(stack, "testing.Main(") ||
|
||||||
|
strings.Contains(stack, "runtime.goexit") ||
|
||||||
|
strings.Contains(stack, "created by runtime.gc") ||
|
||||||
|
strings.Contains(stack, "interestingGoroutines") ||
|
||||||
|
strings.Contains(stack, "runtime.MHeap_Scavenger") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
gs = append(gs, g)
|
||||||
|
}
|
||||||
|
sort.Strings(gs)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// leakCheck snapshots the currently-running goroutines and returns a
|
||||||
|
// function to be run at the end of tests to see whether any
|
||||||
|
// goroutines leaked.
|
||||||
|
func leakCheck(t testing.TB) func() {
|
||||||
|
orig := map[string]bool{}
|
||||||
|
for _, g := range interestingGoroutines() {
|
||||||
|
orig[g] = true
|
||||||
|
}
|
||||||
|
return func() {
|
||||||
|
// Loop, waiting for goroutines to shut down.
|
||||||
|
// Wait up to 5 seconds, but finish as quickly as possible.
|
||||||
|
deadline := time.Now().Add(5 * time.Second)
|
||||||
|
for {
|
||||||
|
var leaked []string
|
||||||
|
for _, g := range interestingGoroutines() {
|
||||||
|
if !orig[g] {
|
||||||
|
leaked = append(leaked, g)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(leaked) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if time.Now().Before(deadline) {
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, g := range leaked {
|
||||||
|
t.Errorf("Leaked goroutine: %v", g)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user