Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x/net/http2:Enable HTTP/2 CLIENTs to receive and send unknown frames #80

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion http2/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,9 @@ func (f *Framer) WriteGoAway(maxStreamID uint32, code ErrCode, debugData []byte)
}

// An UnknownFrame is the frame type returned when the frame type is unknown
// or no specific frame type parser exists.
// or no specific frame type parser exists. UnknownFrame will not be used for
// opening or closing a stream even if their flags indicate so. UnknownFrame
// will not be counted against flow control.
type UnknownFrame struct {
FrameHeader
p []byte
Expand Down
206 changes: 206 additions & 0 deletions http2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,111 @@ type Server struct {
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState

// EnableUnknownFrames enables the server to handle unknown frames. We only support unknown frames
// associated with a stream. Connection level unknown frames are not supported. Unknown frames
// should not be used to open or close a stream. In addition, unknown frames should be sent after
// all HEADERS frames in a request. Otherwise, the unknown frames will be ignored. Unknown frames
// can be sent before HEADERS frames in a response.
EnableUnknownFrames bool
}

// contextKey is a value for use with context.WithValue. It's used as
// a pointer so it fits in an interface{} without allocation.
type contextKey struct {
name string
}

// UnknownFrameReaderKey is a context key. It can be used in http.Request.Context() to access the server's
// UnknownFrameReader. The associated value will be of type *UnknownFrameReader.
var UnknownFrameReaderKey = &contextKey{"http2-UnknownFrameReaderKey"}

// unknownFrameTimeout is the timeout used to lock the unknown frame buffer.
var unknownFrameTimeout time.Duration = 5 * time.Second

// UnknownFrameReader is an interface to receive unknown frames sent to the server.
// An instance of UnknownFrameReader is stored in the context of http.Request. An example usage:
// func ServeHTTP(rw http.RequestWriter, r *http.Request) {
// ...
// ufr := r.Context().Value(http2.UnknownFrameReaderKey)
// reader, ok := ufr.(UnknownFrameReader)
// f, err := reader.ReadUnknownFrame(context.Background())
// ...
// }
type UnknownFrameReader interface {
// Call this function to get an unknown frame received by the server.
// If the server hasn't received any new unknown frame, the function will return
// error "haven't received unknown frames". If there will be no more unknown frames, the
// function will return error "no more unknown frame".
ReadUnknownFrame(ctx context.Context) (*UnknownFrame, error)
}

type unknownFramesReceived struct {
mu sync.Mutex
unknownFrames []*UnknownFrame // a buffer stores received unknown frames
noMore bool // If true, indicates there is no more unknown frames on the stream.
}

// Add newly received unknown frame to the buffer.
func (r *unknownFramesReceived) addUnknownFrame(f *UnknownFrame) error {
timer := time.NewTimer(time.Duration(unknownFrameTimeout))
defer timer.Stop()
done := make(chan bool)
go func() {
r.mu.Lock()
r.unknownFrames = append(r.unknownFrames, f)
r.mu.Unlock()
done <- true
return
}()
select {
case <-done:
return nil
case <-timer.C:
return fmt.Errorf("can't add unknown frame to buffer and hit a timeout")
}
}

func (r *unknownFramesReceived) ReadUnknownFrame(ctx context.Context) (*UnknownFrame, error) {
timer := time.NewTimer(time.Duration(unknownFrameTimeout))
defer timer.Stop()

fc := make(chan *UnknownFrame)
empty := make(chan bool)
noMore := make(chan bool)
go func() {
r.mu.Lock()
length := len(r.unknownFrames)
r.mu.Unlock()
if length == 0 {
if r.noMore {
noMore <- true
return
}
empty <- true
return
}

r.mu.Lock()
uf := r.unknownFrames[0]
r.unknownFrames = r.unknownFrames[1:]
fc <- uf
r.mu.Unlock()
}()
for {
select {
case <-ctx.Done():
return &UnknownFrame{}, fmt.Errorf("context canceled")
case <-empty:
return &UnknownFrame{}, fmt.Errorf("haven't received unknown frames")
case <-noMore:
return &UnknownFrame{}, fmt.Errorf("no more unknown frame")
case f := <-fc:
return f, nil
case <-timer.C:
return &UnknownFrame{}, fmt.Errorf("can't grab the lock, time out")
}
}
}

func (s *Server) initialConnRecvWindowSize() int32 {
Expand Down Expand Up @@ -593,6 +698,8 @@ type stream struct {

trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer

unknownFramesReceived *unknownFramesReceived
}

func (sc *serverConn) Framer() *Framer { return sc.framer }
Expand Down Expand Up @@ -1054,6 +1161,41 @@ func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error {
}
}

// writeUnknownFrameFromHandler writes unknown frame response from a handler on
// the given stream.
func (sc *serverConn) writeUnknownFrameFromHandler(stream *stream, t FrameType, flags Flags, payload []byte) error {
ch := make(chan error, 1)
writeArg := &writeUnknownFrame{t, flags, stream.id, payload}
err := sc.writeFrameFromHandler(FrameWriteRequest{
write: writeArg,
stream: stream,
done: ch,
})
if err != nil {
return err
}
select {
case err = <-ch:
return err
case <-sc.doneServing:
return errClientDisconnected
case <-stream.cw:
// If both ch and stream.cw were ready (as might
// happen on the final Write after an http.Handler
// ends), prefer the write result. Otherwise this
// might just be us successfully closing the stream.
// The writeFrameAsync and serve goroutines guarantee
// that the ch send will happen before the stream.cw
// close.
select {
case err = <-ch:
return err
default:
return errStreamClosed
}
}
}

// writeFrame schedules a frame to write and sends it if there's nothing
// already being written.
//
Expand Down Expand Up @@ -1424,6 +1566,8 @@ func (sc *serverConn) processFrame(f Frame) error {
// A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return ConnectionError(ErrCodeProtocol)
case *UnknownFrame:
return sc.processUnknownFrame(f)
default:
sc.vlogf("http2: server ignoring frame: %v", f.Header())
return nil
Expand Down Expand Up @@ -1729,6 +1873,38 @@ func (sc *serverConn) processGoAway(f *GoAwayFrame) error {
return nil
}

func (sc *serverConn) processUnknownFrame(f *UnknownFrame) error {
sc.serveG.check()
if sc.inGoAway && sc.goAwayCode != ErrCodeNo {
return nil
}

if !sc.srv.EnableUnknownFrames {
sc.vlogf("http2: server ignoring frame: %v", f.Header())
return nil
}

// The stream must be in open or half closed state in order to receive unknown frame.
id := f.Header().StreamID
_, st := sc.state(id)
if st == nil || st.gotTrailerHeader || st.resetQueued {
// This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
// done writing).
if st != nil && st.resetQueued {
// Already have a stream error in flight. Don't send another.
return nil
}
return streamError(id, ErrCodeStreamClosed)
}
err := st.unknownFramesReceived.addUnknownFrame(f)
if err != nil {
return err
}
return nil
}

// isPushed reports whether the stream is server-initiated.
func (st *stream) isPushed() bool {
return st.id%2 == 0
Expand All @@ -1748,6 +1924,9 @@ func (st *stream) endStream() {
st.body.CloseWithError(io.EOF)
}
st.state = stateHalfClosedRemote
if sc.srv.EnableUnknownFrames {
st.unknownFramesReceived.noMore = true
}
}

// copyTrailersToHandlerRequest is run in the Handler's goroutine in
Expand Down Expand Up @@ -1878,6 +2057,10 @@ func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error {
sc.conn.SetReadDeadline(time.Time{})
}

if sc.srv.EnableUnknownFrames {
st.unknownFramesReceived = &unknownFramesReceived{unknownFrames: make([]*UnknownFrame, 0, 10), noMore: false}
req = req.WithContext(context.WithValue(req.Context(), UnknownFrameReaderKey, st.unknownFramesReceived))
}
go sc.runHandler(rw, req, handler)
return nil
}
Expand Down Expand Up @@ -2312,6 +2495,20 @@ func (b *requestBody) Read(p []byte) (n int, err error) {
return
}

// WriteUnknownFrame writes unknown frames.
// When EnableUnknownFrames is true, http.ResponseWriter passed to the handler function can be cast to
// WriteUnknownFrame and write unknown frames. For example:
// func ServeHTTP(rw http.RequestWriter, r *http.Request) {
// ...
// ufw, ok := rw.(WriteUnknownFrame)
// ufw.WriteUnknownFrame(frameType, frameFlags, unknownFrameBody)
// ...
// }
type WriteUnknownFrame interface {
// Call WriteUnknownFrame to write an unknown frame.
WriteUnknownFrame(t FrameType, flags Flags, payload []byte) error
}

// responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a
Expand Down Expand Up @@ -2812,6 +3009,15 @@ func (w *responseWriter) Push(target string, opts *http.PushOptions) error {
}
}

// WriteUnknownFrame writes unknown frames.
func (w *responseWriter) WriteUnknownFrame(t FrameType, flags Flags, payload []byte) error {
if w.rws.stream.sc.srv.EnableUnknownFrames == false {
return errors.New("the server does not enable unknown frames")
}

return w.rws.stream.sc.writeUnknownFrameFromHandler(w.rws.stream, t, flags, payload)
}

type startPushRequest struct {
parent *stream
method string
Expand Down
Loading