2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-10-18 05:08:08 +08:00

8 Commits

Author SHA1 Message Date
Soheil Hassas Yeganeh
703b087a39 Optimize Patricia tree
Remove all the extra allocations in the Patricia tree.

O(1) allocation for Patricia and  ~10% improvement for HTTP1 matching.

benchmark                      old ns/op     new ns/op     delta
BenchmarkCMuxConnHTTP1-4       908           782           -13.88%
BenchmarkCMuxConnHTTP2-4       835           818           -2.04%
BenchmarkCMuxConnHTTP1n2-4     1074          1033          -3.82%
BenchmarkCMuxConnHTTP2n1-4     1010          901           -10.79%

benchmark                      old allocs     new allocs     delta
BenchmarkCMuxConnHTTP1-4       5              3              -40.00%
BenchmarkCMuxConnHTTP2-4       4              4              +0.00%
BenchmarkCMuxConnHTTP1n2-4     6              4              -33.33%
BenchmarkCMuxConnHTTP2n1-4     6              4              -33.33%

benchmark                      old bytes     new bytes     delta
BenchmarkCMuxConnHTTP1-4       276           272           -1.45%
BenchmarkCMuxConnHTTP2-4       304           304           +0.00%
BenchmarkCMuxConnHTTP1n2-4     306           304           -0.65%
BenchmarkCMuxConnHTTP2n1-4     308           304           -1.30%
2016-05-03 22:28:32 -04:00
Soheil Hassas Yeganeh
d45bcbe1db Add more benchmarks
Add benchmarks for HTTP2 matchers and combinations of it with HTTP1Fast.
2016-05-03 22:14:35 -04:00
Soheil Hassas Yeganeh
9297b6de56 Merge pull request #24 from soheilhy/fix-java-client
Fix java gRPC client
2016-04-24 14:53:47 -04:00
Soheil Hassas Yeganeh
dc30a14f2d Add docs for the Java gRPC client 2016-04-24 14:52:49 -04:00
Soheil Hassas Yeganeh
d83a667cb2 Add Matchers that can write back on the channel
As reported in issue #22 reports that Java gRPC clients cannot
handshake with cmux'ed gRPC server, since the client does not
immediately send a header with the content-type field. The reason
is that the java client, block on receiving the first SETTING
frame.

Add MatchWriter that can match and write on the connection. Implement
a MatchWriter that writes a SETTING frame once it receives a SETTING
frame.
2016-04-24 14:47:08 -04:00
Soheil Hassas Yeganeh
255149b822 Merge pull request #23 from soheilhy/bytes-buffer
Replace TeeReader with a bytes buffer
2016-04-24 14:38:36 -04:00
Soheil Hassas Yeganeh
7ec7ce7ad1 Merge pull request #21 from soheilhy/devel
Use the readable indentation for error flow
2016-02-27 18:16:04 -05:00
Soheil Hassas Yeganeh
e09914bfa3 Use the readable indentation for error flow 2016-02-27 13:04:49 -05:00
6 changed files with 183 additions and 75 deletions

View File

@@ -67,3 +67,10 @@ would not be set in your handlers.
when it's accepted. For example, one connection can be either gRPC or REST, but
not both. That is, we assume that a client connection is either used for gRPC
or REST.
* *Java gRPC Clients*: Java gRPC client blocks until it receives a SETTINGS
frame from the server. If you are using the Java client to connect to a cmux'ed
gRPC server please match with writers:
```go
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
```

View File

@@ -6,8 +6,20 @@ import (
"net"
"sync"
"testing"
"golang.org/x/net/http2"
)
var (
benchHTTP1Payload = make([]byte, 4096)
benchHTTP2Payload = make([]byte, 4096)
)
func init() {
copy(benchHTTP1Payload, []byte("GET http://www.w3.org/ HTTP/1.1"))
copy(benchHTTP2Payload, http2.ClientPreface)
}
type mockConn struct {
net.Conn
r io.Reader
@@ -17,20 +29,19 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
return c.r.Read(b)
}
func BenchmarkCMuxConn(b *testing.B) {
benchHTTPPayload := make([]byte, 4096)
copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1"))
func discard(l net.Listener) {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}
func BenchmarkCMuxConnHTTP1(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP1Fast())
go func() {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}()
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
@@ -39,7 +50,67 @@ func BenchmarkCMuxConn(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTPPayload),
r: bytes.NewReader(benchHTTP1Payload),
}
m.serve(c, donec, &wg)
}
}
func BenchmarkCMuxConnHTTP2(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP2())
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}
m.serve(c, donec, &wg)
}
}
func BenchmarkCMuxConnHTTP1n2(b *testing.B) {
m := New(nil).(*cMux)
l1 := m.Match(HTTP1Fast())
l2 := m.Match(HTTP2())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}
m.serve(c, donec, &wg)
}
}
func BenchmarkCMuxConnHTTP2n1(b *testing.B) {
m := New(nil).(*cMux)
l2 := m.Match(HTTP2())
l1 := m.Match(HTTP1Fast())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTP1Payload),
}
m.serve(c, donec, &wg)
}

30
cmux.go
View File

@@ -10,6 +10,9 @@ import (
// Matcher matches a connection based on its content.
type Matcher func(io.Reader) bool
// MatchWriter is a match that can also write response (say to do handshake).
type MatchWriter func(io.Writer, io.Reader) bool
// ErrorHandler handles an error and returns whether
// the mux should continue serving the listener.
type ErrorHandler func(error) bool
@@ -60,6 +63,14 @@ type CMux interface {
//
// The order used to call Match determines the priority of matchers.
Match(...Matcher) net.Listener
// MatchWithWriters returns a net.Listener that accepts only the
// connections that matched by at least of the matcher writers.
//
// Prefer Matchers over MatchWriters, since the latter can write on the
// connection before the actual handler.
//
// The order used to call Match determines the priority of matchers.
MatchWithWriters(...MatchWriter) net.Listener
// Serve starts multiplexing the listener. Serve blocks and perhaps
// should be invoked concurrently within a go routine.
Serve() error
@@ -68,7 +79,7 @@ type CMux interface {
}
type matchersListener struct {
ss []Matcher
ss []MatchWriter
l muxListener
}
@@ -80,7 +91,22 @@ type cMux struct {
sls []matchersListener
}
func matchersToMatchWriters(matchers []Matcher) []MatchWriter {
mws := make([]MatchWriter, 0, len(matchers))
for _, m := range matchers {
mws = append(mws, func(w io.Writer, r io.Reader) bool {
return m(r)
})
}
return mws
}
func (m *cMux) Match(matchers ...Matcher) net.Listener {
mws := matchersToMatchWriters(matchers)
return m.MatchWithWriters(mws...)
}
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
@@ -125,7 +151,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
muc := newMuxConn(c)
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.startSniffing())
matched := s(muc.Conn, muc.startSniffing())
if matched {
muc.doneSniffing()
select {

View File

@@ -109,21 +109,23 @@ func runTestHTTPServer(errCh chan<- error, l net.Listener) {
}
func runTestHTTP1Client(t *testing.T, addr net.Addr) {
if r, err := http.Get("http://" + addr.String()); err != nil {
r, err := http.Get("http://" + addr.String())
if err != nil {
t.Fatal(err)
} else {
defer func() {
if err := r.Body.Close(); err != nil {
t.Fatal(err)
}
}()
if b, err := ioutil.ReadAll(r.Body); err != nil {
}
defer func() {
if err = r.Body.Close(); err != nil {
t.Fatal(err)
} else {
if string(b) != testHTTP1Resp {
t.Fatalf("invalid response: want=%s got=%s", testHTTP1Resp, b)
}
}
}()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
if string(b) != testHTTP1Resp {
t.Fatalf("invalid response: want=%s got=%s", testHTTP1Resp, b)
}
}
@@ -208,9 +210,12 @@ func TestRead(t *testing.T) {
}
for i := 0; i < mult; i++ {
var b [len(payload)]byte
if n, err := muxedConn.Read(b[:]); err != nil {
n, err := muxedConn.Read(b[:])
if err != nil {
t.Error(err)
} else if e := len(b); n != e {
continue
}
if e := len(b); n != e {
t.Errorf("expected to read %d bytes, but read %d bytes", e, n)
}
}

View File

@@ -94,7 +94,16 @@ func HTTP1HeaderField(name, value string) Matcher {
// headers frame.
func HTTP2HeaderField(name, value string) Matcher {
return func(r io.Reader) bool {
return matchHTTP2Field(r, name, value)
return matchHTTP2Field(ioutil.Discard, r, name, value)
}
}
// HTTP2MatchHeaderFieldSendSettings matches the header field and writes the
// settings to the server. Prefer HTTP2HeaderField over this one, if the client
// does not block on receiving a SETTING frame.
func HTTP2MatchHeaderFieldSendSettings(name, value string) MatchWriter {
return func(w io.Writer, r io.Reader) bool {
return matchHTTP2Field(w, r, name, value)
}
}
@@ -116,12 +125,12 @@ func matchHTTP1Field(r io.Reader, name, value string) (matched bool) {
return req.Header.Get(name) == value
}
func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
func matchHTTP2Field(w io.Writer, r io.Reader, name, value string) (matched bool) {
if !hasHTTP2Preface(r) {
return false
}
framer := http2.NewFramer(ioutil.Discard, r)
framer := http2.NewFramer(w, r)
hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) {
if hf.Name == name && hf.Value == value {
matched = true
@@ -134,6 +143,10 @@ func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
}
switch f := f.(type) {
case *http2.SettingsFrame:
if err := framer.WriteSettings(); err != nil {
return false
}
case *http2.HeadersFrame:
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
return false

View File

@@ -9,11 +9,19 @@ import (
// and cannot be changed after instantiation.
type patriciaTree struct {
root *ptNode
buf []byte // preallocated buffer to read data while matching
}
func newPatriciaTree(b ...[]byte) *patriciaTree {
func newPatriciaTree(bs ...[]byte) *patriciaTree {
max := 0
for _, b := range bs {
if max < len(b) {
max = len(b)
}
}
return &patriciaTree{
root: newNode(b),
root: newNode(bs),
buf: make([]byte, max+1),
}
}
@@ -22,17 +30,17 @@ func newPatriciaTreeString(strs ...string) *patriciaTree {
for i, s := range strs {
b[i] = []byte(s)
}
return &patriciaTree{
root: newNode(b),
}
return newPatriciaTree(b...)
}
func (t *patriciaTree) matchPrefix(r io.Reader) bool {
return t.root.match(r, true)
n, _ := io.ReadFull(r, t.buf)
return t.root.match(t.buf[:n], true)
}
func (t *patriciaTree) match(r io.Reader) bool {
return t.root.match(r, false)
n, _ := io.ReadFull(r, t.buf)
return t.root.match(t.buf[:n], false)
}
type ptNode struct {
@@ -122,52 +130,30 @@ func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) {
return prefix, rest
}
func readBytes(r io.Reader, n int) (b []byte, err error) {
b = make([]byte, n)
o := 0
for o < n {
nr, err := r.Read(b[o:])
if err != nil && err != io.EOF {
return b, err
func (n *ptNode) match(b []byte, prefix bool) bool {
l := len(n.prefix)
if l > 0 {
if l > len(b) {
l = len(b)
}
o += nr
if err == io.EOF {
break
}
}
return b[:o], nil
}
func (n *ptNode) match(r io.Reader, prefix bool) bool {
if l := len(n.prefix); l > 0 {
b, err := readBytes(r, l)
if err != nil || len(b) != l || !bytes.Equal(b, n.prefix) {
if !bytes.Equal(b[:l], n.prefix) {
return false
}
}
if prefix && n.terminal {
if n.terminal && (prefix || len(n.prefix) == len(b)) {
return true
}
b := make([]byte, 1)
for {
nr, err := r.Read(b)
if nr != 0 {
break
}
if err == io.EOF {
return n.terminal
}
if err != nil {
return false
}
nextN, ok := n.next[b[l]]
if !ok {
return false
}
nextN, ok := n.next[b[0]]
return ok && nextN.match(r, prefix)
if l == len(b) {
b = b[l:l]
} else {
b = b[l+1:]
}
return nextN.match(b, prefix)
}