mirror of
https://github.com/soheilhy/cmux.git
synced 2025-09-17 20:20:20 +08:00
Compare commits
5 Commits
v0.1.1
...
dev/fix-me
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f661dcfb59 | ||
![]() |
3ac8d3a667 | ||
![]() |
861c99e0fc | ||
![]() |
13f520d62c | ||
![]() |
e132036cce |
@@ -3,6 +3,7 @@ language: go
|
||||
go:
|
||||
- 1.5
|
||||
- 1.6
|
||||
- 1.7
|
||||
- tip
|
||||
|
||||
matrix:
|
||||
|
@@ -20,6 +20,7 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
@@ -43,6 +44,10 @@ func (c *mockConn) Read(b []byte) (n int, err error) {
|
||||
return c.r.Read(b)
|
||||
}
|
||||
|
||||
func (c *mockConn) SetReadDeadline(time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func discard(l net.Listener) {
|
||||
for {
|
||||
if _, err := l.Accept(); err != nil {
|
||||
|
@@ -42,6 +42,10 @@ func (s *bufferedReader) Read(p []byte) (int, error) {
|
||||
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
|
||||
s.bufferRead += bn
|
||||
return bn, s.lastErr
|
||||
} else if !s.sniffing && s.buffer.Cap() != 0 {
|
||||
// We don't need the buffer anymore.
|
||||
// Reset it to release the internal slice.
|
||||
s.buffer = bytes.Buffer{}
|
||||
}
|
||||
|
||||
// If there is nothing more to return in the sniffed buffer, read from the
|
||||
|
36
cmux.go
36
cmux.go
@@ -19,6 +19,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Matcher matches a connection based on its content.
|
||||
@@ -60,13 +61,17 @@ func (e errListenerClosed) Timeout() bool { return false }
|
||||
// listener is closed.
|
||||
var ErrListenerClosed = errListenerClosed("mux: listener closed")
|
||||
|
||||
// for readability of readTimeout
|
||||
var noTimeout time.Duration
|
||||
|
||||
// New instantiates a new connection multiplexer.
|
||||
func New(l net.Listener) CMux {
|
||||
return &cMux{
|
||||
root: l,
|
||||
bufLen: 1024,
|
||||
errh: func(_ error) bool { return true },
|
||||
donec: make(chan struct{}),
|
||||
root: l,
|
||||
bufLen: 1024,
|
||||
errh: func(_ error) bool { return true },
|
||||
donec: make(chan struct{}),
|
||||
readTimeout: noTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,6 +95,8 @@ type CMux interface {
|
||||
Serve() error
|
||||
// HandleError registers an error handler that handles listener errors.
|
||||
HandleError(ErrorHandler)
|
||||
// sets a timeout for the read of matchers
|
||||
SetReadTimeout(time.Duration)
|
||||
}
|
||||
|
||||
type matchersListener struct {
|
||||
@@ -98,11 +105,12 @@ type matchersListener struct {
|
||||
}
|
||||
|
||||
type cMux struct {
|
||||
root net.Listener
|
||||
bufLen int
|
||||
errh ErrorHandler
|
||||
donec chan struct{}
|
||||
sls []matchersListener
|
||||
root net.Listener
|
||||
bufLen int
|
||||
errh ErrorHandler
|
||||
donec chan struct{}
|
||||
sls []matchersListener
|
||||
readTimeout time.Duration
|
||||
}
|
||||
|
||||
func matchersToMatchWriters(matchers []Matcher) []MatchWriter {
|
||||
@@ -129,6 +137,10 @@ func (m *cMux) MatchWithWriters(matchers ...MatchWriter) net.Listener {
|
||||
return ml
|
||||
}
|
||||
|
||||
func (m *cMux) SetReadTimeout(t time.Duration) {
|
||||
m.readTimeout = t
|
||||
}
|
||||
|
||||
func (m *cMux) Serve() error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@@ -163,11 +175,17 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
||||
muc := newMuxConn(c)
|
||||
if m.readTimeout > noTimeout {
|
||||
_ = c.SetReadDeadline(time.Now().Add(m.readTimeout))
|
||||
}
|
||||
for _, sl := range m.sls {
|
||||
for _, s := range sl.ss {
|
||||
matched := s(muc.Conn, muc.startSniffing())
|
||||
if matched {
|
||||
muc.doneSniffing()
|
||||
if m.readTimeout > noTimeout {
|
||||
_ = c.SetReadDeadline(time.Time{})
|
||||
}
|
||||
select {
|
||||
case sl.l.connc <- muc:
|
||||
case <-donec:
|
||||
|
159
cmux_test.go
159
cmux_test.go
@@ -15,10 +15,12 @@
|
||||
package cmux
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/rpc"
|
||||
@@ -31,6 +33,7 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -73,14 +76,17 @@ func (l *chanListener) Accept() (net.Conn, error) {
|
||||
}
|
||||
|
||||
func testListener(t *testing.T) (net.Listener, func()) {
|
||||
l, err := net.Listen("tcp", ":0")
|
||||
l, err := net.Listen("tcp4", ":0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var once sync.Once
|
||||
return l, func() {
|
||||
if err := l.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
once.Do(func() {
|
||||
if err := l.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,6 +187,84 @@ func runTestRPCClient(t *testing.T, addr net.Addr) {
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
handleHTTP1Close = 1
|
||||
handleHTTP1Request = 2
|
||||
handleAnyClose = 3
|
||||
handleAnyRequest = 4
|
||||
)
|
||||
|
||||
func TestTimeout(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
lis, Close := testListener(t)
|
||||
defer Close()
|
||||
result := make(chan int, 5)
|
||||
testDuration := time.Millisecond * 100
|
||||
m := New(lis)
|
||||
m.SetReadTimeout(testDuration)
|
||||
http1 := m.Match(HTTP1Fast())
|
||||
any := m.Match(Any())
|
||||
go func() {
|
||||
_ = m.Serve()
|
||||
}()
|
||||
go func() {
|
||||
con, err := http1.Accept()
|
||||
if err != nil {
|
||||
result <- handleHTTP1Close
|
||||
} else {
|
||||
_, _ = con.Write([]byte("http1"))
|
||||
_ = con.Close()
|
||||
result <- handleHTTP1Request
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
con, err := any.Accept()
|
||||
if err != nil {
|
||||
result <- handleAnyClose
|
||||
} else {
|
||||
_, _ = con.Write([]byte("any"))
|
||||
_ = con.Close()
|
||||
result <- handleAnyRequest
|
||||
}
|
||||
}()
|
||||
time.Sleep(testDuration) // wait to prevent timeouts on slow test-runners
|
||||
client, err := net.Dial("tcp", lis.Addr().String())
|
||||
if err != nil {
|
||||
log.Fatal("testTimeout client failed: ", err)
|
||||
}
|
||||
defer func() {
|
||||
_ = client.Close()
|
||||
}()
|
||||
time.Sleep(testDuration / 2)
|
||||
if len(result) != 0 {
|
||||
log.Print("tcp ")
|
||||
t.Fatal("testTimeout failed: accepted to fast: ", len(result))
|
||||
}
|
||||
_ = client.SetReadDeadline(time.Now().Add(testDuration * 3))
|
||||
buffer := make([]byte, 10)
|
||||
rl, err := client.Read(buffer)
|
||||
if err != nil {
|
||||
t.Fatal("testTimeout failed: client error: ", err, rl)
|
||||
}
|
||||
Close()
|
||||
if rl != 3 {
|
||||
log.Print("testTimeout failed: response from wrong sevice ", rl)
|
||||
}
|
||||
if string(buffer[0:3]) != "any" {
|
||||
log.Print("testTimeout failed: response from wrong sevice ")
|
||||
}
|
||||
time.Sleep(testDuration * 2)
|
||||
if len(result) != 2 {
|
||||
t.Fatal("testTimeout failed: accepted to less: ", len(result))
|
||||
}
|
||||
if a := <-result; a != handleAnyRequest {
|
||||
t.Fatal("testTimeout failed: any rule did not match")
|
||||
}
|
||||
if a := <-result; a != handleHTTP1Close {
|
||||
t.Fatal("testTimeout failed: no close an http rule")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRead(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
errCh := make(chan error)
|
||||
@@ -312,6 +396,72 @@ func TestHTTP2(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTP2MatchHeaderField(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
errCh := make(chan error)
|
||||
defer func() {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
t.Fatal(err)
|
||||
default:
|
||||
}
|
||||
}()
|
||||
name := "name"
|
||||
value := "value"
|
||||
writer, reader := net.Pipe()
|
||||
go func() {
|
||||
if _, err := io.WriteString(writer, http2.ClientPreface); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
enc := hpack.NewEncoder(&buf)
|
||||
if err := enc.WriteField(hpack.HeaderField{Name: name, Value: value}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
framer := http2.NewFramer(writer, nil)
|
||||
err := framer.WriteHeaders(http2.HeadersFrameParam{
|
||||
StreamID: 1,
|
||||
BlockFragment: buf.Bytes(),
|
||||
EndStream: true,
|
||||
EndHeaders: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := writer.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
l := newChanListener()
|
||||
l.connCh <- reader
|
||||
muxl := New(l)
|
||||
// Register a bogus matcher that only reads one byte.
|
||||
muxl.Match(func(r io.Reader) bool {
|
||||
var b [1]byte
|
||||
_, _ = r.Read(b[:])
|
||||
return false
|
||||
})
|
||||
// Create a matcher that cannot match the response.
|
||||
muxl.Match(HTTP2HeaderField(name, "another"+value))
|
||||
// Then match with the expected field.
|
||||
h2l := muxl.Match(HTTP2HeaderField(name, value))
|
||||
go safeServe(errCh, muxl)
|
||||
muxedConn, err := h2l.Accept()
|
||||
close(l.connCh)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var b [len(http2.ClientPreface)]byte
|
||||
// We have the sniffed buffer first...
|
||||
if _, err := muxedConn.Read(b[:]); err == io.EOF {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(b[:]) != http2.ClientPreface {
|
||||
t.Errorf("got unexpected read %s, expected %s", b, http2.ClientPreface)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPGoRPC(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
errCh := make(chan error)
|
||||
@@ -439,6 +589,7 @@ func interestingGoroutines() (gs []string) {
|
||||
}
|
||||
|
||||
if stack == "" ||
|
||||
strings.Contains(stack, "main.main()") ||
|
||||
strings.Contains(stack, "testing.Main(") ||
|
||||
strings.Contains(stack, "runtime.goexit") ||
|
||||
strings.Contains(stack, "created by runtime.gc") ||
|
||||
|
23
matchers.go
23
matchers.go
@@ -144,10 +144,14 @@ func matchHTTP2Field(w io.Writer, r io.Reader, name, value string) (matched bool
|
||||
return false
|
||||
}
|
||||
|
||||
done := false
|
||||
framer := http2.NewFramer(w, r)
|
||||
hdec := hpack.NewDecoder(uint32(4<<10), func(hf hpack.HeaderField) {
|
||||
if hf.Name == name && hf.Value == value {
|
||||
matched = true
|
||||
if hf.Name == name {
|
||||
done = true
|
||||
if hf.Value == value {
|
||||
matched = true
|
||||
}
|
||||
}
|
||||
})
|
||||
for {
|
||||
@@ -161,17 +165,20 @@ func matchHTTP2Field(w io.Writer, r io.Reader, name, value string) (matched bool
|
||||
if err := framer.WriteSettings(); err != nil {
|
||||
return false
|
||||
}
|
||||
case *http2.ContinuationFrame:
|
||||
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
|
||||
return false
|
||||
}
|
||||
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
|
||||
case *http2.HeadersFrame:
|
||||
if _, err := hdec.Write(f.HeaderBlockFragment()); err != nil {
|
||||
return false
|
||||
}
|
||||
if matched {
|
||||
return true
|
||||
}
|
||||
done = done || f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0
|
||||
}
|
||||
|
||||
if f.FrameHeader.Flags&http2.FlagHeadersEndHeaders != 0 {
|
||||
return false
|
||||
}
|
||||
if done {
|
||||
return matched
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user