2
0
mirror of https://github.com/soheilhy/cmux.git synced 2024-11-14 11:31:28 +08:00

Replace buffer with bufferedReader

bufferedReader is an optimized implementation of io.Reader that behaves
like
```
io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
```
without allocating.

This has a measurable effect on benchmarks:
```
name        old time/op    new time/op    delta
CMuxConn-4    1.09µs ± 4%    0.99µs ±19%   -9.32%  (p=0.000 n=17+19)

name        old alloc/op   new alloc/op   delta
CMuxConn-4      240B ± 0%      260B ± 0%   +8.33%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
CMuxConn-4      9.00 ± 0%      5.00 ± 0%  -44.44%  (p=0.000 n=20+20)
```

Note that appropriate test coverage is provided by `TestRead`.
This commit is contained in:
Tamir Duberstein 2016-02-25 15:55:09 -05:00
parent d710784914
commit 36dc9894b6
3 changed files with 39 additions and 173 deletions

View File

@ -1,57 +1,35 @@
package cmux package cmux
import "io" import (
"bytes"
"io"
)
var _ io.ReadWriter = (*buffer)(nil) // bufferedReader is an optimized implementation of io.Reader that behaves like
// ```
type buffer struct { // io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
read int // ```
data []byte // without allocating.
type bufferedReader struct {
source io.Reader
buffer *bytes.Buffer
bufferRead int
bufferSize int
} }
// From the io.Reader documentation: func (s *bufferedReader) Read(p []byte) (int, error) {
// // Functionality of bytes.Reader.
// When Read encounters an error or end-of-file condition after bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
// successfully reading n > 0 bytes, it returns the number of s.bufferRead += bn
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (b *buffer) Read(p []byte) (int, error) {
var err error
n := copy(p, b.data[b.read:])
b.read += n
if b.read == len(b.data) {
err = io.EOF
}
return n, err
}
func (b *buffer) Len() int { p = p[bn:]
return len(b.data) - b.read
}
func (b *buffer) resetRead() { // Funtionality of io.TeeReader.
b.read = 0 sn, sErr := s.source.Read(p)
if sn > 0 {
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return bn + wn, wErr
} }
}
// From the io.Writer documentation: return bn + sn, sErr
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
//
// In a previous incarnation, this implementation retained the incoming slice.
func (b *buffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
} }

View File

@ -1,113 +0,0 @@
package cmux
import (
"bytes"
"io"
"testing"
)
func TestWriteNoModify(t *testing.T) {
var b buffer
const origWriteByte = 0
const postWriteByte = 1
writeBytes := []byte{origWriteByte}
if _, err := b.Write(writeBytes); err != nil {
t.Fatal(err)
}
writeBytes[0] = postWriteByte
readBytes := make([]byte, 1)
if _, err := b.Read(readBytes); err != io.EOF {
t.Fatal(err)
}
if readBytes[0] != origWriteByte {
t.Fatalf("expected to read %x, but read %x; buffer retained passed-in slice", origWriteByte, postWriteByte)
}
}
const writeString = "deadbeef"
func TestBuffer(t *testing.T) {
writeBytes := []byte(writeString)
const numWrites = 10
var b buffer
for i := 0; i < numWrites; i++ {
n, err := b.Write(writeBytes)
if err != nil && err != io.EOF {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
}
for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes))
for i := 0; i < numWrites; i++ {
n, err := b.Read(readBytes)
if i == numWrites-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != len(readBytes) {
t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n)
}
if !bytes.Equal(writeBytes, readBytes) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
}
}
n, err := b.Read(readBytes)
if err != io.EOF {
t.Errorf("expected EOF")
}
if n != 0 {
t.Errorf("expected buffer to be empty, but got %d bytes", n)
}
b.resetRead()
}
}
func TestBufferOffset(t *testing.T) {
writeBytes := []byte(writeString)
var b buffer
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
const readSize = 2
numReads := len(writeBytes) / readSize
for i := 0; i < numReads; i++ {
readBytes := make([]byte, readSize)
n, err := b.Read(readBytes)
if i == numReads-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != readSize {
t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
}
if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
}
}
}

21
cmux.go
View File

@ -1,6 +1,7 @@
package cmux package cmux
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -125,8 +126,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
muc := newMuxConn(c) muc := newMuxConn(c)
for _, sl := range m.sls { for _, sl := range m.sls {
for _, s := range sl.ss { for _, s := range sl.ss {
matched := s(muc.sniffer()) matched := s(muc.getSniffer())
muc.reset()
if matched { if matched {
select { select {
case sl.l.connc <- muc: case sl.l.connc <- muc:
@ -177,7 +177,8 @@ func (l muxListener) Accept() (net.Conn, error) {
// MuxConn wraps a net.Conn and provides transparent sniffing of connection data. // MuxConn wraps a net.Conn and provides transparent sniffing of connection data.
type MuxConn struct { type MuxConn struct {
net.Conn net.Conn
buf buffer buf bytes.Buffer
sniffer bufferedReader
} }
func newMuxConn(c net.Conn) *MuxConn { func newMuxConn(c net.Conn) *MuxConn {
@ -201,17 +202,17 @@ func newMuxConn(c net.Conn) *MuxConn {
// (non-nil) error from the same call. // (non-nil) error from the same call.
func (m *MuxConn) Read(p []byte) (int, error) { func (m *MuxConn) Read(p []byte) (int, error) {
n1, err := m.buf.Read(p) n1, err := m.buf.Read(p)
if err != io.EOF { if err == nil && m.buf.Len() == 0 {
err = io.EOF
}
if n1 == len(p) || err != io.EOF {
return n1, err return n1, err
} }
n2, err := m.Conn.Read(p[n1:]) n2, err := m.Conn.Read(p[n1:])
return n1 + n2, err return n1 + n2, err
} }
func (m *MuxConn) sniffer() io.Reader { func (m *MuxConn) getSniffer() io.Reader {
return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf)) m.sniffer = bufferedReader{source: m.Conn, buffer: &m.buf, bufferSize: m.buf.Len()}
} return &m.sniffer
func (m *MuxConn) reset() {
m.buf.resetRead()
} }