2
0
mirror of https://github.com/soheilhy/cmux.git synced 2024-11-14 11:31:28 +08:00

Tweak shutdown behaviour (again)

The previous behaviour was unsound, as it was prone to dropping
connections under (temporary) high load. The new behaviour requires that
users are well-behaved with respect to shutdown - the root listener
must be shut down before any of the child listeners are, otherwise
deadlocks may occur. This requirement seems reasonable.
This commit is contained in:
Tamir Duberstein 2016-02-23 11:29:57 -05:00
parent 2625710699
commit f8697fe264
3 changed files with 42 additions and 28 deletions

View File

@ -32,7 +32,6 @@ func BenchmarkCMuxConn(b *testing.B) {
} }
}() }()
donec := make(chan struct{})
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(b.N) wg.Add(b.N)
@ -41,6 +40,6 @@ func BenchmarkCMuxConn(b *testing.B) {
c := &mockConn{ c := &mockConn{
r: bytes.NewReader(benchHTTPPayload), r: bytes.NewReader(benchHTTPPayload),
} }
m.serve(c, donec, &wg) m.serve(c, &wg)
} }
} }

25
cmux.go
View File

@ -45,11 +45,16 @@ var ErrListenerClosed = errListenerClosed("mux: listener closed")
// New instantiates a new connection multiplexer. // New instantiates a new connection multiplexer.
func New(l net.Listener) CMux { func New(l net.Listener) CMux {
return NewSize(l, 1024)
}
// NewSize instantiates a new connection multiplexer which buffers up to size
// connections in its child listeners.
func NewSize(l net.Listener, size int) CMux {
return &cMux{ return &cMux{
root: l, root: l,
bufLen: 1024, bufLen: size,
errh: func(_ error) bool { return true }, errh: func(_ error) bool { return true },
donec: make(chan struct{}),
} }
} }
@ -76,7 +81,6 @@ type cMux struct {
root net.Listener root net.Listener
bufLen int bufLen int
errh ErrorHandler errh ErrorHandler
donec chan struct{}
sls []matchersListener sls []matchersListener
} }
@ -93,7 +97,6 @@ func (m *cMux) Serve() error {
var wg sync.WaitGroup var wg sync.WaitGroup
defer func() { defer func() {
close(m.donec)
wg.Wait() wg.Wait()
for _, sl := range m.sls { for _, sl := range m.sls {
@ -111,11 +114,11 @@ func (m *cMux) Serve() error {
} }
wg.Add(1) wg.Add(1)
go m.serve(c, m.donec, &wg) go m.serve(c, &wg)
} }
} }
func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { func (m *cMux) serve(c net.Conn, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
muc := newMuxConn(c) muc := newMuxConn(c)
@ -124,15 +127,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
matched := s(muc.sniffer()) matched := s(muc.sniffer())
muc.reset() muc.reset()
if matched { if matched {
select { sl.l.connc <- muc
case sl.l.connc <- muc:
default:
select {
case <-donec:
_ = c.Close()
default:
}
}
return return
} }
} }

View File

@ -354,27 +354,47 @@ func TestClose(t *testing.T) {
}() }()
l := newChanListener() l := newChanListener()
c1, c2 := net.Pipe() muxl := NewSize(l, 0)
muxl := New(l)
anyl := muxl.Match(Any()) anyl := muxl.Match(Any())
defer func() {
// Listener is closed.
close(l.connCh)
// Root listener is closed now.
if _, err := anyl.Accept(); err != ErrListenerClosed {
t.Fatal(err)
}
}()
go safeServe(errCh, muxl) go safeServe(errCh, muxl)
c1, c2 := net.Pipe()
l.connCh <- c1 l.connCh <- c1
// First connection goes through. // Simulate the child listener being temporarily blocked. The connection
// multiplexer must be unbuffered (size=0). Before the fix, this would cause
// connections to be lost.
time.Sleep(50 * time.Millisecond)
go func() {
// Simulate the child listener being temporarily blocked. The connection
// multiplexer must be unbuffered (size=0). Before the fix, this would cause
// connections to be lost.
time.Sleep(50 * time.Millisecond)
l.connCh <- c2
}()
// First connection goes through. Before the fix, c1 would be lost while we
// were temporarily blocked, so this consumed c2.
if _, err := anyl.Accept(); err != nil { if _, err := anyl.Accept(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// Second connection is sent // Second connection goes through. Before the fix, c2 would be consumed above, so
l.connCh <- c2 // this would block indefinitely.
// Listener is closed.
close(l.connCh)
// Second connection goes through.
if _, err := anyl.Accept(); err != nil { if _, err := anyl.Accept(); err != nil {
t.Fatal(err) t.Fatal(err)
} }