2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-01-19 03:06:07 +08:00

Merge pull request #23 from soheilhy/bytes-buffer

Replace TeeReader with a bytes buffer
This commit is contained in:
Soheil Hassas Yeganeh 2016-04-24 14:38:36 -04:00
commit 255149b822
5 changed files with 71 additions and 187 deletions

View File

@ -1,22 +1,27 @@
language: go
go:
- 1.3
- 1.4
- 1.5
- 1.6
- tip
matrix:
allow_failures:
- go: tip
gobuild_args: -race
before_install:
- go get -u github.com/golang/lint/golint
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then go get -u github.com/kisielk/errcheck; fi
- go get -u golang.org/x/tools/cmd/vet
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u github.com/kisielk/errcheck; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u github.com/golang/lint/golint; fi
before_script:
- '! gofmt -s -l . | read'
- golint ./...
- echo $TRAVIS_GO_VERSION
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then errcheck ./...; fi
- go vet .
- go tool vet --shadow .
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then golint ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then errcheck ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet .; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet --shadow .; fi
script:
- go test -v ./...

View File

@ -1,57 +1,49 @@
package cmux
import "io"
import (
"bytes"
"io"
)
var _ io.ReadWriter = (*buffer)(nil)
type buffer struct {
read int
data []byte
// bufferedReader is an optimized implementation of io.Reader that behaves like
// ```
// io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
// ```
// without allocating.
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
}
// From the io.Reader documentation:
//
// When Read encounters an error or end-of-file condition after
// successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (b *buffer) Read(p []byte) (int, error) {
var err error
n := copy(p, b.data[b.read:])
b.read += n
if b.read == len(b.data) {
err = io.EOF
}
return n, err
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead {
// If we have already read something from the buffer before, we return the
// same data and the last error if any. We need to immediately return,
// otherwise we may block for ever, if we try to be smart and call
// source.Read() seeking a little bit of more data.
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
}
func (b *buffer) Len() int {
return len(b.data) - b.read
// If there is nothing more to return in the sniffed buffer, read from the
// source.
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing {
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
}
}
return sn, sErr
}
func (b *buffer) resetRead() {
b.read = 0
}
// From the io.Writer documentation:
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
//
// In a previous incarnation, this implementation retained the incoming slice.
func (b *buffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0
s.bufferSize = s.buffer.Len()
}

View File

@ -1,113 +0,0 @@
package cmux
import (
"bytes"
"io"
"testing"
)
func TestWriteNoModify(t *testing.T) {
var b buffer
const origWriteByte = 0
const postWriteByte = 1
writeBytes := []byte{origWriteByte}
if _, err := b.Write(writeBytes); err != nil {
t.Fatal(err)
}
writeBytes[0] = postWriteByte
readBytes := make([]byte, 1)
if _, err := b.Read(readBytes); err != io.EOF {
t.Fatal(err)
}
if readBytes[0] != origWriteByte {
t.Fatalf("expected to read %x, but read %x; buffer retained passed-in slice", origWriteByte, postWriteByte)
}
}
const writeString = "deadbeef"
func TestBuffer(t *testing.T) {
writeBytes := []byte(writeString)
const numWrites = 10
var b buffer
for i := 0; i < numWrites; i++ {
n, err := b.Write(writeBytes)
if err != nil && err != io.EOF {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
}
for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes))
for i := 0; i < numWrites; i++ {
n, err := b.Read(readBytes)
if i == numWrites-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != len(readBytes) {
t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n)
}
if !bytes.Equal(writeBytes, readBytes) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
}
}
n, err := b.Read(readBytes)
if err != io.EOF {
t.Errorf("expected EOF")
}
if n != 0 {
t.Errorf("expected buffer to be empty, but got %d bytes", n)
}
b.resetRead()
}
}
func TestBufferOffset(t *testing.T) {
writeBytes := []byte(writeString)
var b buffer
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
const readSize = 2
numReads := len(writeBytes) / readSize
for i := 0; i < numReads; i++ {
readBytes := make([]byte, readSize)
n, err := b.Read(readBytes)
if i == numReads-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != readSize {
t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
}
if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
}
}
}

26
cmux.go
View File

@ -125,9 +125,9 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
muc := newMuxConn(c)
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.sniffer())
muc.reset()
matched := s(muc.startSniffing())
if matched {
muc.doneSniffing()
select {
case sl.l.connc <- muc:
case <-donec:
@ -177,12 +177,13 @@ func (l muxListener) Accept() (net.Conn, error) {
// MuxConn wraps a net.Conn and provides transparent sniffing of connection data.
type MuxConn struct {
net.Conn
buf buffer
buf bufferedReader
}
func newMuxConn(c net.Conn) *MuxConn {
return &MuxConn{
Conn: c,
buf: bufferedReader{source: c},
}
}
@ -196,22 +197,15 @@ func newMuxConn(c net.Conn) *MuxConn {
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (m *MuxConn) Read(p []byte) (int, error) {
n1, err := m.buf.Read(p)
if err != io.EOF {
return n1, err
}
n2, err := m.Conn.Read(p[n1:])
return n1 + n2, err
return m.buf.Read(p)
}
func (m *MuxConn) sniffer() io.Reader {
return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf))
func (m *MuxConn) startSniffing() io.Reader {
m.buf.reset(true)
return &m.buf
}
func (m *MuxConn) reset() {
m.buf.resetRead()
func (m *MuxConn) doneSniffing() {
m.buf.reset(false)
}

View File

@ -284,7 +284,13 @@ func TestHTTP2(t *testing.T) {
t.Fatal(err)
}
var b [len(http2.ClientPreface)]byte
if _, err := muxedConn.Read(b[:]); err != io.EOF {
var n int
// We have the sniffed buffer first...
if n, err = muxedConn.Read(b[:]); err == io.EOF {
t.Fatal(err)
}
// and then we read from the source.
if _, err = muxedConn.Read(b[n:]); err != io.EOF {
t.Fatal(err)
}
if string(b[:]) != http2.ClientPreface {