2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-10-17 20:58:14 +08:00

26 Commits

Author SHA1 Message Date
Soheil Hassas Yeganeh
bf4a8ede9e Merge pull request #36 from soheilhy/dev/fix-mem-grow
Fix memory growth
2016-09-25 21:07:37 -04:00
Soheil Hassas Yeganeh
f661dcfb59 Reset the sniffing buffer when not needed
The sniffing buffer will live as long as the connection is open,
and we should reset it as soon as the application has read all the
sniffed data.
2016-09-25 20:56:09 -04:00
Soheil Hassas Yeganeh
526b64db7a update the list of contributors. 2016-09-25 01:03:35 -04:00
Soheil Hassas Yeganeh
3ac8d3a667 Fix lint errors in cmux_test.go 2016-09-25 00:58:27 -04:00
Soheil Hassas Yeganeh
861c99e0fc Return not-match on different field values in HTTP2
Retun as soon as we have the matched field in the HTTP2 matcher
regardless of weather the value is matched or not. Fixes #35.

Issue #35 reports that cmux leaks memory when the client is HTTP2
but does not sends the expected header field. For example, when
the non-gRPC client sends a large field in the header and we are
matching for gRPC, we waste a lot of memory in the sniff buffer.
2016-09-25 00:58:17 -04:00
Soheil Hassas Yeganeh
13f520d62c Merge pull request #34 from ekle/master
SetReadDeadline for Matching
2016-09-05 16:05:55 -04:00
Andreas Jaekle
e132036cce SetReadDeadline for Matching 2016-09-05 19:56:50 +02:00
Soheil Hassas Yeganeh
b26951527b Merge pull request #33 from soheilhy/fix-b32
Fix index out of range in patricia tree
2016-07-15 11:38:34 -07:00
Soheil Hassas Yeganeh
eddb3b1467 Fix index out of range in patricia tree
Bug #32 reported that there is an index out of range error. This
issue was introduced in 703b087.

Fix #32 and add a test to detect this issue
2016-07-15 10:01:37 -07:00
Soheil Hassas Yeganeh
e85da3027e Merge pull request #30 from soheilhy/fix-patricia-tree
Fix race in patricia tree
2016-07-15 08:04:18 -07:00
Tamir Duberstein
df31d48636 Parallelize benchmarks 2016-07-14 22:03:14 -07:00
Soheil Hassas Yeganeh
fd01d3cc6c Fix race in patricia tree
This commit fixes a major issue added in 703b087a.

There are still wins on allocation though.

benchmark                      old ns/op     new ns/op     delta
BenchmarkCMuxConnHTTP1-4       783           836           +6.77%
BenchmarkCMuxConnHTTP2-4       895           806           -9.94%
BenchmarkCMuxConnHTTP1n2-4     1000          1026          +2.60%
BenchmarkCMuxConnHTTP2n1-4     916           961           +4.91%

benchmark                      old allocs     new allocs     delta
BenchmarkCMuxConnHTTP1-4       3              4              +33.33%
BenchmarkCMuxConnHTTP2-4       4              4              +0.00%
BenchmarkCMuxConnHTTP1n2-4     4              5              +25.00%
BenchmarkCMuxConnHTTP2n1-4     4              5              +25.00%

benchmark                      old bytes     new bytes     delta
BenchmarkCMuxConnHTTP1-4       272           280           +2.94%
BenchmarkCMuxConnHTTP2-4       304           304           +0.00%
BenchmarkCMuxConnHTTP1n2-4     304           312           +2.63%
BenchmarkCMuxConnHTTP2n1-4     304           312           +2.63%
2016-07-14 22:01:30 -07:00
Soheil Hassas Yeganeh
f952454ed9 Add the list of contributors
Add the list of contributors in a separate file and add links
to the LICENSE and CONTRIBUTORS files.

Fixes #28
2016-07-09 14:07:27 -04:00
Soheil Hassas Yeganeh
00342b4d79 Add package level docs 2016-07-09 14:01:24 -04:00
Soheil Hassas Yeganeh
cd9b7d74b9 Add copyright notice headers on all Go files.
As per http://www.apache.org/legal/src-headers.html#headers

Fixes #28
2016-07-09 13:56:02 -04:00
Soheil Hassas Yeganeh
9d1e2a64dd Merge pull request #25 from soheilhy/devel-optimize-patricia
Optimize Patricia tree
2016-05-03 22:33:52 -04:00
Soheil Hassas Yeganeh
703b087a39 Optimize Patricia tree
Remove all the extra allocations in the Patricia tree.

O(1) allocation for Patricia and  ~10% improvement for HTTP1 matching.

benchmark                      old ns/op     new ns/op     delta
BenchmarkCMuxConnHTTP1-4       908           782           -13.88%
BenchmarkCMuxConnHTTP2-4       835           818           -2.04%
BenchmarkCMuxConnHTTP1n2-4     1074          1033          -3.82%
BenchmarkCMuxConnHTTP2n1-4     1010          901           -10.79%

benchmark                      old allocs     new allocs     delta
BenchmarkCMuxConnHTTP1-4       5              3              -40.00%
BenchmarkCMuxConnHTTP2-4       4              4              +0.00%
BenchmarkCMuxConnHTTP1n2-4     6              4              -33.33%
BenchmarkCMuxConnHTTP2n1-4     6              4              -33.33%

benchmark                      old bytes     new bytes     delta
BenchmarkCMuxConnHTTP1-4       276           272           -1.45%
BenchmarkCMuxConnHTTP2-4       304           304           +0.00%
BenchmarkCMuxConnHTTP1n2-4     306           304           -0.65%
BenchmarkCMuxConnHTTP2n1-4     308           304           -1.30%
2016-05-03 22:28:32 -04:00
Soheil Hassas Yeganeh
d45bcbe1db Add more benchmarks
Add benchmarks for HTTP2 matchers and combinations of it with HTTP1Fast.
2016-05-03 22:14:35 -04:00
Soheil Hassas Yeganeh
9297b6de56 Merge pull request #24 from soheilhy/fix-java-client
Fix java gRPC client
2016-04-24 14:53:47 -04:00
Soheil Hassas Yeganeh
dc30a14f2d Add docs for the Java gRPC client 2016-04-24 14:52:49 -04:00
Soheil Hassas Yeganeh
d83a667cb2 Add Matchers that can write back on the channel
As reported in issue #22 reports that Java gRPC clients cannot
handshake with cmux'ed gRPC server, since the client does not
immediately send a header with the content-type field. The reason
is that the java client, block on receiving the first SETTING
frame.

Add MatchWriter that can match and write on the connection. Implement
a MatchWriter that writes a SETTING frame once it receives a SETTING
frame.
2016-04-24 14:47:08 -04:00
Soheil Hassas Yeganeh
255149b822 Merge pull request #23 from soheilhy/bytes-buffer
Replace TeeReader with a bytes buffer
2016-04-24 14:38:36 -04:00
Soheil Hassas Yeganeh
3077b24d47 Fix and optimize travis config
There is no point in running shadow and lints for all version of go.

Also, remove 1.3 and 1.4.
2016-04-24 14:10:41 -04:00
Soheil Hassas Yeganeh
d5924ef0b4 Fix a blocking issue in buffer reader
After sniffing and buffering data, if we try to read from
the socket again, bufio.Reader may block. This breaks HTTP handlers
in go1.5.2+ if one tries on browsers or with curl. Go's HTTP client,
however, is not broken. This issue is also there with TeeReader.

Return immediately with the data in the sniffed buffer.
2016-04-24 12:55:13 -04:00
Tamir Duberstein
59b6f01712 Replace buffer with bufferedReader
bufferedReader is an optimized implementation of io.Reader that behaves
like
```
io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
```
without allocating.

This has a measurable effect on benchmarks:
```
name        old time/op    new time/op    delta
CMuxConn-4    1.09µs ± 4%    0.99µs ±19%   -9.32%  (p=0.000 n=17+19)

name        old alloc/op   new alloc/op   delta
CMuxConn-4      240B ± 0%      260B ± 0%   +8.33%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
CMuxConn-4      9.00 ± 0%      5.00 ± 0%  -44.44%  (p=0.000 n=20+20)
```

Note that appropriate test coverage is provided by `TestRead`.
2016-02-28 19:41:37 -05:00
Soheil Hassas Yeganeh
7ec7ce7ad1 Merge pull request #21 from soheilhy/devel
Use the readable indentation for error flow
2016-02-27 18:16:04 -05:00
15 changed files with 644 additions and 274 deletions

View File

@@ -1,22 +1,29 @@
language: go
go:
- 1.3
- 1.4
- 1.5
- 1.6
- 1.7
- tip
matrix:
allow_failures:
- go: tip
gobuild_args: -race
before_install:
- go get -u github.com/golang/lint/golint
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then go get -u github.com/kisielk/errcheck; fi
- go get -u golang.org/x/tools/cmd/vet
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u github.com/kisielk/errcheck; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u github.com/golang/lint/golint; fi
before_script:
- '! gofmt -s -l . | read'
- golint ./...
- echo $TRAVIS_GO_VERSION
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then errcheck ./...; fi
- go vet .
- go tool vet --shadow .
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then golint ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then errcheck ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet .; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet --shadow .; fi
script:
- go test -bench . -v ./...
- go test -race -bench . -v ./...

12
CONTRIBUTORS Normal file
View File

@@ -0,0 +1,12 @@
# The list of people who have contributed code to the cmux repository.
#
# Auto-generated with:
# git log --oneline --pretty=format:'%an <%aE>' | sort -u
#
Andreas Jaekle <andreas@jaekle.net>
Dmitri Shuralyov <shurcooL@gmail.com>
Ethan Mosbaugh <emosbaugh@gmail.com>
Soheil Hassas Yeganeh <soheil.h.y@gmail.com>
Soheil Hassas Yeganeh <soheil@cs.toronto.edu>
Tamir Duberstein <tamir@cockroachlabs.com>
Tamir Duberstein <tamird@gmail.com>

View File

@@ -67,3 +67,17 @@ would not be set in your handlers.
when it's accepted. For example, one connection can be either gRPC or REST, but
not both. That is, we assume that a client connection is either used for gRPC
or REST.
* *Java gRPC Clients*: Java gRPC client blocks until it receives a SETTINGS
frame from the server. If you are using the Java client to connect to a cmux'ed
gRPC server please match with writers:
```go
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
```
# Copyright and License
Copyright 2016 The CMux Authors. All rights reserved.
See [CONTRIBUTORS](https://github.com/soheilhy/cmux/blob/master/CONTRIBUTORS)
for the CMux Authors. Code is released under
[the Apache 2 license](https://github.com/soheilhy/cmux/blob/master/LICENSE).

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -6,8 +20,21 @@ import (
"net"
"sync"
"testing"
"time"
"golang.org/x/net/http2"
)
var (
benchHTTP1Payload = make([]byte, 4096)
benchHTTP2Payload = make([]byte, 4096)
)
func init() {
copy(benchHTTP1Payload, []byte("GET http://www.w3.org/ HTTP/1.1"))
copy(benchHTTP2Payload, http2.ClientPreface)
}
type mockConn struct {
net.Conn
r io.Reader
@@ -17,30 +44,99 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
return c.r.Read(b)
}
func BenchmarkCMuxConn(b *testing.B) {
benchHTTPPayload := make([]byte, 4096)
copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1"))
func (c *mockConn) SetReadDeadline(time.Time) error {
return nil
}
func discard(l net.Listener) {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}
func BenchmarkCMuxConnHTTP1(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP1Fast())
go func() {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}()
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTPPayload),
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP1Payload),
}, donec, &wg)
}
m.serve(c, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP2(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP2())
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP1n2(b *testing.B) {
m := New(nil).(*cMux)
l1 := m.Match(HTTP1Fast())
l2 := m.Match(HTTP2())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP2n1(b *testing.B) {
m := New(nil).(*cMux)
l2 := m.Match(HTTP2())
l1 := m.Match(HTTP1Fast())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP1Payload),
}, donec, &wg)
}
})
}

106
buffer.go
View File

@@ -1,57 +1,67 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import "io"
import (
"bytes"
"io"
)
var _ io.ReadWriter = (*buffer)(nil)
type buffer struct {
read int
data []byte
// bufferedReader is an optimized implementation of io.Reader that behaves like
// ```
// io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
// ```
// without allocating.
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
}
// From the io.Reader documentation:
//
// When Read encounters an error or end-of-file condition after
// successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (b *buffer) Read(p []byte) (int, error) {
var err error
n := copy(p, b.data[b.read:])
b.read += n
if b.read == len(b.data) {
err = io.EOF
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead {
// If we have already read something from the buffer before, we return the
// same data and the last error if any. We need to immediately return,
// otherwise we may block for ever, if we try to be smart and call
// source.Read() seeking a little bit of more data.
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
} else if !s.sniffing && s.buffer.Cap() != 0 {
// We don't need the buffer anymore.
// Reset it to release the internal slice.
s.buffer = bytes.Buffer{}
}
return n, err
// If there is nothing more to return in the sniffed buffer, read from the
// source.
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing {
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
}
}
return sn, sErr
}
func (b *buffer) Len() int {
return len(b.data) - b.read
}
func (b *buffer) resetRead() {
b.read = 0
}
// From the io.Writer documentation:
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
//
// In a previous incarnation, this implementation retained the incoming slice.
func (b *buffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0
s.bufferSize = s.buffer.Len()
}

View File

@@ -1,113 +0,0 @@
package cmux
import (
"bytes"
"io"
"testing"
)
func TestWriteNoModify(t *testing.T) {
var b buffer
const origWriteByte = 0
const postWriteByte = 1
writeBytes := []byte{origWriteByte}
if _, err := b.Write(writeBytes); err != nil {
t.Fatal(err)
}
writeBytes[0] = postWriteByte
readBytes := make([]byte, 1)
if _, err := b.Read(readBytes); err != io.EOF {
t.Fatal(err)
}
if readBytes[0] != origWriteByte {
t.Fatalf("expected to read %x, but read %x; buffer retained passed-in slice", origWriteByte, postWriteByte)
}
}
const writeString = "deadbeef"
func TestBuffer(t *testing.T) {
writeBytes := []byte(writeString)
const numWrites = 10
var b buffer
for i := 0; i < numWrites; i++ {
n, err := b.Write(writeBytes)
if err != nil && err != io.EOF {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
}
for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes))
for i := 0; i < numWrites; i++ {
n, err := b.Read(readBytes)
if i == numWrites-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != len(readBytes) {
t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n)
}
if !bytes.Equal(writeBytes, readBytes) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
}
}
n, err := b.Read(readBytes)
if err != io.EOF {
t.Errorf("expected EOF")
}
if n != 0 {
t.Errorf("expected buffer to be empty, but got %d bytes", n)
}
b.resetRead()
}
}
func TestBufferOffset(t *testing.T) {
writeBytes := []byte(writeString)
var b buffer
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
const readSize = 2
numReads := len(writeBytes) / readSize
for i := 0; i < numReads; i++ {
readBytes := make([]byte, readSize)
n, err := b.Read(readBytes)
if i == numReads-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != readSize {
t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
}
if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
}
}
}

104
cmux.go
View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -5,11 +19,15 @@ import (
"io"
"net"
"sync"
"time"
)
// Matcher matches a connection based on its content.
type Matcher func(io.Reader) bool
// MatchWriter is a match that can also write response (say to do handshake).
type MatchWriter func(io.Writer, io.Reader) bool
// ErrorHandler handles an error and returns whether
// the mux should continue serving the listener.
type ErrorHandler func(error) bool
@@ -43,13 +61,17 @@ func (e errListenerClosed) Timeout() bool { return false }
// listener is closed.
var ErrListenerClosed = errListenerClosed("mux: listener closed")
// for readability of readTimeout
var noTimeout time.Duration
// New instantiates a new connection multiplexer.
func New(l net.Listener) CMux {
return &cMux{
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
readTimeout: noTimeout,
}
}
@@ -60,27 +82,53 @@ type CMux interface {
//
// The order used to call Match determines the priority of matchers.
Match(...Matcher) net.Listener
// MatchWithWriters returns a net.Listener that accepts only the
// connections that matched by at least of the matcher writers.
//
// Prefer Matchers over MatchWriters, since the latter can write on the
// connection before the actual handler.
//
// The order used to call Match determines the priority of matchers.
MatchWithWriters(...MatchWriter) net.Listener
// Serve starts multiplexing the listener. Serve blocks and perhaps
// should be invoked concurrently within a go routine.
Serve() error
// HandleError registers an error handler that handles listener errors.
HandleError(ErrorHandler)
// sets a timeout for the read of matchers
SetReadTimeout(time.Duration)
}
type matchersListener struct {
ss []Matcher
ss []MatchWriter
l muxListener
}
type cMux struct {
root net.Listener
bufLen int
errh ErrorHandler
donec chan struct{}
sls []matchersListener
root net.Listener
bufLen int
errh ErrorHandler
donec chan struct{}
sls []matchersListener
readTimeout time.Duration
}
func matchersToMatchWriters(matchers []Matcher) []MatchWriter {
mws := make([]MatchWriter, 0, len(matchers))
for _, m := range matchers {
mws = append(mws, func(w io.Writer, r io.Reader) bool {
return m(r)
})
}
return mws
}
func (m *cMux) Match(matchers ...Matcher) net.Listener {
mws := matchersToMatchWriters(matchers)
return m.MatchWithWriters(mws...)
}
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
@@ -89,6 +137,10 @@ func (m *cMux) Match(matchers ...Matcher) net.Listener {
return ml
}
func (m *cMux) SetReadTimeout(t time.Duration) {
m.readTimeout = t
}
func (m *cMux) Serve() error {
var wg sync.WaitGroup
@@ -123,11 +175,17 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
muc := newMuxConn(c)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.sniffer())
muc.reset()
matched := s(muc.Conn, muc.startSniffing())
if matched {
muc.doneSniffing()
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Time{})
}
select {
case sl.l.connc <- muc:
case <-donec:
@@ -177,12 +235,13 @@ func (l muxListener) Accept() (net.Conn, error) {
// MuxConn wraps a net.Conn and provides transparent sniffing of connection data.
type MuxConn struct {
net.Conn
buf buffer
buf bufferedReader
}
func newMuxConn(c net.Conn) *MuxConn {
return &MuxConn{
Conn: c,
buf: bufferedReader{source: c},
}
}
@@ -196,22 +255,15 @@ func newMuxConn(c net.Conn) *MuxConn {
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (m *MuxConn) Read(p []byte) (int, error) {
n1, err := m.buf.Read(p)
if err != io.EOF {
return n1, err
}
n2, err := m.Conn.Read(p[n1:])
return n1 + n2, err
return m.buf.Read(p)
}
func (m *MuxConn) sniffer() io.Reader {
return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf))
func (m *MuxConn) startSniffing() io.Reader {
m.buf.reset(true)
return &m.buf
}
func (m *MuxConn) reset() {
m.buf.resetRead()
func (m *MuxConn) doneSniffing() {
m.buf.reset(false)
}

View File

@@ -1,10 +1,26 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/rpc"
@@ -17,6 +33,7 @@ import (
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
@@ -59,14 +76,17 @@ func (l *chanListener) Accept() (net.Conn, error) {
}
func testListener(t *testing.T) (net.Listener, func()) {
l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp4", ":0")
if err != nil {
t.Fatal(err)
}
var once sync.Once
return l, func() {
if err := l.Close(); err != nil {
t.Fatal(err)
}
once.Do(func() {
if err := l.Close(); err != nil {
t.Fatal(err)
}
})
}
}
@@ -167,6 +187,84 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
}
}
const (
handleHTTP1Close = 1
handleHTTP1Request = 2
handleAnyClose = 3
handleAnyRequest = 4
)
func TestTimeout(t *testing.T) {
defer leakCheck(t)()
lis, Close := testListener(t)
defer Close()
result := make(chan int, 5)
testDuration := time.Millisecond * 100
m := New(lis)
m.SetReadTimeout(testDuration)
http1 := m.Match(HTTP1Fast())
any := m.Match(Any())
go func() {
_ = m.Serve()
}()
go func() {
con, err := http1.Accept()
if err != nil {
result <- handleHTTP1Close
} else {
_, _ = con.Write([]byte("http1"))
_ = con.Close()
result <- handleHTTP1Request
}
}()
go func() {
con, err := any.Accept()
if err != nil {
result <- handleAnyClose
} else {
_, _ = con.Write([]byte("any"))
_ = con.Close()
result <- handleAnyRequest
}
}()
time.Sleep(testDuration) // wait to prevent timeouts on slow test-runners
client, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
log.Fatal("testTimeout client failed: ", err)
}
defer func() {
_ = client.Close()
}()
time.Sleep(testDuration / 2)
if len(result) != 0 {
log.Print("tcp ")
t.Fatal("testTimeout failed: accepted to fast: ", len(result))
}
_ = client.SetReadDeadline(time.Now().Add(testDuration * 3))
buffer := make([]byte, 10)
rl, err := client.Read(buffer)
if err != nil {
t.Fatal("testTimeout failed: client error: ", err, rl)
}
Close()
if rl != 3 {
log.Print("testTimeout failed: response from wrong sevice ", rl)
}
if string(buffer[0:3]) != "any" {
log.Print("testTimeout failed: response from wrong sevice ")
}
time.Sleep(testDuration * 2)
if len(result) != 2 {
t.Fatal("testTimeout failed: accepted to less: ", len(result))
}
if a := <-result; a != handleAnyRequest {
t.Fatal("testTimeout failed: any rule did not match")
}
if a := <-result; a != handleHTTP1Close {
t.Fatal("testTimeout failed: no close an http rule")
}
}
func TestRead(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
@@ -284,7 +382,79 @@ func TestHTTP2(t *testing.T) {
t.Fatal(err)
}
var b [len(http2.ClientPreface)]byte
if _, err := muxedConn.Read(b[:]); err != io.EOF {
var n int
// We have the sniffed buffer first...
if n, err = muxedConn.Read(b[:]); err == io.EOF {
t.Fatal(err)
}
// and then we read from the source.
if _, err = muxedConn.Read(b[n:]); err != io.EOF {
t.Fatal(err)
}
if string(b[:]) != http2.ClientPreface {
t.Errorf("got unexpected read %s, expected %s", b, http2.ClientPreface)
}
}
func TestHTTP2MatchHeaderField(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
name := "name"
value := "value"
writer, reader := net.Pipe()
go func() {
if _, err := io.WriteString(writer, http2.ClientPreface); err != nil {
t.Fatal(err)
}
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
if err := enc.WriteField(hpack.HeaderField{Name: name, Value: value}); err != nil {
t.Fatal(err)
}
framer := http2.NewFramer(writer, nil)
err := framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1,
BlockFragment: buf.Bytes(),
EndStream: true,
EndHeaders: true,
})
if err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
}()
l := newChanListener()
l.connCh <- reader
muxl := New(l)
// Register a bogus matcher that only reads one byte.
muxl.Match(func(r io.Reader) bool {
var b [1]byte
_, _ = r.Read(b[:])
return false
})
// Create a matcher that cannot match the response.
muxl.Match(HTTP2HeaderField(name, "another"+value))
// Then match with the expected field.
h2l := muxl.Match(HTTP2HeaderField(name, value))
go safeServe(errCh, muxl)
muxedConn, err := h2l.Accept()
close(l.connCh)
if err != nil {
t.Fatal(err)
}
var b [len(http2.ClientPreface)]byte
// We have the sniffed buffer first...
if _, err := muxedConn.Read(b[:]); err == io.EOF {
t.Fatal(err)
}
if string(b[:]) != http2.ClientPreface {
@@ -419,6 +589,7 @@ func interestingGoroutines() (gs []string) {
}
if stack == "" ||
strings.Contains(stack, "main.main()") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "created by runtime.gc") ||

18
doc.go Normal file
View File

@@ -0,0 +1,18 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package cmux is a library to multiplex network connections based on
// their payload. Using cmux, you can serve different protocols from the
// same listener.
package cmux

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -94,7 +108,16 @@ func HTTP1HeaderField(name, value string) Matcher {
// headers frame.
func HTTP2HeaderField(name, value string) Matcher {
return func(r io.Reader) bool {
return matchHTTP2Field(r, name, value)
return matchHTTP2Field(ioutil.Discard, r, name, value)
}
}
// HTTP2MatchHeaderFieldSendSettings matches the header field and writes the
// settings to the server. Prefer HTTP2HeaderField over this one, if the client
// does not block on receiving a SETTING frame.
func HTTP2MatchHeaderFieldSendSettings(name, value string) MatchWriter {
return func(w io.Writer, r io.Reader) bool {
return matchHTTP2Field(w, r, name, value)
}
}
@@ -116,15 +139,19 @@ func matchHTTP1Field(r io.Reader, name, value string) (matched bool) {
return req.Header.Get(name) == value
}
func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
func matchHTTP2Field(w io.Writer, r io.Reader, name, value string) (matched bool) {
if !hasHTTP2Preface(r) {
return false
}
framer := http2.NewFramer(ioutil.Discard, r)
done := false
framer := http2.NewFramer(w, r)
hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) {
if hf.Name == name && hf.Value == value {
matched = true
if hf.Name == name {
done = true
if hf.Value == value {
matched = true
}
}
})
for {
@@ -134,17 +161,24 @@ func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
}
switch f := f.(type) {
case *http2.SettingsFrame:
if err := framer.WriteSettings(); err != nil {
return false
}
case *http2.ContinuationFrame:
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
return false
}
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
case *http2.HeadersFrame:
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
return false
}
if matched {
return true
}
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
}
if f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 {
return false
}
if done {
return matched
}
}
}

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -8,12 +22,20 @@ import (
// patriciaTree is a simple patricia tree that handles []byte instead of string
// and cannot be changed after instantiation.
type patriciaTree struct {
root *ptNode
root *ptNode
maxDepth int // max depth of the tree.
}
func newPatriciaTree(b ...[]byte) *patriciaTree {
func newPatriciaTree(bs ...[]byte) *patriciaTree {
max := 0
for _, b := range bs {
if max < len(b) {
max = len(b)
}
}
return &patriciaTree{
root: newNode(b),
root: newNode(bs),
maxDepth: max + 1,
}
}
@@ -22,17 +44,19 @@ func newPatriciaTreeString(strs ...string) *patriciaTree {
for i, s := range strs {
b[i] = []byte(s)
}
return &patriciaTree{
root: newNode(b),
}
return newPatriciaTree(b...)
}
func (t *patriciaTree) matchPrefix(r io.Reader) bool {
return t.root.match(r, true)
buf := make([]byte, t.maxDepth)
n, _ := io.ReadFull(r, buf)
return t.root.match(buf[:n], true)
}
func (t *patriciaTree) match(r io.Reader) bool {
return t.root.match(r, false)
buf := make([]byte, t.maxDepth)
n, _ := io.ReadFull(r, buf)
return t.root.match(buf[:n], false)
}
type ptNode struct {
@@ -122,52 +146,34 @@ func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) {
return prefix, rest
}
func readBytes(r io.Reader, n int) (b []byte, err error) {
b = make([]byte, n)
o := 0
for o < n {
nr, err := r.Read(b[o:])
if err != nil && err != io.EOF {
return b, err
func (n *ptNode) match(b []byte, prefix bool) bool {
l := len(n.prefix)
if l > 0 {
if l > len(b) {
l = len(b)
}
o += nr
if err == io.EOF {
break
}
}
return b[:o], nil
}
func (n *ptNode) match(r io.Reader, prefix bool) bool {
if l := len(n.prefix); l > 0 {
b, err := readBytes(r, l)
if err != nil || len(b) != l || !bytes.Equal(b, n.prefix) {
if !bytes.Equal(b[:l], n.prefix) {
return false
}
}
if prefix && n.terminal {
if n.terminal && (prefix || len(n.prefix) == len(b)) {
return true
}
b := make([]byte, 1)
for {
nr, err := r.Read(b)
if nr != 0 {
break
}
if err == io.EOF {
return n.terminal
}
if err != nil {
return false
}
if l >= len(b) {
return false
}
nextN, ok := n.next[b[0]]
return ok && nextN.match(r, prefix)
nextN, ok := n.next[b[l]]
if !ok {
return false
}
if l == len(b) {
b = b[l:l]
} else {
b = b[l+1:]
}
return nextN.match(b, prefix)
}

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -19,6 +33,13 @@ func testPTree(t *testing.T, strs ...string) {
if pt.match(strings.NewReader(s + s)) {
t.Errorf("%s matches %s", s+s, s)
}
// The following tests are just to catch index out of
// range and off-by-one errors and not the functionality.
pt.matchPrefix(strings.NewReader(s[:len(s)-1]))
pt.match(strings.NewReader(s[:len(s)-1]))
pt.matchPrefix(strings.NewReader(s + "$"))
pt.match(strings.NewReader(s + "$"))
}
}
@@ -31,5 +52,5 @@ func TestPatriciaNonOverlapping(t *testing.T) {
}
func TestPatriciaOverlapping(t *testing.T) {
testPTree(t, "foo", "far", "farther", "boo", "bar")
testPTree(t, "foo", "far", "farther", "boo", "ba", "bar")
}