From fbd0877935bf0c00b91739c8e158924c8403b797 Mon Sep 17 00:00:00 2001 From: Soheil Hassas Yeganeh Date: Sat, 1 Aug 2015 11:58:14 -0400 Subject: [PATCH] Use a custom buffer intead of buffers from bytes This commit implements a new buffer that eliminates a few copies. --- README.md | 4 +-- buffer.go | 42 +++++++++++++++++++++++++++++ buffer_test.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++ cmux.go | 21 +++------------ 4 files changed, 120 insertions(+), 19 deletions(-) create mode 100644 buffer.go create mode 100644 buffer_test.go diff --git a/README.md b/README.md index bfac219..65f1799 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # cmux: Connection Mux ![Travis Build Status](https://api.travis-ci.org/soheilhy/args.svg?branch=master "Travis Build Status") [![GoDoc](https://godoc.org/github.com/soheilhy/cmux?status.svg)](http://godoc.org/github.com/soheilhy/cmux) cmux is a generic Go library to multiplex connections based on -their payload. Using cmux, you can serve gRPC, HTTP, and Go RPC -on the same TCP listener to avoid using one port per +their payload. Using cmux, you can serve gRPC, SSH, HTTPS, HTTP, +and Go RPC on the same TCP listener to avoid using one port per protocol. ## How-To diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..50e206a --- /dev/null +++ b/buffer.go @@ -0,0 +1,42 @@ +package cmux + +import "io" + +type buffer struct { + read int + data []byte +} + +func (b *buffer) Read(p []byte) (n int, err error) { + n = len(b.data) - b.read + if n == 0 { + return 0, io.EOF + } + + if len(p) < n { + n = len(p) + } + + copy(p[:n], b.data[b.read:b.read+n]) + b.read += n + return +} + +func (b *buffer) Len() int { + return len(b.data) - b.read +} + +func (b *buffer) resetRead() { + b.read = 0 +} + +func (b *buffer) Write(p []byte) (n int, err error) { + n = len(p) + if b.data == nil { + b.data = p[:n:n] + return + } + + b.data = append(b.data, p...) + return +} diff --git a/buffer_test.go b/buffer_test.go new file mode 100644 index 0000000..c8a8d0a --- /dev/null +++ b/buffer_test.go @@ -0,0 +1,72 @@ +package cmux + +import ( + "bytes" + "io" + "testing" +) + +func TestBuffer(t *testing.T) { + writeBytes := []byte("deadbeef") + + var b buffer + for i := 0; i < 10; i++ { + n, err := b.Write(writeBytes) + if err != nil { + t.Fatal(err) + } + if n != len(writeBytes) { + t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n) + } + } + + for j := 0; j < 2; j++ { + readBytes := make([]byte, len(writeBytes)) + for i := 0; i < 10; i++ { + n, err := b.Read(readBytes) + if err != nil { + t.Fatal(err) + } + if n != len(readBytes) { + t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n) + } + if !bytes.Equal(writeBytes, readBytes) { + t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes) + } + } + _, err := b.Read(readBytes) + if err != io.EOF { + t.Errorf("expected EOF") + } + + b.resetRead() + } +} + +func TestBufferOffset(t *testing.T) { + writeBytes := []byte("deadbeef") + + var b buffer + n, err := b.Write(writeBytes) + if err != nil { + t.Fatal(err) + } + if n != len(writeBytes) { + t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n) + } + + for i := 0; i < len(writeBytes)/2; i++ { + readBytes := make([]byte, 2) + n, err := b.Read(readBytes) + if err != nil { + t.Fatal(err) + } + if n != 2 { + t.Fatal("cannot read the bytes: want=%d got=%d", 2, n) + } + if !bytes.Equal(readBytes, writeBytes[i*2:i*2+2]) { + t.Fatalf("different bytes read: want=%s got=%s", + readBytes, writeBytes[i*2:i*2+2]) + } + } +} diff --git a/cmux.go b/cmux.go index 18996d6..62712dd 100644 --- a/cmux.go +++ b/cmux.go @@ -1,7 +1,6 @@ package cmux import ( - "bytes" "flag" "fmt" "io" @@ -162,20 +161,17 @@ func (l muxListener) Accept() (c net.Conn, err error) { type MuxConn struct { net.Conn - prv *bytes.Buffer - nxt *bytes.Buffer + buf buffer } func newMuxConn(c net.Conn) *MuxConn { return &MuxConn{ Conn: c, - prv: &bytes.Buffer{}, - nxt: &bytes.Buffer{}, } } func (m *MuxConn) Read(b []byte) (n int, err error) { - if n, err = m.prv.Read(b); err == nil { + if n, err = m.buf.Read(b); err == nil { return } @@ -184,18 +180,9 @@ func (m *MuxConn) Read(b []byte) (n int, err error) { } func (m *MuxConn) sniffer() io.Reader { - return io.MultiReader(io.TeeReader(m.prv, m.nxt), io.TeeReader(m.Conn, m.nxt)) + return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf)) } func (m *MuxConn) reset() { - if m.nxt.Len() == 0 { - return - } - - if m.prv.Len() != 0 { - io.Copy(m.nxt, m.prv) - } - - m.prv, m.nxt = m.nxt, m.prv - m.nxt.Reset() + m.buf.resetRead() }