From ff7a91405b0915d65846af3cb8c67e22e9fa41fa Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Fri, 11 Dec 2015 14:33:39 -0500 Subject: [PATCH] Closing the root listener closes child listeners Partially reverts b90740dfa9b286c06f58fa798929e2d2cb1299e5; the same protection is now afforded by an RWMutex which protects the channels while they are being served on. --- bench_test.go | 9 +++++---- cmux.go | 37 ++++++++++++++++++------------------- cmux_test.go | 16 +++++++--------- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/bench_test.go b/bench_test.go index d5c24d9..73b001d 100644 --- a/bench_test.go +++ b/bench_test.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "net" + "sync" "testing" ) @@ -17,8 +18,6 @@ func (c *mockConn) Read(b []byte) (n int, err error) { } func BenchmarkCMuxConn(b *testing.B) { - b.StopTimer() - benchHTTPPayload := make([]byte, 4096) copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1")) @@ -33,12 +32,14 @@ func BenchmarkCMuxConn(b *testing.B) { } }() - b.StartTimer() + var mu sync.RWMutex + + b.ResetTimer() for i := 0; i < b.N; i++ { c := &mockConn{ r: bytes.NewReader(benchHTTPPayload), } - m.serve(c) + m.serve(c, &mu) } } diff --git a/cmux.go b/cmux.go index 76a5bae..3cc883e 100644 --- a/cmux.go +++ b/cmux.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "sync" ) // Matcher matches a connection based on its content. @@ -81,16 +82,20 @@ func (m *cMux) Match(matchers ...Matcher) (l net.Listener) { ml := muxListener{ Listener: m.root, connc: make(chan net.Conn, m.bufLen), - donec: make(chan struct{}), } m.sls = append(m.sls, matchersListener{ss: matchers, l: ml}) return ml } func (m *cMux) Serve() error { + var mu sync.RWMutex + defer func() { + mu.Lock() + defer mu.Unlock() + for _, sl := range m.sls { - close(sl.l.donec) + close(sl.l.connc) } }() @@ -103,35 +108,30 @@ func (m *cMux) Serve() error { continue } - go m.serve(c) + go m.serve(c, &mu) } } -func (m *cMux) serve(c net.Conn) { +func (m *cMux) serve(c net.Conn, mu *sync.RWMutex) { + mu.RLock() + defer mu.RUnlock() + muc := newMuxConn(c) - matched := false for _, sl := range m.sls { for _, s := range sl.ss { - matched = s(muc.sniffer()) + matched := s(muc.sniffer()) muc.reset() if matched { - select { - // TODO(soheil): threre is a possiblity of having unclosed connection. - case sl.l.connc <- muc: - case <-sl.l.donec: - c.Close() - } + sl.l.connc <- muc return } } } - if !matched { - c.Close() - err := ErrNotMatched{c: c} - if !m.handleErr(err) { - m.root.Close() - } + c.Close() + err := ErrNotMatched{c: c} + if !m.handleErr(err) { + m.root.Close() } } @@ -154,7 +154,6 @@ func (m *cMux) handleErr(err error) bool { type muxListener struct { net.Listener connc chan net.Conn - donec chan struct{} } func (l muxListener) Accept() (c net.Conn, err error) { diff --git a/cmux_test.go b/cmux_test.go index ad40be8..4a99694 100644 --- a/cmux_test.go +++ b/cmux_test.go @@ -169,15 +169,13 @@ func TestErrorHandler(t *testing.T) { } } -type closerConn struct { - net.Conn -} +func TestClose(t *testing.T) { + l, _ := testListener(t) -func (c closerConn) Close() error { return nil } + muxl := New(l) + anyl := muxl.Match(Any()) -func TestClosed(t *testing.T) { - mux := &cMux{} - lis := mux.Match(Any()).(muxListener) - close(lis.donec) - mux.serve(closerConn{}) + l.Close() + muxl.Serve() + anyl.Accept() }