2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-01-19 11:16:07 +08:00

Closing the root listener closes child listeners

Partially reverts b90740dfa9b286c06f58fa798929e2d2cb1299e5; the same
protection is now afforded by an RWMutex which protects the channels
while they are being served on.
This commit is contained in:
Tamir Duberstein 2015-12-11 14:33:39 -05:00
parent 44b568c84b
commit ff7a91405b
3 changed files with 30 additions and 32 deletions

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"io" "io"
"net" "net"
"sync"
"testing" "testing"
) )
@ -17,8 +18,6 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
} }
func BenchmarkCMuxConn(b *testing.B) { func BenchmarkCMuxConn(b *testing.B) {
b.StopTimer()
benchHTTPPayload := make([]byte, 4096) benchHTTPPayload := make([]byte, 4096)
copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1")) copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1"))
@ -33,12 +32,14 @@ func BenchmarkCMuxConn(b *testing.B) {
} }
}() }()
b.StartTimer() var mu sync.RWMutex
b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
c := &mockConn{ c := &mockConn{
r: bytes.NewReader(benchHTTPPayload), r: bytes.NewReader(benchHTTPPayload),
} }
m.serve(c) m.serve(c, &mu)
} }
} }

29
cmux.go
View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
) )
// Matcher matches a connection based on its content. // Matcher matches a connection based on its content.
@ -81,16 +82,20 @@ func (m *cMux) Match(matchers ...Matcher) (l net.Listener) {
ml := muxListener{ ml := muxListener{
Listener: m.root, Listener: m.root,
connc: make(chan net.Conn, m.bufLen), connc: make(chan net.Conn, m.bufLen),
donec: make(chan struct{}),
} }
m.sls = append(m.sls, matchersListener{ss: matchers, l: ml}) m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
return ml return ml
} }
func (m *cMux) Serve() error { func (m *cMux) Serve() error {
var mu sync.RWMutex
defer func() { defer func() {
mu.Lock()
defer mu.Unlock()
for _, sl := range m.sls { for _, sl := range m.sls {
close(sl.l.donec) close(sl.l.connc)
} }
}() }()
@ -103,36 +108,31 @@ func (m *cMux) Serve() error {
continue continue
} }
go m.serve(c) go m.serve(c, &mu)
} }
} }
func (m *cMux) serve(c net.Conn) { func (m *cMux) serve(c net.Conn, mu *sync.RWMutex) {
mu.RLock()
defer mu.RUnlock()
muc := newMuxConn(c) muc := newMuxConn(c)
matched := false
for _, sl := range m.sls { for _, sl := range m.sls {
for _, s := range sl.ss { for _, s := range sl.ss {
matched = s(muc.sniffer()) matched := s(muc.sniffer())
muc.reset() muc.reset()
if matched { if matched {
select { sl.l.connc <- muc
// TODO(soheil): threre is a possiblity of having unclosed connection.
case sl.l.connc <- muc:
case <-sl.l.donec:
c.Close()
}
return return
} }
} }
} }
if !matched {
c.Close() c.Close()
err := ErrNotMatched{c: c} err := ErrNotMatched{c: c}
if !m.handleErr(err) { if !m.handleErr(err) {
m.root.Close() m.root.Close()
} }
}
} }
func (m *cMux) HandleError(h ErrorHandler) { func (m *cMux) HandleError(h ErrorHandler) {
@ -154,7 +154,6 @@ func (m *cMux) handleErr(err error) bool {
type muxListener struct { type muxListener struct {
net.Listener net.Listener
connc chan net.Conn connc chan net.Conn
donec chan struct{}
} }
func (l muxListener) Accept() (c net.Conn, err error) { func (l muxListener) Accept() (c net.Conn, err error) {

View File

@ -169,15 +169,13 @@ func TestErrorHandler(t *testing.T) {
} }
} }
type closerConn struct { func TestClose(t *testing.T) {
net.Conn l, _ := testListener(t)
}
func (c closerConn) Close() error { return nil } muxl := New(l)
anyl := muxl.Match(Any())
func TestClosed(t *testing.T) { l.Close()
mux := &cMux{} muxl.Serve()
lis := mux.Match(Any()).(muxListener) anyl.Accept()
close(lis.donec)
mux.serve(closerConn{})
} }