2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-01-19 03:06:07 +08:00

Merge pull request #12 from tamird/fix-read

Reduce the number of calls needed to (*MuxConn).Read
This commit is contained in:
Soheil Hassas Yeganeh 2016-02-23 11:20:36 -05:00
commit 99ee7b080d
4 changed files with 113 additions and 31 deletions

View File

@ -7,19 +7,27 @@ type buffer struct {
data []byte data []byte
} }
func (b *buffer) Read(p []byte) (n int, err error) { // From the io.Reader documentation:
n = len(b.data) - b.read //
if n == 0 { // When Read encounters an error or end-of-file condition after
return 0, io.EOF // successfully reading n > 0 bytes, it returns the number of
} // bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
if len(p) < n { // An instance of this general case is that a Reader returning
n = len(p) // a non-zero number of bytes at the end of the input stream may
} // return either err == EOF or err == nil. The next Read should
// return 0, EOF.
copy(p[:n], b.data[b.read:b.read+n]) //
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (b *buffer) Read(p []byte) (int, error) {
var err error
n := copy(p, b.data[b.read:])
b.read += n b.read += n
return if b.read == len(b.data) {
err = io.EOF
}
return n, err
} }
func (b *buffer) Len() int { func (b *buffer) Len() int {

View File

@ -9,10 +9,12 @@ import (
func TestBuffer(t *testing.T) { func TestBuffer(t *testing.T) {
writeBytes := []byte("deadbeef") writeBytes := []byte("deadbeef")
const numWrites = 10
var b buffer var b buffer
for i := 0; i < 10; i++ { for i := 0; i < numWrites; i++ {
n, err := b.Write(writeBytes) n, err := b.Write(writeBytes)
if err != nil { if err != nil && err != io.EOF {
t.Fatal(err) t.Fatal(err)
} }
if n != len(writeBytes) { if n != len(writeBytes) {
@ -22,9 +24,14 @@ func TestBuffer(t *testing.T) {
for j := 0; j < 2; j++ { for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes)) readBytes := make([]byte, len(writeBytes))
for i := 0; i < 10; i++ { for i := 0; i < numWrites; i++ {
n, err := b.Read(readBytes) n, err := b.Read(readBytes)
if err != nil { if i == numWrites-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if n != len(readBytes) { if n != len(readBytes) {
@ -34,10 +41,13 @@ func TestBuffer(t *testing.T) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes) t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
} }
} }
_, err := b.Read(readBytes) n, err := b.Read(readBytes)
if err != io.EOF { if err != io.EOF {
t.Errorf("expected EOF") t.Errorf("expected EOF")
} }
if n != 0 {
t.Errorf("expected buffer to be empty, but got %d bytes", n)
}
b.resetRead() b.resetRead()
} }
@ -55,18 +65,26 @@ func TestBufferOffset(t *testing.T) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n) t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
} }
for i := 0; i < len(writeBytes)/2; i++ { const readSize = 2
readBytes := make([]byte, 2)
numReads := len(writeBytes) / readSize
for i := 0; i < numReads; i++ {
readBytes := make([]byte, readSize)
n, err := b.Read(readBytes) n, err := b.Read(readBytes)
if err != nil { if i == numReads-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if n != 2 { if n != readSize {
t.Fatalf("cannot read the bytes: want=%d got=%d", 2, n) t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
} }
if !bytes.Equal(readBytes, writeBytes[i*2:i*2+2]) { if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
t.Fatalf("different bytes read: want=%s got=%s", t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
readBytes, writeBytes[i*2:i*2+2])
} }
} }
} }

25
cmux.go
View File

@ -186,13 +186,26 @@ func newMuxConn(c net.Conn) *MuxConn {
} }
} }
func (m *MuxConn) Read(b []byte) (n int, err error) { // From the io.Reader documentation:
if n, err = m.buf.Read(b); err == nil { //
return // When Read encounters an error or end-of-file condition after
// successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (m *MuxConn) Read(b []byte) (int, error) {
n1, err := m.buf.Read(b)
if n1 == len(b) || err != io.EOF {
return n1, err
} }
n2, err := m.Conn.Read(b[n1:])
n, err = m.Conn.Read(b) return n1 + n2, err
return
} }
func (m *MuxConn) sniffer() io.Reader { func (m *MuxConn) sniffer() io.Reader {

View File

@ -3,6 +3,7 @@ package cmux
import ( import (
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -164,6 +165,48 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
} }
} }
func TestRead(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
const payload = "hello world\r\n"
const mult = 2
writer, reader := net.Pipe()
go func() {
if _, err := io.WriteString(writer, strings.Repeat(payload, mult)); err != nil {
t.Fatal(err)
}
}()
l := newChanListener()
defer close(l.connCh)
l.connCh <- reader
muxl := New(l)
// Register a bogus matcher to force reading from the conn.
muxl.Match(HTTP2())
anyl := muxl.Match(Any())
go safeServe(errCh, muxl)
muxedConn, err := anyl.Accept()
if err != nil {
t.Fatal(err)
}
var b [mult * len(payload)]byte
n, err := muxedConn.Read(b[:])
if err != nil {
t.Fatal(err)
}
if e := len(b); n != e {
t.Errorf("expected to read %d bytes, but read %d bytes", e, n)
}
}
func TestAny(t *testing.T) { func TestAny(t *testing.T) {
defer leakCheck(t)() defer leakCheck(t)()
errCh := make(chan error) errCh := make(chan error)