2
0
mirror of https://github.com/soheilhy/cmux.git synced 2024-11-14 11:31:28 +08:00

Close the connections buffered for the listeners

This commit closes the connections that has been buffered
on the connection channel of the cmux listeners, when the
root listener is closed. With this change it is guaranteed
that the connections are either closed or handed of to the
child listeners (returned via Accept()).

There are a couple of changes to the tests to cover corner
cases and the new behavior.
This commit is contained in:
Soheil Hassas Yeganeh 2016-02-23 22:48:45 -05:00
parent 2625710699
commit 6f986603b0
2 changed files with 22 additions and 17 deletions

View File

@ -98,6 +98,10 @@ func (m *cMux) Serve() error {
for _, sl := range m.sls { for _, sl := range m.sls {
close(sl.l.connc) close(sl.l.connc)
// Drain the connections enqueued for the listener.
for c := range sl.l.connc {
_ = c.Close()
}
} }
}() }()
@ -126,12 +130,8 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
if matched { if matched {
select { select {
case sl.l.connc <- muc: case sl.l.connc <- muc:
default:
select {
case <-donec: case <-donec:
_ = c.Close() _ = c.Close()
default:
}
} }
return return
} }

View File

@ -262,10 +262,11 @@ func TestHTTP2(t *testing.T) {
}) })
h2l := muxl.Match(HTTP2()) h2l := muxl.Match(HTTP2())
go safeServe(errCh, muxl) go safeServe(errCh, muxl)
muxedConn, err := h2l.Accept()
close(l.connCh) close(l.connCh)
if muxedConn, err := h2l.Accept(); err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else { }
var b [len(http2.ClientPreface)]byte var b [len(http2.ClientPreface)]byte
if _, err := muxedConn.Read(b[:]); err != io.EOF { if _, err := muxedConn.Read(b[:]); err != io.EOF {
t.Fatal(err) t.Fatal(err)
@ -274,7 +275,6 @@ func TestHTTP2(t *testing.T) {
t.Errorf("got unexpected read %s, expected %s", b, http2.ClientPreface) t.Errorf("got unexpected read %s, expected %s", b, http2.ClientPreface)
} }
} }
}
func TestHTTPGoRPC(t *testing.T) { func TestHTTPGoRPC(t *testing.T) {
defer leakCheck(t)() defer leakCheck(t)()
@ -374,10 +374,15 @@ func TestClose(t *testing.T) {
// Listener is closed. // Listener is closed.
close(l.connCh) close(l.connCh)
// Second connection goes through. // Second connection either goes through or it is closed.
if _, err := anyl.Accept(); err != nil { if _, err := anyl.Accept(); err != nil {
if err != ErrListenerClosed {
t.Fatal(err) t.Fatal(err)
} }
if _, err := c2.Read([]byte{}); err != io.ErrClosedPipe {
t.Fatalf("connection is not closed and is leaked: %v", err)
}
}
} }
// Cribbed from google.golang.org/grpc/test/end2end_test.go. // Cribbed from google.golang.org/grpc/test/end2end_test.go.