mirror of
https://github.com/soheilhy/cmux.git
synced 2024-11-14 11:31:28 +08:00
Reduce the number of calls needed to (*MuxConn).Read
Also affects (*buffer).Read.
This commit is contained in:
parent
95fd8b5c56
commit
6490dea199
32
buffer.go
32
buffer.go
@ -7,19 +7,27 @@ type buffer struct {
|
|||||||
data []byte
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *buffer) Read(p []byte) (n int, err error) {
|
// From the io.Reader documentation:
|
||||||
n = len(b.data) - b.read
|
//
|
||||||
if n == 0 {
|
// When Read encounters an error or end-of-file condition after
|
||||||
return 0, io.EOF
|
// successfully reading n > 0 bytes, it returns the number of
|
||||||
}
|
// bytes read. It may return the (non-nil) error from the same call
|
||||||
|
// or return the error (and n == 0) from a subsequent call.
|
||||||
if len(p) < n {
|
// An instance of this general case is that a Reader returning
|
||||||
n = len(p)
|
// a non-zero number of bytes at the end of the input stream may
|
||||||
}
|
// return either err == EOF or err == nil. The next Read should
|
||||||
|
// return 0, EOF.
|
||||||
copy(p[:n], b.data[b.read:b.read+n])
|
//
|
||||||
|
// This function implements the latter behaviour, returning the
|
||||||
|
// (non-nil) error from the same call.
|
||||||
|
func (b *buffer) Read(p []byte) (int, error) {
|
||||||
|
var err error
|
||||||
|
n := copy(p, b.data[b.read:])
|
||||||
b.read += n
|
b.read += n
|
||||||
return
|
if b.read == len(b.data) {
|
||||||
|
err = io.EOF
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *buffer) Len() int {
|
func (b *buffer) Len() int {
|
||||||
|
@ -9,10 +9,12 @@ import (
|
|||||||
func TestBuffer(t *testing.T) {
|
func TestBuffer(t *testing.T) {
|
||||||
writeBytes := []byte("deadbeef")
|
writeBytes := []byte("deadbeef")
|
||||||
|
|
||||||
|
const numWrites = 10
|
||||||
|
|
||||||
var b buffer
|
var b buffer
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < numWrites; i++ {
|
||||||
n, err := b.Write(writeBytes)
|
n, err := b.Write(writeBytes)
|
||||||
if err != nil {
|
if err != nil && err != io.EOF {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if n != len(writeBytes) {
|
if n != len(writeBytes) {
|
||||||
@ -22,9 +24,14 @@ func TestBuffer(t *testing.T) {
|
|||||||
|
|
||||||
for j := 0; j < 2; j++ {
|
for j := 0; j < 2; j++ {
|
||||||
readBytes := make([]byte, len(writeBytes))
|
readBytes := make([]byte, len(writeBytes))
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < numWrites; i++ {
|
||||||
n, err := b.Read(readBytes)
|
n, err := b.Read(readBytes)
|
||||||
if err != nil {
|
if i == numWrites-1 {
|
||||||
|
// The last read should report EOF.
|
||||||
|
if err != io.EOF {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if n != len(readBytes) {
|
if n != len(readBytes) {
|
||||||
@ -34,10 +41,13 @@ func TestBuffer(t *testing.T) {
|
|||||||
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
|
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_, err := b.Read(readBytes)
|
n, err := b.Read(readBytes)
|
||||||
if err != io.EOF {
|
if err != io.EOF {
|
||||||
t.Errorf("expected EOF")
|
t.Errorf("expected EOF")
|
||||||
}
|
}
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("expected buffer to be empty, but got %d bytes", n)
|
||||||
|
}
|
||||||
|
|
||||||
b.resetRead()
|
b.resetRead()
|
||||||
}
|
}
|
||||||
@ -55,18 +65,26 @@ func TestBufferOffset(t *testing.T) {
|
|||||||
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
|
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(writeBytes)/2; i++ {
|
const readSize = 2
|
||||||
readBytes := make([]byte, 2)
|
|
||||||
|
numReads := len(writeBytes) / readSize
|
||||||
|
|
||||||
|
for i := 0; i < numReads; i++ {
|
||||||
|
readBytes := make([]byte, readSize)
|
||||||
n, err := b.Read(readBytes)
|
n, err := b.Read(readBytes)
|
||||||
if err != nil {
|
if i == numReads-1 {
|
||||||
|
// The last read should report EOF.
|
||||||
|
if err != io.EOF {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if n != 2 {
|
} else if err != nil {
|
||||||
t.Fatalf("cannot read the bytes: want=%d got=%d", 2, n)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(readBytes, writeBytes[i*2:i*2+2]) {
|
if n != readSize {
|
||||||
t.Fatalf("different bytes read: want=%s got=%s",
|
t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
|
||||||
readBytes, writeBytes[i*2:i*2+2])
|
}
|
||||||
|
if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
|
||||||
|
t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
25
cmux.go
25
cmux.go
@ -174,13 +174,26 @@ func newMuxConn(c net.Conn) *MuxConn {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MuxConn) Read(b []byte) (n int, err error) {
|
// From the io.Reader documentation:
|
||||||
if n, err = m.buf.Read(b); err == nil {
|
//
|
||||||
return
|
// When Read encounters an error or end-of-file condition after
|
||||||
|
// successfully reading n > 0 bytes, it returns the number of
|
||||||
|
// bytes read. It may return the (non-nil) error from the same call
|
||||||
|
// or return the error (and n == 0) from a subsequent call.
|
||||||
|
// An instance of this general case is that a Reader returning
|
||||||
|
// a non-zero number of bytes at the end of the input stream may
|
||||||
|
// return either err == EOF or err == nil. The next Read should
|
||||||
|
// return 0, EOF.
|
||||||
|
//
|
||||||
|
// This function implements the latter behaviour, returning the
|
||||||
|
// (non-nil) error from the same call.
|
||||||
|
func (m *MuxConn) Read(b []byte) (int, error) {
|
||||||
|
n1, err := m.buf.Read(b)
|
||||||
|
if n1 == len(b) || err != io.EOF {
|
||||||
|
return n1, err
|
||||||
}
|
}
|
||||||
|
n2, err := m.Conn.Read(b[n1:])
|
||||||
n, err = m.Conn.Read(b)
|
return n1 + n2, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MuxConn) sniffer() io.Reader {
|
func (m *MuxConn) sniffer() io.Reader {
|
||||||
|
60
cmux_test.go
60
cmux_test.go
@ -1,7 +1,9 @@
|
|||||||
package cmux
|
package cmux
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -38,6 +40,22 @@ func safeDial(t *testing.T, addr net.Addr) (*rpc.Client, func()) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type chanListener struct {
|
||||||
|
net.Listener
|
||||||
|
connCh chan net.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func newChanListener() *chanListener {
|
||||||
|
return &chanListener{connCh: make(chan net.Conn, 1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *chanListener) Accept() (net.Conn, error) {
|
||||||
|
if c, ok := <-l.connCh; ok {
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New("use of closed network connection")
|
||||||
|
}
|
||||||
|
|
||||||
func testListener(t *testing.T) (net.Listener, func()) {
|
func testListener(t *testing.T) (net.Listener, func()) {
|
||||||
l, err := net.Listen("tcp", ":0")
|
l, err := net.Listen("tcp", ":0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -147,6 +165,48 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRead(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
errCh := make(chan error)
|
||||||
|
defer func() {
|
||||||
|
select {
|
||||||
|
case err := <-errCh:
|
||||||
|
t.Fatal(err)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
const payload = "hello world\r\n"
|
||||||
|
const mult = 2
|
||||||
|
|
||||||
|
writer, reader := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
if _, err := io.WriteString(writer, strings.Repeat(payload, mult)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
l := newChanListener()
|
||||||
|
defer close(l.connCh)
|
||||||
|
l.connCh <- reader
|
||||||
|
muxl := New(l)
|
||||||
|
// Register a bogus matcher to force reading from the conn.
|
||||||
|
muxl.Match(HTTP2())
|
||||||
|
anyl := muxl.Match(Any())
|
||||||
|
go safeServe(errCh, muxl)
|
||||||
|
muxedConn, err := anyl.Accept()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
var b [mult * len(payload)]byte
|
||||||
|
n, err := muxedConn.Read(b[:])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if e := len(b); n != e {
|
||||||
|
t.Errorf("expected to read %d bytes, but read %d bytes", e, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAny(t *testing.T) {
|
func TestAny(t *testing.T) {
|
||||||
defer leakCheck(t)()
|
defer leakCheck(t)()
|
||||||
errCh := make(chan error)
|
errCh := make(chan error)
|
||||||
|
Loading…
Reference in New Issue
Block a user