mirror of
https://github.com/soheilhy/cmux.git
synced 2025-10-18 13:18:09 +08:00
Compare commits
8 Commits
bytes-buff
...
devel-opti
Author | SHA1 | Date | |
---|---|---|---|
|
703b087a39 | ||
|
d45bcbe1db | ||
|
9297b6de56 | ||
|
dc30a14f2d | ||
|
d83a667cb2 | ||
|
255149b822 | ||
|
7ec7ce7ad1 | ||
|
e09914bfa3 |
@@ -67,3 +67,10 @@ would not be set in your handlers.
|
||||
when it's accepted. For example, one connection can be either gRPC or REST, but
|
||||
not both. That is, we assume that a client connection is either used for gRPC
|
||||
or REST.
|
||||
|
||||
* *Java gRPC Clients*: Java gRPC client blocks until it receives a SETTINGS
|
||||
frame from the server. If you are using the Java client to connect to a cmux'ed
|
||||
gRPC server please match with writers:
|
||||
```go
|
||||
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
|
||||
```
|
||||
|
@@ -6,8 +6,20 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
var (
|
||||
benchHTTP1Payload = make([]byte, 4096)
|
||||
benchHTTP2Payload = make([]byte, 4096)
|
||||
)
|
||||
|
||||
func init() {
|
||||
copy(benchHTTP1Payload, []byte("GET http://www.w3.org/ HTTP/1.1"))
|
||||
copy(benchHTTP2Payload, http2.ClientPreface)
|
||||
}
|
||||
|
||||
type mockConn struct {
|
||||
net.Conn
|
||||
r io.Reader
|
||||
@@ -17,20 +29,19 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
|
||||
return c.r.Read(b)
|
||||
}
|
||||
|
||||
func BenchmarkCMuxConn(b *testing.B) {
|
||||
benchHTTPPayload := make([]byte, 4096)
|
||||
copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1"))
|
||||
func discard(l net.Listener) {
|
||||
for {
|
||||
if _, err := l.Accept(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCMuxConnHTTP1(b *testing.B) {
|
||||
m := New(nil).(*cMux)
|
||||
l := m.Match(HTTP1Fast())
|
||||
|
||||
go func() {
|
||||
for {
|
||||
if _, err := l.Accept(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go discard(l)
|
||||
|
||||
donec := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
@@ -39,7 +50,67 @@ func BenchmarkCMuxConn(b *testing.B) {
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := &mockConn{
|
||||
r: bytes.NewReader(benchHTTPPayload),
|
||||
r: bytes.NewReader(benchHTTP1Payload),
|
||||
}
|
||||
m.serve(c, donec, &wg)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCMuxConnHTTP2(b *testing.B) {
|
||||
m := New(nil).(*cMux)
|
||||
l := m.Match(HTTP2())
|
||||
go discard(l)
|
||||
|
||||
donec := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(b.N)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := &mockConn{
|
||||
r: bytes.NewReader(benchHTTP2Payload),
|
||||
}
|
||||
m.serve(c, donec, &wg)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCMuxConnHTTP1n2(b *testing.B) {
|
||||
m := New(nil).(*cMux)
|
||||
l1 := m.Match(HTTP1Fast())
|
||||
l2 := m.Match(HTTP2())
|
||||
|
||||
go discard(l1)
|
||||
go discard(l2)
|
||||
|
||||
donec := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(b.N)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := &mockConn{
|
||||
r: bytes.NewReader(benchHTTP2Payload),
|
||||
}
|
||||
m.serve(c, donec, &wg)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkCMuxConnHTTP2n1(b *testing.B) {
|
||||
m := New(nil).(*cMux)
|
||||
l2 := m.Match(HTTP2())
|
||||
l1 := m.Match(HTTP1Fast())
|
||||
|
||||
go discard(l1)
|
||||
go discard(l2)
|
||||
|
||||
donec := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(b.N)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := &mockConn{
|
||||
r: bytes.NewReader(benchHTTP1Payload),
|
||||
}
|
||||
m.serve(c, donec, &wg)
|
||||
}
|
||||
|
30
cmux.go
30
cmux.go
@@ -10,6 +10,9 @@ import (
|
||||
// Matcher matches a connection based on its content.
|
||||
type Matcher func(io.Reader) bool
|
||||
|
||||
// MatchWriter is a match that can also write response (say to do handshake).
|
||||
type MatchWriter func(io.Writer, io.Reader) bool
|
||||
|
||||
// ErrorHandler handles an error and returns whether
|
||||
// the mux should continue serving the listener.
|
||||
type ErrorHandler func(error) bool
|
||||
@@ -60,6 +63,14 @@ type CMux interface {
|
||||
//
|
||||
// The order used to call Match determines the priority of matchers.
|
||||
Match(...Matcher) net.Listener
|
||||
// MatchWithWriters returns a net.Listener that accepts only the
|
||||
// connections that matched by at least of the matcher writers.
|
||||
//
|
||||
// Prefer Matchers over MatchWriters, since the latter can write on the
|
||||
// connection before the actual handler.
|
||||
//
|
||||
// The order used to call Match determines the priority of matchers.
|
||||
MatchWithWriters(...MatchWriter) net.Listener
|
||||
// Serve starts multiplexing the listener. Serve blocks and perhaps
|
||||
// should be invoked concurrently within a go routine.
|
||||
Serve() error
|
||||
@@ -68,7 +79,7 @@ type CMux interface {
|
||||
}
|
||||
|
||||
type matchersListener struct {
|
||||
ss []Matcher
|
||||
ss []MatchWriter
|
||||
l muxListener
|
||||
}
|
||||
|
||||
@@ -80,7 +91,22 @@ type cMux struct {
|
||||
sls []matchersListener
|
||||
}
|
||||
|
||||
func matchersToMatchWriters(matchers []Matcher) []MatchWriter {
|
||||
mws := make([]MatchWriter, 0, len(matchers))
|
||||
for _, m := range matchers {
|
||||
mws = append(mws, func(w io.Writer, r io.Reader) bool {
|
||||
return m(r)
|
||||
})
|
||||
}
|
||||
return mws
|
||||
}
|
||||
|
||||
func (m *cMux) Match(matchers ...Matcher) net.Listener {
|
||||
mws := matchersToMatchWriters(matchers)
|
||||
return m.MatchWithWriters(mws...)
|
||||
}
|
||||
|
||||
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
|
||||
ml := muxListener{
|
||||
Listener: m.root,
|
||||
connc: make(chan net.Conn, m.bufLen),
|
||||
@@ -125,7 +151,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
|
||||
muc := newMuxConn(c)
|
||||
for _, sl := range m.sls {
|
||||
for _, s := range sl.ss {
|
||||
matched := s(muc.startSniffing())
|
||||
matched := s(muc.Conn, muc.startSniffing())
|
||||
if matched {
|
||||
muc.doneSniffing()
|
||||
select {
|
||||
|
33
cmux_test.go
33
cmux_test.go
@@ -109,21 +109,23 @@ func runTestHTTPServer(errCh chan<- error, l net.Listener) {
|
||||
}
|
||||
|
||||
func runTestHTTP1Client(t *testing.T, addr net.Addr) {
|
||||
if r, err := http.Get("http://" + addr.String()); err != nil {
|
||||
r, err := http.Get("http://" + addr.String())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
defer func() {
|
||||
if err := r.Body.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
if b, err := ioutil.ReadAll(r.Body); err != nil {
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err = r.Body.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if string(b) != testHTTP1Resp {
|
||||
t.Fatalf("invalid response: want=%s got=%s", testHTTP1Resp, b)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(b) != testHTTP1Resp {
|
||||
t.Fatalf("invalid response: want=%s got=%s", testHTTP1Resp, b)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -208,9 +210,12 @@ func TestRead(t *testing.T) {
|
||||
}
|
||||
for i := 0; i < mult; i++ {
|
||||
var b [len(payload)]byte
|
||||
if n, err := muxedConn.Read(b[:]); err != nil {
|
||||
n, err := muxedConn.Read(b[:])
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
} else if e := len(b); n != e {
|
||||
continue
|
||||
}
|
||||
if e := len(b); n != e {
|
||||
t.Errorf("expected to read %d bytes, but read %d bytes", e, n)
|
||||
}
|
||||
}
|
||||
|
19
matchers.go
19
matchers.go
@@ -94,7 +94,16 @@ func HTTP1HeaderField(name, value string) Matcher {
|
||||
// headers frame.
|
||||
func HTTP2HeaderField(name, value string) Matcher {
|
||||
return func(r io.Reader) bool {
|
||||
return matchHTTP2Field(r, name, value)
|
||||
return matchHTTP2Field(ioutil.Discard, r, name, value)
|
||||
}
|
||||
}
|
||||
|
||||
// HTTP2MatchHeaderFieldSendSettings matches the header field and writes the
|
||||
// settings to the server. Prefer HTTP2HeaderField over this one, if the client
|
||||
// does not block on receiving a SETTING frame.
|
||||
func HTTP2MatchHeaderFieldSendSettings(name, value string) MatchWriter {
|
||||
return func(w io.Writer, r io.Reader) bool {
|
||||
return matchHTTP2Field(w, r, name, value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,12 +125,12 @@ func matchHTTP1Field(r io.Reader, name, value string) (matched bool) {
|
||||
return req.Header.Get(name) == value
|
||||
}
|
||||
|
||||
func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
|
||||
func matchHTTP2Field(w io.Writer, r io.Reader, name, value string) (matched bool) {
|
||||
if !hasHTTP2Preface(r) {
|
||||
return false
|
||||
}
|
||||
|
||||
framer := http2.NewFramer(ioutil.Discard, r)
|
||||
framer := http2.NewFramer(w, r)
|
||||
hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) {
|
||||
if hf.Name == name && hf.Value == value {
|
||||
matched = true
|
||||
@@ -134,6 +143,10 @@ func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
|
||||
}
|
||||
|
||||
switch f := f.(type) {
|
||||
case *http2.SettingsFrame:
|
||||
if err := framer.WriteSettings(); err != nil {
|
||||
return false
|
||||
}
|
||||
case *http2.HeadersFrame:
|
||||
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
|
||||
return false
|
||||
|
76
patricia.go
76
patricia.go
@@ -9,11 +9,19 @@ import (
|
||||
// and cannot be changed after instantiation.
|
||||
type patriciaTree struct {
|
||||
root *ptNode
|
||||
buf []byte // preallocated buffer to read data while matching
|
||||
}
|
||||
|
||||
func newPatriciaTree(b ...[]byte) *patriciaTree {
|
||||
func newPatriciaTree(bs ...[]byte) *patriciaTree {
|
||||
max := 0
|
||||
for _, b := range bs {
|
||||
if max < len(b) {
|
||||
max = len(b)
|
||||
}
|
||||
}
|
||||
return &patriciaTree{
|
||||
root: newNode(b),
|
||||
root: newNode(bs),
|
||||
buf: make([]byte, max+1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,17 +30,17 @@ func newPatriciaTreeString(strs ...string) *patriciaTree {
|
||||
for i, s := range strs {
|
||||
b[i] = []byte(s)
|
||||
}
|
||||
return &patriciaTree{
|
||||
root: newNode(b),
|
||||
}
|
||||
return newPatriciaTree(b...)
|
||||
}
|
||||
|
||||
func (t *patriciaTree) matchPrefix(r io.Reader) bool {
|
||||
return t.root.match(r, true)
|
||||
n, _ := io.ReadFull(r, t.buf)
|
||||
return t.root.match(t.buf[:n], true)
|
||||
}
|
||||
|
||||
func (t *patriciaTree) match(r io.Reader) bool {
|
||||
return t.root.match(r, false)
|
||||
n, _ := io.ReadFull(r, t.buf)
|
||||
return t.root.match(t.buf[:n], false)
|
||||
}
|
||||
|
||||
type ptNode struct {
|
||||
@@ -122,52 +130,30 @@ func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) {
|
||||
return prefix, rest
|
||||
}
|
||||
|
||||
func readBytes(r io.Reader, n int) (b []byte, err error) {
|
||||
b = make([]byte, n)
|
||||
o := 0
|
||||
for o < n {
|
||||
nr, err := r.Read(b[o:])
|
||||
if err != nil && err != io.EOF {
|
||||
return b, err
|
||||
func (n *ptNode) match(b []byte, prefix bool) bool {
|
||||
l := len(n.prefix)
|
||||
if l > 0 {
|
||||
if l > len(b) {
|
||||
l = len(b)
|
||||
}
|
||||
|
||||
o += nr
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
return b[:o], nil
|
||||
}
|
||||
|
||||
func (n *ptNode) match(r io.Reader, prefix bool) bool {
|
||||
if l := len(n.prefix); l > 0 {
|
||||
b, err := readBytes(r, l)
|
||||
if err != nil || len(b) != l || !bytes.Equal(b, n.prefix) {
|
||||
if !bytes.Equal(b[:l], n.prefix) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if prefix && n.terminal {
|
||||
if n.terminal && (prefix || len(n.prefix) == len(b)) {
|
||||
return true
|
||||
}
|
||||
|
||||
b := make([]byte, 1)
|
||||
for {
|
||||
nr, err := r.Read(b)
|
||||
if nr != 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
return n.terminal
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
nextN, ok := n.next[b[l]]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
nextN, ok := n.next[b[0]]
|
||||
return ok && nextN.match(r, prefix)
|
||||
if l == len(b) {
|
||||
b = b[l:l]
|
||||
} else {
|
||||
b = b[l+1:]
|
||||
}
|
||||
return nextN.match(b, prefix)
|
||||
}
|
||||
|
Reference in New Issue
Block a user