2
0
mirror of https://github.com/soheilhy/cmux.git synced 2025-10-17 04:43:12 +08:00

57 Commits

Author SHA1 Message Date
Ryan Fitzpatrick
5ec6847320 Adopt go modules 2021-02-05 14:11:34 -05:00
Abhilash Gnan
cdd3331e3e Add TestClose
Signed-off-by: Abhilash Gnan <abhilashgnan@gmail.com>
2021-01-14 18:06:57 -05:00
Abhilash Gnan
ce11cfdf3a Add mutex around channel close
Signed-off-by: Abhilash Gnan <abhilashgnan@gmail.com>
2021-01-14 18:06:57 -05:00
Abhilash Gnan
ae889c5259 fix blocking select on donec
Signed-off-by: Abhilash Gnan <abhilashgnan@gmail.com>
2021-01-14 18:06:57 -05:00
Abhilash Gnan
3c0ee784ab use param for recieve only chan
Signed-off-by: Abhilash Gnan <abhilashgnan@gmail.com>
2021-01-14 18:06:57 -05:00
Abhilash Gnan
a192073df5 Remove use of mutex around done chan 2021-01-14 18:06:57 -05:00
Abhilash Gnan
e13d1cbf02 add cmux.Close() function 2021-01-14 18:06:57 -05:00
golint fixer
8a8ea3c539 Fix golint import path 2018-10-25 10:41:06 -04:00
Alan D. Cabrera
f7603f4e1c Fix README.md example code 2018-05-19 12:37:04 -04:00
Soheil Hassas Yeganeh
e09e9389d8 Do not write SETTINGS in response to ACKs.
Reported-by talgendler in Issue #42
2018-01-29 10:50:01 -05:00
Yuki Ito
444ce56efe Add test for multiple matchers 2018-01-23 12:52:48 -05:00
Yuki Ito
cfc68f9888 Fix ugly variable name 2018-01-23 12:52:48 -05:00
Yuki Ito
e96bd75f84 Fix bug matchers are ignored except last one 2018-01-23 12:52:48 -05:00
Soheil Hassas Yeganeh
be5b383fd5 Use IPv4 for the listener to avoid v6 failures on Travis. 2017-12-04 12:53:26 -05:00
Soheil Hassas Yeganeh
b9e684ba4e Fix TestClose for Go10.
Depending on the Go version used, reading from a closed pipe can return
net.OpError or io.ErrClosedPipe. Simply check the string content of the
error.
2017-12-04 12:53:26 -05:00
Peter Edge
bb79a83465 Add matchers for header value prefixes 2017-08-14 16:21:37 -04:00
Soheil Hassas Yeganeh
7e08502c7a Merge pull request #51 from peter-edge/increase-test-timeout
Increase test timeout from 100ms to 500ms
2017-08-14 16:05:13 -04:00
Peter Edge
0c129dc694 Increase test timeout from 100ms to 500ms 2017-08-14 15:22:33 -04:00
Soheil Hassas Yeganeh
34a8ab6cda Merge pull request #49 from tmm1/tls-matcher
Add TLS matcher
2017-07-20 14:14:27 -04:00
Aman Gupta
3b204bab2a Add simple test for cmux.TLS() matcher 2017-07-20 10:56:11 -07:00
Aman Gupta
9a3402ad7a unexport prefixByteMatcher 2017-07-19 21:45:02 -07:00
Aman Gupta
4f90533583 add TLS matcher 2017-07-19 20:08:18 -07:00
Soheil Hassas Yeganeh
8cd60510aa Remove V4 address family in cmux_test.go.
On IPv6 only machines, cmux_test.go would fail because it forces
tcp4. Simply use tcp, instead.
2017-07-03 09:35:36 -04:00
Soheil Hassas Yeganeh
f671b41193 Merge pull request #47 from yaojingguo/issue-46
doc: fix a typo
2017-05-22 08:43:13 -04:00
Jingguo Yao
885b8d8a14 doc: fix a typo 2017-05-22 20:38:03 +08:00
Soheil Hassas Yeganeh
0068a46c9c Merge pull request #45 from soheilhy/fix-44
Eliminate blocking reads in the HTTP2 matcher.
2017-04-24 21:57:51 -04:00
Soheil Hassas Yeganeh
6a5d332559 Remove Go 1.5 from travis builds.
gRPC doesn't support Go 1.5 anymore, and the build would
fail if we keep testing with Go 1.5.
2017-04-24 21:45:35 -04:00
Soheil Hassas Yeganeh
c0f3570a02 Eliminate blocking reads in the HTTP2 matcher.
The HTTP2 matcher uses io.ReadFull to read the client preface.
If the client sends a string shorter than the preface (e.g.,
SSL version) io.ReadFull will block.

Replace io.ReadFull with Read and assume partial reads will not
match

Fixes #44
2017-04-23 00:08:19 -04:00
Soheil Hassas Yeganeh
b6ec57c1a4 Merge pull request #43 from soheilhy/dev/go18
Fix tests for Go 1.8+
2017-03-13 10:17:15 -04:00
Soheil Hassas Yeganeh
79b9df6ccf Add Go 1.8 to the travis config. 2017-03-13 09:57:57 -04:00
Soheil Hassas Yeganeh
210139db95 Change connection closed string in tests.
Go 1.8 and 1.9 use different text for the connection closed error.
Use their common prefix (i.e., "use of closed") in the tests for
them to pass on all Go versions.

Go 1.8: "use of closed network connection"
Go 1.9: "use of closed file or network connection"

Suggested-by: Damien Neil <dneil@google.com>
2017-03-13 09:57:57 -04:00
Soheil Hassas Yeganeh
bf4a8ede9e Merge pull request #36 from soheilhy/dev/fix-mem-grow
Fix memory growth
2016-09-25 21:07:37 -04:00
Soheil Hassas Yeganeh
f661dcfb59 Reset the sniffing buffer when not needed
The sniffing buffer will live as long as the connection is open,
and we should reset it as soon as the application has read all the
sniffed data.
2016-09-25 20:56:09 -04:00
Soheil Hassas Yeganeh
526b64db7a update the list of contributors. 2016-09-25 01:03:35 -04:00
Soheil Hassas Yeganeh
3ac8d3a667 Fix lint errors in cmux_test.go 2016-09-25 00:58:27 -04:00
Soheil Hassas Yeganeh
861c99e0fc Return not-match on different field values in HTTP2
Retun as soon as we have the matched field in the HTTP2 matcher
regardless of weather the value is matched or not. Fixes #35.

Issue #35 reports that cmux leaks memory when the client is HTTP2
but does not sends the expected header field. For example, when
the non-gRPC client sends a large field in the header and we are
matching for gRPC, we waste a lot of memory in the sniff buffer.
2016-09-25 00:58:17 -04:00
Soheil Hassas Yeganeh
13f520d62c Merge pull request #34 from ekle/master
SetReadDeadline for Matching
2016-09-05 16:05:55 -04:00
Andreas Jaekle
e132036cce SetReadDeadline for Matching 2016-09-05 19:56:50 +02:00
Soheil Hassas Yeganeh
b26951527b Merge pull request #33 from soheilhy/fix-b32
Fix index out of range in patricia tree
2016-07-15 11:38:34 -07:00
Soheil Hassas Yeganeh
eddb3b1467 Fix index out of range in patricia tree
Bug #32 reported that there is an index out of range error. This
issue was introduced in 703b087.

Fix #32 and add a test to detect this issue
2016-07-15 10:01:37 -07:00
Soheil Hassas Yeganeh
e85da3027e Merge pull request #30 from soheilhy/fix-patricia-tree
Fix race in patricia tree
2016-07-15 08:04:18 -07:00
Tamir Duberstein
df31d48636 Parallelize benchmarks 2016-07-14 22:03:14 -07:00
Soheil Hassas Yeganeh
fd01d3cc6c Fix race in patricia tree
This commit fixes a major issue added in 703b087a.

There are still wins on allocation though.

benchmark                      old ns/op     new ns/op     delta
BenchmarkCMuxConnHTTP1-4       783           836           +6.77%
BenchmarkCMuxConnHTTP2-4       895           806           -9.94%
BenchmarkCMuxConnHTTP1n2-4     1000          1026          +2.60%
BenchmarkCMuxConnHTTP2n1-4     916           961           +4.91%

benchmark                      old allocs     new allocs     delta
BenchmarkCMuxConnHTTP1-4       3              4              +33.33%
BenchmarkCMuxConnHTTP2-4       4              4              +0.00%
BenchmarkCMuxConnHTTP1n2-4     4              5              +25.00%
BenchmarkCMuxConnHTTP2n1-4     4              5              +25.00%

benchmark                      old bytes     new bytes     delta
BenchmarkCMuxConnHTTP1-4       272           280           +2.94%
BenchmarkCMuxConnHTTP2-4       304           304           +0.00%
BenchmarkCMuxConnHTTP1n2-4     304           312           +2.63%
BenchmarkCMuxConnHTTP2n1-4     304           312           +2.63%
2016-07-14 22:01:30 -07:00
Soheil Hassas Yeganeh
f952454ed9 Add the list of contributors
Add the list of contributors in a separate file and add links
to the LICENSE and CONTRIBUTORS files.

Fixes #28
2016-07-09 14:07:27 -04:00
Soheil Hassas Yeganeh
00342b4d79 Add package level docs 2016-07-09 14:01:24 -04:00
Soheil Hassas Yeganeh
cd9b7d74b9 Add copyright notice headers on all Go files.
As per http://www.apache.org/legal/src-headers.html#headers

Fixes #28
2016-07-09 13:56:02 -04:00
Soheil Hassas Yeganeh
9d1e2a64dd Merge pull request #25 from soheilhy/devel-optimize-patricia
Optimize Patricia tree
2016-05-03 22:33:52 -04:00
Soheil Hassas Yeganeh
703b087a39 Optimize Patricia tree
Remove all the extra allocations in the Patricia tree.

O(1) allocation for Patricia and  ~10% improvement for HTTP1 matching.

benchmark                      old ns/op     new ns/op     delta
BenchmarkCMuxConnHTTP1-4       908           782           -13.88%
BenchmarkCMuxConnHTTP2-4       835           818           -2.04%
BenchmarkCMuxConnHTTP1n2-4     1074          1033          -3.82%
BenchmarkCMuxConnHTTP2n1-4     1010          901           -10.79%

benchmark                      old allocs     new allocs     delta
BenchmarkCMuxConnHTTP1-4       5              3              -40.00%
BenchmarkCMuxConnHTTP2-4       4              4              +0.00%
BenchmarkCMuxConnHTTP1n2-4     6              4              -33.33%
BenchmarkCMuxConnHTTP2n1-4     6              4              -33.33%

benchmark                      old bytes     new bytes     delta
BenchmarkCMuxConnHTTP1-4       276           272           -1.45%
BenchmarkCMuxConnHTTP2-4       304           304           +0.00%
BenchmarkCMuxConnHTTP1n2-4     306           304           -0.65%
BenchmarkCMuxConnHTTP2n1-4     308           304           -1.30%
2016-05-03 22:28:32 -04:00
Soheil Hassas Yeganeh
d45bcbe1db Add more benchmarks
Add benchmarks for HTTP2 matchers and combinations of it with HTTP1Fast.
2016-05-03 22:14:35 -04:00
Soheil Hassas Yeganeh
9297b6de56 Merge pull request #24 from soheilhy/fix-java-client
Fix java gRPC client
2016-04-24 14:53:47 -04:00
Soheil Hassas Yeganeh
dc30a14f2d Add docs for the Java gRPC client 2016-04-24 14:52:49 -04:00
Soheil Hassas Yeganeh
d83a667cb2 Add Matchers that can write back on the channel
As reported in issue #22 reports that Java gRPC clients cannot
handshake with cmux'ed gRPC server, since the client does not
immediately send a header with the content-type field. The reason
is that the java client, block on receiving the first SETTING
frame.

Add MatchWriter that can match and write on the connection. Implement
a MatchWriter that writes a SETTING frame once it receives a SETTING
frame.
2016-04-24 14:47:08 -04:00
Soheil Hassas Yeganeh
255149b822 Merge pull request #23 from soheilhy/bytes-buffer
Replace TeeReader with a bytes buffer
2016-04-24 14:38:36 -04:00
Soheil Hassas Yeganeh
3077b24d47 Fix and optimize travis config
There is no point in running shadow and lints for all version of go.

Also, remove 1.3 and 1.4.
2016-04-24 14:10:41 -04:00
Soheil Hassas Yeganeh
d5924ef0b4 Fix a blocking issue in buffer reader
After sniffing and buffering data, if we try to read from
the socket again, bufio.Reader may block. This breaks HTTP handlers
in go1.5.2+ if one tries on browsers or with curl. Go's HTTP client,
however, is not broken. This issue is also there with TeeReader.

Return immediately with the data in the sniffed buffer.
2016-04-24 12:55:13 -04:00
Tamir Duberstein
59b6f01712 Replace buffer with bufferedReader
bufferedReader is an optimized implementation of io.Reader that behaves
like
```
io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
```
without allocating.

This has a measurable effect on benchmarks:
```
name        old time/op    new time/op    delta
CMuxConn-4    1.09µs ± 4%    0.99µs ±19%   -9.32%  (p=0.000 n=17+19)

name        old alloc/op   new alloc/op   delta
CMuxConn-4      240B ± 0%      260B ± 0%   +8.33%  (p=0.000 n=20+20)

name        old allocs/op  new allocs/op  delta
CMuxConn-4      9.00 ± 0%      5.00 ± 0%  -44.44%  (p=0.000 n=20+20)
```

Note that appropriate test coverage is provided by `TestRead`.
2016-02-28 19:41:37 -05:00
Soheil Hassas Yeganeh
7ec7ce7ad1 Merge pull request #21 from soheilhy/devel
Use the readable indentation for error flow
2016-02-27 18:16:04 -05:00
19 changed files with 1053 additions and 299 deletions

View File

@@ -1,22 +1,29 @@
language: go
go:
- 1.3
- 1.4
- 1.5
- 1.6
- 1.7
- 1.8
- tip
matrix:
allow_failures:
- go: tip
gobuild_args: -race
before_install:
- go get -u github.com/golang/lint/golint
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then go get -u github.com/kisielk/errcheck; fi
- go get -u golang.org/x/tools/cmd/vet
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u github.com/kisielk/errcheck; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go get -u golang.org/x/lint/golint; fi
before_script:
- '! gofmt -s -l . | read'
- golint ./...
- echo $TRAVIS_GO_VERSION
- if [[ $TRAVIS_GO_VERSION == 1.5* ]]; then errcheck ./...; fi
- go vet .
- go tool vet --shadow .
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then golint ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then errcheck ./...; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet .; fi
- if [[ $TRAVIS_GO_VERSION == 1.6* ]]; then go tool vet --shadow .; fi
script:
- go test -bench . -v ./...
- go test -race -bench . -v ./...

12
CONTRIBUTORS Normal file
View File

@@ -0,0 +1,12 @@
# The list of people who have contributed code to the cmux repository.
#
# Auto-generated with:
# git log --oneline --pretty=format:'%an <%aE>' | sort -u
#
Andreas Jaekle <andreas@jaekle.net>
Dmitri Shuralyov <shurcooL@gmail.com>
Ethan Mosbaugh <emosbaugh@gmail.com>
Soheil Hassas Yeganeh <soheil.h.y@gmail.com>
Soheil Hassas Yeganeh <soheil@cs.toronto.edu>
Tamir Duberstein <tamir@cockroachlabs.com>
Tamir Duberstein <tamird@gmail.com>

View File

@@ -25,14 +25,14 @@ trpcL := m.Match(cmux.Any()) // Any means anything that is not yet matched.
// Create your protocol servers.
grpcS := grpc.NewServer()
grpchello.RegisterGreeterServer(grpcs, &server{})
grpchello.RegisterGreeterServer(grpcS, &server{})
httpS := &http.Server{
Handler: &helloHTTP1Handler{},
}
trpcS := rpc.NewServer()
s.Register(&ExampleRPCRcvr{})
trpcS.Register(&ExampleRPCRcvr{})
// Use the muxed listeners for your servers.
go grpcS.Serve(grpcL)
@@ -67,3 +67,17 @@ would not be set in your handlers.
when it's accepted. For example, one connection can be either gRPC or REST, but
not both. That is, we assume that a client connection is either used for gRPC
or REST.
* *Java gRPC Clients*: Java gRPC client blocks until it receives a SETTINGS
frame from the server. If you are using the Java client to connect to a cmux'ed
gRPC server please match with writers:
```go
grpcl := m.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
```
# Copyright and License
Copyright 2016 The CMux Authors. All rights reserved.
See [CONTRIBUTORS](https://github.com/soheilhy/cmux/blob/master/CONTRIBUTORS)
for the CMux Authors. Code is released under
[the Apache 2 license](https://github.com/soheilhy/cmux/blob/master/LICENSE).

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -6,8 +20,21 @@ import (
"net"
"sync"
"testing"
"time"
"golang.org/x/net/http2"
)
var (
benchHTTP1Payload = make([]byte, 4096)
benchHTTP2Payload = make([]byte, 4096)
)
func init() {
copy(benchHTTP1Payload, []byte("GET http://www.w3.org/ HTTP/1.1"))
copy(benchHTTP2Payload, http2.ClientPreface)
}
type mockConn struct {
net.Conn
r io.Reader
@@ -17,30 +44,99 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
return c.r.Read(b)
}
func BenchmarkCMuxConn(b *testing.B) {
benchHTTPPayload := make([]byte, 4096)
copy(benchHTTPPayload, []byte("GET http://www.w3.org/ HTTP/1.1"))
func (c *mockConn) SetReadDeadline(time.Time) error {
return nil
}
func discard(l net.Listener) {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}
func BenchmarkCMuxConnHTTP1(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP1Fast())
go func() {
for {
if _, err := l.Accept(); err != nil {
return
}
}
}()
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
for i := 0; i < b.N; i++ {
c := &mockConn{
r: bytes.NewReader(benchHTTPPayload),
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP1Payload),
}, donec, &wg)
}
m.serve(c, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP2(b *testing.B) {
m := New(nil).(*cMux)
l := m.Match(HTTP2())
go discard(l)
donec := make(chan struct{})
var wg sync.WaitGroup
wg.Add(b.N)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP1n2(b *testing.B) {
m := New(nil).(*cMux)
l1 := m.Match(HTTP1Fast())
l2 := m.Match(HTTP2())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP2Payload),
}, donec, &wg)
}
})
}
func BenchmarkCMuxConnHTTP2n1(b *testing.B) {
m := New(nil).(*cMux)
l2 := m.Match(HTTP2())
l1 := m.Match(HTTP1Fast())
go discard(l1)
go discard(l2)
donec := make(chan struct{})
var wg sync.WaitGroup
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
wg.Add(1)
m.serve(&mockConn{
r: bytes.NewReader(benchHTTP1Payload),
}, donec, &wg)
}
})
}

106
buffer.go
View File

@@ -1,57 +1,67 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import "io"
import (
"bytes"
"io"
)
var _ io.ReadWriter = (*buffer)(nil)
type buffer struct {
read int
data []byte
// bufferedReader is an optimized implementation of io.Reader that behaves like
// ```
// io.MultiReader(bytes.NewReader(buffer.Bytes()), io.TeeReader(source, buffer))
// ```
// without allocating.
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
}
// From the io.Reader documentation:
//
// When Read encounters an error or end-of-file condition after
// successfully reading n > 0 bytes, it returns the number of
// bytes read. It may return the (non-nil) error from the same call
// or return the error (and n == 0) from a subsequent call.
// An instance of this general case is that a Reader returning
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (b *buffer) Read(p []byte) (int, error) {
var err error
n := copy(p, b.data[b.read:])
b.read += n
if b.read == len(b.data) {
err = io.EOF
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead {
// If we have already read something from the buffer before, we return the
// same data and the last error if any. We need to immediately return,
// otherwise we may block for ever, if we try to be smart and call
// source.Read() seeking a little bit of more data.
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
} else if !s.sniffing && s.buffer.Cap() != 0 {
// We don't need the buffer anymore.
// Reset it to release the internal slice.
s.buffer = bytes.Buffer{}
}
return n, err
// If there is nothing more to return in the sniffed buffer, read from the
// source.
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing {
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
}
}
return sn, sErr
}
func (b *buffer) Len() int {
return len(b.data) - b.read
}
func (b *buffer) resetRead() {
b.read = 0
}
// From the io.Writer documentation:
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
//
// In a previous incarnation, this implementation retained the incoming slice.
func (b *buffer) Write(p []byte) (int, error) {
b.data = append(b.data, p...)
return len(p), nil
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0
s.bufferSize = s.buffer.Len()
}

View File

@@ -1,113 +0,0 @@
package cmux
import (
"bytes"
"io"
"testing"
)
func TestWriteNoModify(t *testing.T) {
var b buffer
const origWriteByte = 0
const postWriteByte = 1
writeBytes := []byte{origWriteByte}
if _, err := b.Write(writeBytes); err != nil {
t.Fatal(err)
}
writeBytes[0] = postWriteByte
readBytes := make([]byte, 1)
if _, err := b.Read(readBytes); err != io.EOF {
t.Fatal(err)
}
if readBytes[0] != origWriteByte {
t.Fatalf("expected to read %x, but read %x; buffer retained passed-in slice", origWriteByte, postWriteByte)
}
}
const writeString = "deadbeef"
func TestBuffer(t *testing.T) {
writeBytes := []byte(writeString)
const numWrites = 10
var b buffer
for i := 0; i < numWrites; i++ {
n, err := b.Write(writeBytes)
if err != nil && err != io.EOF {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
}
for j := 0; j < 2; j++ {
readBytes := make([]byte, len(writeBytes))
for i := 0; i < numWrites; i++ {
n, err := b.Read(readBytes)
if i == numWrites-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != len(readBytes) {
t.Fatalf("cannot read all the bytes: want=%d got=%d", len(readBytes), n)
}
if !bytes.Equal(writeBytes, readBytes) {
t.Errorf("different bytes read: want=%d got=%d", writeBytes, readBytes)
}
}
n, err := b.Read(readBytes)
if err != io.EOF {
t.Errorf("expected EOF")
}
if n != 0 {
t.Errorf("expected buffer to be empty, but got %d bytes", n)
}
b.resetRead()
}
}
func TestBufferOffset(t *testing.T) {
writeBytes := []byte(writeString)
var b buffer
n, err := b.Write(writeBytes)
if err != nil {
t.Fatal(err)
}
if n != len(writeBytes) {
t.Fatalf("cannot write all the bytes: want=%d got=%d", len(writeBytes), n)
}
const readSize = 2
numReads := len(writeBytes) / readSize
for i := 0; i < numReads; i++ {
readBytes := make([]byte, readSize)
n, err := b.Read(readBytes)
if i == numReads-1 {
// The last read should report EOF.
if err != io.EOF {
t.Fatal(err)
}
} else if err != nil {
t.Fatal(err)
}
if n != readSize {
t.Fatalf("cannot read the bytes: want=%d got=%d", readSize, n)
}
if got := writeBytes[i*readSize : i*readSize+readSize]; !bytes.Equal(got, readBytes) {
t.Fatalf("different bytes read: want=%s got=%s", readBytes, got)
}
}
}

152
cmux.go
View File

@@ -1,15 +1,34 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
)
// Matcher matches a connection based on its content.
type Matcher func(io.Reader) bool
// MatchWriter is a match that can also write response (say to do handshake).
type MatchWriter func(io.Writer, io.Reader) bool
// ErrorHandler handles an error and returns whether
// the mux should continue serving the listener.
type ErrorHandler func(error) bool
@@ -43,13 +62,20 @@ func (e errListenerClosed) Timeout() bool { return false }
// listener is closed.
var ErrListenerClosed = errListenerClosed("mux: listener closed")
// ErrServerClosed is returned from muxListener.Accept when mux server is closed.
var ErrServerClosed = errors.New("mux: server closed")
// for readability of readTimeout
var noTimeout time.Duration
// New instantiates a new connection multiplexer.
func New(l net.Listener) CMux {
return &cMux{
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
root: l,
bufLen: 1024,
errh: func(_ error) bool { return true },
donec: make(chan struct{}),
readTimeout: noTimeout,
}
}
@@ -60,40 +86,75 @@ type CMux interface {
//
// The order used to call Match determines the priority of matchers.
Match(...Matcher) net.Listener
// MatchWithWriters returns a net.Listener that accepts only the
// connections that matched by at least of the matcher writers.
//
// Prefer Matchers over MatchWriters, since the latter can write on the
// connection before the actual handler.
//
// The order used to call Match determines the priority of matchers.
MatchWithWriters(...MatchWriter) net.Listener
// Serve starts multiplexing the listener. Serve blocks and perhaps
// should be invoked concurrently within a go routine.
Serve() error
// Closes cmux server and stops accepting any connections on listener
Close()
// HandleError registers an error handler that handles listener errors.
HandleError(ErrorHandler)
// sets a timeout for the read of matchers
SetReadTimeout(time.Duration)
}
type matchersListener struct {
ss []Matcher
ss []MatchWriter
l muxListener
}
type cMux struct {
root net.Listener
bufLen int
errh ErrorHandler
donec chan struct{}
sls []matchersListener
root net.Listener
bufLen int
errh ErrorHandler
sls []matchersListener
readTimeout time.Duration
donec chan struct{}
mu sync.Mutex
}
func matchersToMatchWriters(matchers []Matcher) []MatchWriter {
mws := make([]MatchWriter, 0, len(matchers))
for _, m := range matchers {
cm := m
mws = append(mws, func(w io.Writer, r io.Reader) bool {
return cm(r)
})
}
return mws
}
func (m *cMux) Match(matchers ...Matcher) net.Listener {
mws := matchersToMatchWriters(matchers)
return m.MatchWithWriters(mws...)
}
func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
ml := muxListener{
Listener: m.root,
connc: make(chan net.Conn, m.bufLen),
donec: make(chan struct{}),
}
m.sls = append(m.sls, matchersListener{ss: matchers, l: ml})
return ml
}
func (m *cMux) SetReadTimeout(t time.Duration) {
m.readTimeout = t
}
func (m *cMux) Serve() error {
var wg sync.WaitGroup
defer func() {
close(m.donec)
m.closeDoneChans()
wg.Wait()
for _, sl := range m.sls {
@@ -123,11 +184,17 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
muc := newMuxConn(c)
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
}
for _, sl := range m.sls {
for _, s := range sl.ss {
matched := s(muc.sniffer())
muc.reset()
matched := s(muc.Conn, muc.startSniffing())
if matched {
muc.doneSniffing()
if m.readTimeout > noTimeout {
_ = c.SetReadDeadline(time.Time{})
}
select {
case sl.l.connc <- muc:
case <-donec:
@@ -145,6 +212,30 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
}
}
func (m *cMux) Close() {
m.closeDoneChans()
}
func (m *cMux) closeDoneChans() {
m.mu.Lock()
defer m.mu.Unlock()
select {
case <-m.donec:
// Already closed. Don't close again
default:
close(m.donec)
}
for _, sl := range m.sls {
select {
case <-sl.l.donec:
// Already closed. Don't close again
default:
close(sl.l.donec)
}
}
}
func (m *cMux) HandleError(h ErrorHandler) {
m.errh = h
}
@@ -164,25 +255,31 @@ func (m *cMux) handleErr(err error) bool {
type muxListener struct {
net.Listener
connc chan net.Conn
donec chan struct{}
}
func (l muxListener) Accept() (net.Conn, error) {
c, ok := <-l.connc
if !ok {
return nil, ErrListenerClosed
select {
case c, ok := <-l.connc:
if !ok {
return nil, ErrListenerClosed
}
return c, nil
case <-l.donec:
return nil, ErrServerClosed
}
return c, nil
}
// MuxConn wraps a net.Conn and provides transparent sniffing of connection data.
type MuxConn struct {
net.Conn
buf buffer
buf bufferedReader
}
func newMuxConn(c net.Conn) *MuxConn {
return &MuxConn{
Conn: c,
buf: bufferedReader{source: c},
}
}
@@ -196,22 +293,15 @@ func newMuxConn(c net.Conn) *MuxConn {
// a non-zero number of bytes at the end of the input stream may
// return either err == EOF or err == nil. The next Read should
// return 0, EOF.
//
// This function implements the latter behaviour, returning the
// (non-nil) error from the same call.
func (m *MuxConn) Read(p []byte) (int, error) {
n1, err := m.buf.Read(p)
if err != io.EOF {
return n1, err
}
n2, err := m.Conn.Read(p[n1:])
return n1 + n2, err
return m.buf.Read(p)
}
func (m *MuxConn) sniffer() io.Reader {
return io.MultiReader(&m.buf, io.TeeReader(m.Conn, &m.buf))
func (m *MuxConn) startSniffing() io.Reader {
m.buf.reset(true)
return &m.buf
}
func (m *MuxConn) reset() {
m.buf.resetRead()
func (m *MuxConn) doneSniffing() {
m.buf.reset(false)
}

View File

@@ -1,13 +1,34 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
"bytes"
"crypto/rand"
"crypto/tls"
"errors"
"fmt"
"go/build"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/rpc"
"os"
"os/exec"
"runtime"
"sort"
"strings"
@@ -17,6 +38,7 @@ import (
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
@@ -25,7 +47,7 @@ const (
)
func safeServe(errCh chan<- error, muxl CMux) {
if err := muxl.Serve(); !strings.Contains(err.Error(), "use of closed network connection") {
if err := muxl.Serve(); !strings.Contains(err.Error(), "use of closed") {
errCh <- err
}
}
@@ -59,14 +81,17 @@ func (l *chanListener) Accept() (net.Conn, error) {
}
func testListener(t *testing.T) (net.Listener, func()) {
l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
var once sync.Once
return l, func() {
if err := l.Close(); err != nil {
t.Fatal(err)
}
once.Do(func() {
if err := l.Close(); err != nil {
t.Fatal(err)
}
})
}
}
@@ -103,13 +128,62 @@ func runTestHTTPServer(errCh chan<- error, l net.Listener) {
mu.Unlock()
},
}
if err := s.Serve(l); err != ErrListenerClosed {
if err := s.Serve(l); err != ErrListenerClosed && err != ErrServerClosed {
errCh <- err
}
}
func generateTLSCert(t *testing.T) {
err := exec.Command("go", "run", build.Default.GOROOT+"/src/crypto/tls/generate_cert.go", "--host", "*").Run()
if err != nil {
t.Fatal(err)
}
}
func cleanupTLSCert(t *testing.T) {
err := os.Remove("cert.pem")
if err != nil {
t.Error(err)
}
err = os.Remove("key.pem")
if err != nil {
t.Error(err)
}
}
func runTestTLSServer(errCh chan<- error, l net.Listener) {
certificate, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
if err != nil {
errCh <- err
log.Printf("1")
return
}
config := &tls.Config{
Certificates: []tls.Certificate{certificate},
Rand: rand.Reader,
}
tlsl := tls.NewListener(l, config)
runTestHTTPServer(errCh, tlsl)
}
func runTestHTTP1Client(t *testing.T, addr net.Addr) {
r, err := http.Get("http://" + addr.String())
runTestHTTPClient(t, "http", addr)
}
func runTestTLSClient(t *testing.T, addr net.Addr) {
runTestHTTPClient(t, "https", addr)
}
func runTestHTTPClient(t *testing.T, proto string, addr net.Addr) {
client := http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}
r, err := client.Get(proto + "://" + addr.String())
if err != nil {
t.Fatal(err)
}
@@ -144,7 +218,7 @@ func runTestRPCServer(errCh chan<- error, l net.Listener) {
for {
c, err := l.Accept()
if err != nil {
if err != ErrListenerClosed {
if err != ErrListenerClosed && err != ErrServerClosed {
errCh <- err
}
return
@@ -167,6 +241,84 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
}
}
const (
handleHTTP1Close = 1
handleHTTP1Request = 2
handleAnyClose = 3
handleAnyRequest = 4
)
func TestTimeout(t *testing.T) {
defer leakCheck(t)()
lis, Close := testListener(t)
defer Close()
result := make(chan int, 5)
testDuration := time.Millisecond * 500
m := New(lis)
m.SetReadTimeout(testDuration)
http1 := m.Match(HTTP1Fast())
any := m.Match(Any())
go func() {
_ = m.Serve()
}()
go func() {
con, err := http1.Accept()
if err != nil {
result <- handleHTTP1Close
} else {
_, _ = con.Write([]byte("http1"))
_ = con.Close()
result <- handleHTTP1Request
}
}()
go func() {
con, err := any.Accept()
if err != nil {
result <- handleAnyClose
} else {
_, _ = con.Write([]byte("any"))
_ = con.Close()
result <- handleAnyRequest
}
}()
time.Sleep(testDuration) // wait to prevent timeouts on slow test-runners
client, err := net.Dial("tcp", lis.Addr().String())
if err != nil {
log.Fatal("testTimeout client failed: ", err)
}
defer func() {
_ = client.Close()
}()
time.Sleep(testDuration / 2)
if len(result) != 0 {
log.Print("tcp ")
t.Fatal("testTimeout failed: accepted to fast: ", len(result))
}
_ = client.SetReadDeadline(time.Now().Add(testDuration * 3))
buffer := make([]byte, 10)
rl, err := client.Read(buffer)
if err != nil {
t.Fatal("testTimeout failed: client error: ", err, rl)
}
Close()
if rl != 3 {
log.Print("testTimeout failed: response from wrong sevice ", rl)
}
if string(buffer[0:3]) != "any" {
log.Print("testTimeout failed: response from wrong sevice ")
}
time.Sleep(testDuration * 2)
if len(result) != 2 {
t.Fatal("testTimeout failed: accepted to less: ", len(result))
}
if a := <-result; a != handleAnyRequest {
t.Fatal("testTimeout failed: any rule did not match")
}
if a := <-result; a != handleHTTP1Close {
t.Fatal("testTimeout failed: no close an http rule")
}
}
func TestRead(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
@@ -247,6 +399,33 @@ func TestAny(t *testing.T) {
runTestHTTP1Client(t, l.Addr())
}
func TestTLS(t *testing.T) {
generateTLSCert(t)
defer cleanupTLSCert(t)
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
l, cleanup := testListener(t)
defer cleanup()
muxl := New(l)
tlsl := muxl.Match(TLS())
httpl := muxl.Match(Any())
go runTestTLSServer(errCh, tlsl)
go runTestHTTPServer(errCh, httpl)
go safeServe(errCh, muxl)
runTestHTTP1Client(t, l.Addr())
runTestTLSClient(t, l.Addr())
}
func TestHTTP2(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
@@ -284,7 +463,92 @@ func TestHTTP2(t *testing.T) {
t.Fatal(err)
}
var b [len(http2.ClientPreface)]byte
if _, err := muxedConn.Read(b[:]); err != io.EOF {
var n int
// We have the sniffed buffer first...
if n, err = muxedConn.Read(b[:]); err == io.EOF {
t.Fatal(err)
}
// and then we read from the source.
if _, err = muxedConn.Read(b[n:]); err != io.EOF {
t.Fatal(err)
}
if string(b[:]) != http2.ClientPreface {
t.Errorf("got unexpected read %s, expected %s", b, http2.ClientPreface)
}
}
func TestHTTP2MatchHeaderField(t *testing.T) {
testHTTP2MatchHeaderField(t, HTTP2HeaderField, "value", "value", "anothervalue")
}
func TestHTTP2MatchHeaderFieldPrefix(t *testing.T) {
testHTTP2MatchHeaderField(t, HTTP2HeaderFieldPrefix, "application/grpc+proto", "application/grpc", "application/json")
}
func testHTTP2MatchHeaderField(
t *testing.T,
matcherConstructor func(string, string) Matcher,
headerValue string,
matchValue string,
notMatchValue string,
) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
name := "name"
writer, reader := net.Pipe()
go func() {
if _, err := io.WriteString(writer, http2.ClientPreface); err != nil {
t.Fatal(err)
}
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
if err := enc.WriteField(hpack.HeaderField{Name: name, Value: headerValue}); err != nil {
t.Fatal(err)
}
framer := http2.NewFramer(writer, nil)
err := framer.WriteHeaders(http2.HeadersFrameParam{
StreamID: 1,
BlockFragment: buf.Bytes(),
EndStream: true,
EndHeaders: true,
})
if err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
}()
l := newChanListener()
l.connCh <- reader
muxl := New(l)
// Register a bogus matcher that only reads one byte.
muxl.Match(func(r io.Reader) bool {
var b [1]byte
_, _ = r.Read(b[:])
return false
})
// Create a matcher that cannot match the response.
muxl.Match(matcherConstructor(name, notMatchValue))
// Then match with the expected field.
h2l := muxl.Match(matcherConstructor(name, matchValue))
go safeServe(errCh, muxl)
muxedConn, err := h2l.Accept()
close(l.connCh)
if err != nil {
t.Fatal(err)
}
var b [len(http2.ClientPreface)]byte
// We have the sniffed buffer first...
if _, err := muxedConn.Read(b[:]); err == io.EOF {
t.Fatal(err)
}
if string(b[:]) != http2.ClientPreface {
@@ -358,7 +622,36 @@ func TestErrorHandler(t *testing.T) {
}
}
func TestClose(t *testing.T) {
func TestMultipleMatchers(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
l, cleanup := testListener(t)
defer cleanup()
matcher := func(r io.Reader) bool {
return true
}
unmatcher := func(r io.Reader) bool {
return false
}
muxl := New(l)
lis := muxl.Match(unmatcher, matcher, unmatcher)
go runTestHTTPServer(errCh, lis)
go safeServe(errCh, muxl)
runTestHTTP1Client(t, l.Addr())
}
func TestListenerClose(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
@@ -392,15 +685,42 @@ func TestClose(t *testing.T) {
// Second connection either goes through or it is closed.
if _, err := anyl.Accept(); err != nil {
if err != ErrListenerClosed {
if err != ErrListenerClosed && err != ErrServerClosed {
t.Fatal(err)
}
if _, err := c2.Read([]byte{}); err != io.ErrClosedPipe {
// The error is either io.ErrClosedPipe or net.OpError wrapping
// a net.pipeError depending on the go version.
if _, err := c2.Read([]byte{}); !strings.Contains(err.Error(), "closed") {
t.Fatalf("connection is not closed and is leaked: %v", err)
}
}
}
func TestClose(t *testing.T) {
defer leakCheck(t)()
errCh := make(chan error)
defer func() {
select {
case err := <-errCh:
t.Fatal(err)
default:
}
}()
l, cleanup := testListener(t)
defer cleanup()
muxl := New(l)
anyl := muxl.Match(Any())
go safeServe(errCh, muxl)
muxl.Close()
if _, err := anyl.Accept(); err != ErrServerClosed {
t.Fatal(err)
}
}
// Cribbed from google.golang.org/grpc/test/end2end_test.go.
// interestingGoroutines returns all goroutines we care about for the purpose
@@ -419,6 +739,7 @@ func interestingGoroutines() (gs []string) {
}
if stack == "" ||
strings.Contains(stack, "main.main()") ||
strings.Contains(stack, "testing.Main(") ||
strings.Contains(stack, "runtime.goexit") ||
strings.Contains(stack, "created by runtime.gc") ||

18
doc.go Normal file
View File

@@ -0,0 +1,18 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
// Package cmux is a library to multiplex network connections based on
// their payload. Using cmux, you can serve different protocols from the
// same listener.
package cmux

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (
@@ -15,6 +29,7 @@ import (
"golang.org/x/net/websocket"
"github.com/soheilhy/cmux"
"google.golang.org/grpc/examples/helloworld/helloworld"
grpchello "google.golang.org/grpc/examples/helloworld/helloworld"
)
@@ -72,7 +87,9 @@ func serveRPC(l net.Listener) {
}
}
type grpcServer struct{}
type grpcServer struct {
helloworld.UnimplementedGreeterServer
}
func (s *grpcServer) SayHello(ctx context.Context, in *grpchello.HelloRequest) (
*grpchello.HelloReply, error) {
@@ -98,7 +115,7 @@ func Example() {
// We first match the connection against HTTP2 fields. If matched, the
// connection will be sent through the "grpcl" listener.
grpcl := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
grpcl := m.Match(cmux.HTTP2HeaderFieldPrefix("content-type", "application/grpc"))
//Otherwise, we match it againts a websocket upgrade request.
wsl := m.Match(cmux.HTTP1HeaderField("Upgrade", "websocket"))

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux_test
import (

13
example/go.mod Normal file
View File

@@ -0,0 +1,13 @@
module github.com/soheilhy/cmux/example
go 1.11
require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/soheilhy/cmux v0.0.0-00010101000000-000000000000
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb
google.golang.org/genproto v0.0.0-20201207150747-9ee31aac76e7 // indirect
google.golang.org/grpc v1.27.0
)
replace github.com/soheilhy/cmux => ../

80
example/go.sum Normal file
View File

@@ -0,0 +1,80 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20201207150747-9ee31aac76e7 h1:MrlntRhz7JNWmR2J5pRYZFgfR0IuuhELDhxo2aBZVsg=
google.golang.org/genproto v0.0.0-20201207150747-9ee31aac76e7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0 h1:UhZDfRO8JRQru4/+LlLE0BRKGF8L+PICnvYZmx/fEGA=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

5
go.mod Normal file
View File

@@ -0,0 +1,5 @@
module github.com/soheilhy/cmux
go 1.11
require golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb

12
go.sum Normal file
View File

@@ -0,0 +1,12 @@
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@@ -1,7 +1,22 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
"bufio"
"crypto/tls"
"io"
"io/ioutil"
"net/http"
@@ -23,6 +38,11 @@ func PrefixMatcher(strs ...string) Matcher {
return pt.matchPrefix
}
func prefixByteMatcher(list ...[]byte) Matcher {
pt := newPatriciaTree(list...)
return pt.matchPrefix
}
var defaultHTTPMethods = []string{
"OPTIONS",
"GET",
@@ -43,6 +63,27 @@ func HTTP1Fast(extMethods ...string) Matcher {
return PrefixMatcher(append(defaultHTTPMethods, extMethods...)...)
}
// TLS matches HTTPS requests.
//
// By default, any TLS handshake packet is matched. An optional whitelist
// of versions can be passed in to restrict the matcher, for example:
// TLS(tls.VersionTLS11, tls.VersionTLS12)
func TLS(versions ...int) Matcher {
if len(versions) == 0 {
versions = []int{
tls.VersionSSL30,
tls.VersionTLS10,
tls.VersionTLS11,
tls.VersionTLS12,
}
}
prefixes := [][]byte{}
for _, v := range versions {
prefixes = append(prefixes, []byte{22, byte(v >> 8 & 0xff), byte(v & 0xff)})
}
return prefixByteMatcher(prefixes...)
}
const maxHTTPRead = 4096
// HTTP1 parses the first line or upto 4096 bytes of the request to see if
@@ -86,45 +127,109 @@ func HTTP2() Matcher {
// request of an HTTP 1 connection.
func HTTP1HeaderField(name, value string) Matcher {
return func(r io.Reader) bool {
return matchHTTP1Field(r, name, value)
return matchHTTP1Field(r, name, func(gotValue string) bool {
return gotValue == value
})
}
}
// HTTP2HeaderField resturns a matcher matching the header fields of the first
// HTTP1HeaderFieldPrefix returns a matcher matching the header fields of the
// first request of an HTTP 1 connection. If the header with key name has a
// value prefixed with valuePrefix, this will match.
func HTTP1HeaderFieldPrefix(name, valuePrefix string) Matcher {
return func(r io.Reader) bool {
return matchHTTP1Field(r, name, func(gotValue string) bool {
return strings.HasPrefix(gotValue, valuePrefix)
})
}
}
// HTTP2HeaderField returns a matcher matching the header fields of the first
// headers frame.
func HTTP2HeaderField(name, value string) Matcher {
return func(r io.Reader) bool {
return matchHTTP2Field(r, name, value)
return matchHTTP2Field(ioutil.Discard, r, name, func(gotValue string) bool {
return gotValue == value
})
}
}
// HTTP2HeaderFieldPrefix returns a matcher matching the header fields of the
// first headers frame. If the header with key name has a value prefixed with
// valuePrefix, this will match.
func HTTP2HeaderFieldPrefix(name, valuePrefix string) Matcher {
return func(r io.Reader) bool {
return matchHTTP2Field(ioutil.Discard, r, name, func(gotValue string) bool {
return strings.HasPrefix(gotValue, valuePrefix)
})
}
}
// HTTP2MatchHeaderFieldSendSettings matches the header field and writes the
// settings to the server. Prefer HTTP2HeaderField over this one, if the client
// does not block on receiving a SETTING frame.
func HTTP2MatchHeaderFieldSendSettings(name, value string) MatchWriter {
return func(w io.Writer, r io.Reader) bool {
return matchHTTP2Field(w, r, name, func(gotValue string) bool {
return gotValue == value
})
}
}
// HTTP2MatchHeaderFieldPrefixSendSettings matches the header field prefix
// and writes the settings to the server. Prefer HTTP2HeaderFieldPrefix over
// this one, if the client does not block on receiving a SETTING frame.
func HTTP2MatchHeaderFieldPrefixSendSettings(name, valuePrefix string) MatchWriter {
return func(w io.Writer, r io.Reader) bool {
return matchHTTP2Field(w, r, name, func(gotValue string) bool {
return strings.HasPrefix(gotValue, valuePrefix)
})
}
}
func hasHTTP2Preface(r io.Reader) bool {
var b [len(http2.ClientPreface)]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return false
}
last := 0
return string(b[:]) == http2.ClientPreface
for {
n, err := r.Read(b[last:])
if err != nil {
return false
}
last += n
eq := string(b[:last]) == http2.ClientPreface[:last]
if last == len(http2.ClientPreface) {
return eq
}
if !eq {
return false
}
}
}
func matchHTTP1Field(r io.Reader, name, value string) (matched bool) {
func matchHTTP1Field(r io.Reader, name string, matches func(string) bool) (matched bool) {
req, err := http.ReadRequest(bufio.NewReader(r))
if err != nil {
return false
}
return req.Header.Get(name) == value
return matches(req.Header.Get(name))
}
func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
func matchHTTP2Field(w io.Writer, r io.Reader, name string, matches func(string) bool) (matched bool) {
if !hasHTTP2Preface(r) {
return false
}
framer := http2.NewFramer(ioutil.Discard, r)
done := false
framer := http2.NewFramer(w, r)
hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) {
if hf.Name == name && hf.Value == value {
matched = true
if hf.Name == name {
done = true
if matches(hf.Value) {
matched = true
}
}
})
for {
@@ -134,17 +239,29 @@ func matchHTTP2Field(r io.Reader, name, value string) (matched bool) {
}
switch f := f.(type) {
case *http2.SettingsFrame:
// Sender acknoweldged the SETTINGS frame. No need to write
// SETTINGS again.
if f.IsAck() {
break
}
if err := framer.WriteSettings(); err != nil {
return false
}
case *http2.ContinuationFrame:
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
return false
}
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
case *http2.HeadersFrame:
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
return false
}
if matched {
return true
}
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
}
if f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 {
return false
}
if done {
return matched
}
}
}

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -8,12 +22,20 @@ import (
// patriciaTree is a simple patricia tree that handles []byte instead of string
// and cannot be changed after instantiation.
type patriciaTree struct {
root *ptNode
root *ptNode
maxDepth int // max depth of the tree.
}
func newPatriciaTree(b ...[]byte) *patriciaTree {
func newPatriciaTree(bs ...[]byte) *patriciaTree {
max := 0
for _, b := range bs {
if max < len(b) {
max = len(b)
}
}
return &patriciaTree{
root: newNode(b),
root: newNode(bs),
maxDepth: max + 1,
}
}
@@ -22,17 +44,19 @@ func newPatriciaTreeString(strs ...string) *patriciaTree {
for i, s := range strs {
b[i] = []byte(s)
}
return &patriciaTree{
root: newNode(b),
}
return newPatriciaTree(b...)
}
func (t *patriciaTree) matchPrefix(r io.Reader) bool {
return t.root.match(r, true)
buf := make([]byte, t.maxDepth)
n, _ := io.ReadFull(r, buf)
return t.root.match(buf[:n], true)
}
func (t *patriciaTree) match(r io.Reader) bool {
return t.root.match(r, false)
buf := make([]byte, t.maxDepth)
n, _ := io.ReadFull(r, buf)
return t.root.match(buf[:n], false)
}
type ptNode struct {
@@ -122,52 +146,34 @@ func splitPrefix(bss [][]byte) (prefix []byte, rest [][]byte) {
return prefix, rest
}
func readBytes(r io.Reader, n int) (b []byte, err error) {
b = make([]byte, n)
o := 0
for o < n {
nr, err := r.Read(b[o:])
if err != nil && err != io.EOF {
return b, err
func (n *ptNode) match(b []byte, prefix bool) bool {
l := len(n.prefix)
if l > 0 {
if l > len(b) {
l = len(b)
}
o += nr
if err == io.EOF {
break
}
}
return b[:o], nil
}
func (n *ptNode) match(r io.Reader, prefix bool) bool {
if l := len(n.prefix); l > 0 {
b, err := readBytes(r, l)
if err != nil || len(b) != l || !bytes.Equal(b, n.prefix) {
if !bytes.Equal(b[:l], n.prefix) {
return false
}
}
if prefix && n.terminal {
if n.terminal && (prefix || len(n.prefix) == len(b)) {
return true
}
b := make([]byte, 1)
for {
nr, err := r.Read(b)
if nr != 0 {
break
}
if err == io.EOF {
return n.terminal
}
if err != nil {
return false
}
if l >= len(b) {
return false
}
nextN, ok := n.next[b[0]]
return ok && nextN.match(r, prefix)
nextN, ok := n.next[b[l]]
if !ok {
return false
}
if l == len(b) {
b = b[l:l]
} else {
b = b[l+1:]
}
return nextN.match(b, prefix)
}

View File

@@ -1,3 +1,17 @@
// Copyright 2016 The CMux Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package cmux
import (
@@ -19,6 +33,13 @@ func testPTree(t *testing.T, strs ...string) {
if pt.match(strings.NewReader(s + s)) {
t.Errorf("%s matches %s", s+s, s)
}
// The following tests are just to catch index out of
// range and off-by-one errors and not the functionality.
pt.matchPrefix(strings.NewReader(s[:len(s)-1]))
pt.match(strings.NewReader(s[:len(s)-1]))
pt.matchPrefix(strings.NewReader(s + "$"))
pt.match(strings.NewReader(s + "$"))
}
}
@@ -31,5 +52,5 @@ func TestPatriciaNonOverlapping(t *testing.T) {
}
func TestPatriciaOverlapping(t *testing.T) {
testPTree(t, "foo", "far", "farther", "boo", "bar")
testPTree(t, "foo", "far", "farther", "boo", "ba", "bar")
}