2
0
mirror of https://github.com/soheilhy/cmux.git synced 2024-11-09 19:21:52 +08:00

Use a custom buffer intead of buffers from bytes

This commit implements a new buffer that eliminates a few copies.
This commit is contained in:
Soheil Hassas Yeganeh 2015-08-01 11:58:14 -04:00
parent 77815df398
commit fbd0877935
4 changed files with 120 additions and 19 deletions

View File

@ -1,8 +1,8 @@
# cmux: Connection Mux ![Travis Build Status](https://api.travis-ci.org/soheilhy/args.svg?branch=master "Travis Build Status") [![GoDoc](https://godoc.org/github.com/soheilhy/cmux?status.svg)](http://godoc.org/github.com/soheilhy/cmux)
cmux is a generic Go library to multiplex connections based on
their payload. Using cmux, you can serve gRPC, HTTP, and Go RPC
on the same TCP listener to avoid using one port per
their payload. Using cmux, you can serve gRPC, SSH, HTTPS, HTTP,
and Go RPC on the same TCP listener to avoid using one port per
protocol.
## How-To

42
buffer.go Normal file
View File

@ -0,0 +1,42 @@
package cmux
import "io"
type buffer struct {
read int
data []byte
}
func (b *buffer) Read(p []byte) (n int, err error) {
n = len(b.data) - b.read
if n == 0 {
return 0, io.EOF
}
if len(p) < n {
n = len(p)
}
copy(p[:n], b.data[b.read:b.read+n])
b.read += n
return
}
func (b *buffer) Len() int {
return len(b.data) - b.read
}
func (b *buffer) resetRead() {
b.read = 0
}
func (b *buffer) Write(p []byte) (n int, err error) {
n = len(p)
if b.data == nil {
b.data = p[:n:n]
return
}
b.data = append(b.data, p...)
return
}

72
buffer_test.go Normal file
View File

@ -0,0 +1,72 @@
package cmux
import (
"bytes"
"io"
"testing"
)
func TestBuffer(t *testing.T) {
writeBytes := []byte("deadbeef")
var b buffer
for i := 0; i < 10; i++ {
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
}
for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes))
for i := 0; i < 10; i++ {
n, err := b.Read(readBytes)
if err != nil {
t.Fatal(err)
}
if n != len(readBytes) {
t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n)
}
if !bytes.Equal(writeBytes, readBytes) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
}
}
_, err := b.Read(readBytes)
if err != io.EOF {
t.Errorf("expected EOF")
}
b.resetRead()
}
}
func TestBufferOffset(t *testing.T) {
writeBytes := []byte("deadbeef")
var b buffer
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
for i := 0; i < len(writeBytes)/2; i++ {
readBytes := make([]byte, 2)
n, err := b.Read(readBytes)
if err != nil {
t.Fatal(err)
}
if n != 2 {
t.Fatal("cannot read the bytes: want=%d got=%d", 2, n)
}
if !bytes.Equal(readBytes, writeBytes[i*2:i*2+2]) {
t.Fatalf("different bytes read: want=%s got=%s",
readBytes, writeBytes[i*2:i*2+2])
}
}
}

21
cmux.go
View File

@ -1,7 +1,6 @@
package cmux
import (
"bytes"
"flag"
"fmt"
"io"
@ -162,20 +161,17 @@ func (l muxListener) Accept() (c net.Conn, err error) {
type MuxConn struct {
net.Conn
prv *bytes.Buffer
nxt *bytes.Buffer
buf buffer
}
func newMuxConn(c net.Conn) *MuxConn {
return &MuxConn{
Conn: c,
prv: &bytes.Buffer{},
nxt: &bytes.Buffer{},
}
}
func (m *MuxConn) Read(b []byte) (n int, err error) {
if n, err = m.prv.Read(b); err == nil {
if n, err = m.buf.Read(b); err == nil {
return
}
@ -184,18 +180,9 @@ func (m *MuxConn) Read(b []byte) (n int, err error) {
}
func (m *MuxConn) sniffer() io.Reader {
return io.MultiReader(io.TeeReader(m.prv, m.nxt), io.TeeReader(m.Conn, m.nxt))
return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf))
}
func (m *MuxConn) reset() {
if m.nxt.Len() == 0 {
return
}
if m.prv.Len() != 0 {
io.Copy(m.nxt, m.prv)
}
m.prv, m.nxt = m.nxt, m.prv
m.nxt.Reset()
m.buf.resetRead()
}