From f8697fe26453f7d923abde4bfeac7d60cae663ec Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Tue, 23 Feb 2016 11:29:57 -0500 Subject: [PATCH] Tweak shutdown behaviour (again) The previous behaviour was unsound, as it was prone to dropping connections under (temporary) high load. The new behaviour requires that users are well-behaved with respect to shutdown - the root listener must be shut down before any of the child listeners are, otherwise deadlocks may occur. This requirement seems reasonable. --- bench_test.go | 3 +-- cmux.go | 25 ++++++++++--------------- cmux_test.go | 42 +++++++++++++++++++++++++++++++----------- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git a/bench_test.go b/bench_test.go index 2351cd0..a2171d1 100644 --- a/bench_test.go +++ b/bench_test.go @@ -32,7 +32,6 @@ func BenchmarkCMuxConn(b *testing.B) { } }() - donec := make(chan struct{}) var wg sync.WaitGroup wg.Add(b.N) @@ -41,6 +40,6 @@ func BenchmarkCMuxConn(b *testing.B) { c := &mockConn{ r: bytes.NewReader(benchHTTPPayload), } - m.serve(c, donec, &wg) + m.serve(c, &wg) } } diff --git a/cmux.go b/cmux.go index 10a8932..769cb26 100644 --- a/cmux.go +++ b/cmux.go @@ -45,11 +45,16 @@ var ErrListenerClosed = errListenerClosed("mux: listener closed") // New instantiates a new connection multiplexer. func New(l net.Listener) CMux { + return NewSize(l, 1024) +} + +// NewSize instantiates a new connection multiplexer which buffers up to size +// connections in its child listeners. +func NewSize(l net.Listener, size int) CMux { return &cMux{ root: l, - bufLen: 1024, + bufLen: size, errh: func(_ error) bool { return true }, - donec: make(chan struct{}), } } @@ -76,7 +81,6 @@ type cMux struct { root net.Listener bufLen int errh ErrorHandler - donec chan struct{} sls []matchersListener } @@ -93,7 +97,6 @@ func (m *cMux) Serve() error { var wg sync.WaitGroup defer func() { - close(m.donec) wg.Wait() for _, sl := range m.sls { @@ -111,11 +114,11 @@ func (m *cMux) Serve() error { } wg.Add(1) - go m.serve(c, m.donec, &wg) + go m.serve(c, &wg) } } -func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { +func (m *cMux) serve(c net.Conn, wg *sync.WaitGroup) { defer wg.Done() muc := newMuxConn(c) @@ -124,15 +127,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) { matched := s(muc.sniffer()) muc.reset() if matched { - select { - case sl.l.connc <- muc: - default: - select { - case <-donec: - _ = c.Close() - default: - } - } + sl.l.connc <- muc return } } diff --git a/cmux_test.go b/cmux_test.go index d7ddca4..e4bc8d0 100644 --- a/cmux_test.go +++ b/cmux_test.go @@ -354,27 +354,47 @@ func TestClose(t *testing.T) { }() l := newChanListener() - c1, c2 := net.Pipe() - - muxl := New(l) + muxl := NewSize(l, 0) anyl := muxl.Match(Any()) + defer func() { + // Listener is closed. + close(l.connCh) + + // Root listener is closed now. + if _, err := anyl.Accept(); err != ErrListenerClosed { + t.Fatal(err) + } + }() + go safeServe(errCh, muxl) + c1, c2 := net.Pipe() + l.connCh <- c1 - // First connection goes through. + // Simulate the child listener being temporarily blocked. The connection + // multiplexer must be unbuffered (size=0). Before the fix, this would cause + // connections to be lost. + time.Sleep(50 * time.Millisecond) + + go func() { + // Simulate the child listener being temporarily blocked. The connection + // multiplexer must be unbuffered (size=0). Before the fix, this would cause + // connections to be lost. + time.Sleep(50 * time.Millisecond) + + l.connCh <- c2 + }() + + // First connection goes through. Before the fix, c1 would be lost while we + // were temporarily blocked, so this consumed c2. if _, err := anyl.Accept(); err != nil { t.Fatal(err) } - // Second connection is sent - l.connCh <- c2 - - // Listener is closed. - close(l.connCh) - - // Second connection goes through. + // Second connection goes through. Before the fix, c2 would be consumed above, so + // this would block indefinitely. if _, err := anyl.Accept(); err != nil { t.Fatal(err) }