Damien Neil has uploaded this change for review.
http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 142 insertions(+), 74 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index 653a1a0..1fd954e 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -300,12 +300,17 @@
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
- cc *ClientConn
- req *http.Request
+ cc *ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ cancelc <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@@ -322,7 +327,10 @@
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@@ -362,6 +370,10 @@
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@@ -369,17 +381,15 @@
}
}
-func (cs *clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}
type stickyErrWriter struct {
@@ -1010,15 +1020,19 @@
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
+ cc: cc,
+ ctx: req.Context(),
+ cancelc: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: actualContentLength(req),
+ trace: httptrace.ContextClientTrace(req.Context()),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
}
- go cs.doRequest()
+ go cs.doRequest(req)
waitDone := func() error {
select {
@@ -1045,7 +1059,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1062,8 +1076,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
case <-req.Cancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1072,8 +1089,8 @@
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1084,12 +1101,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
ctx := req.Context()
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1101,7 +1117,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.cancelc:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1124,7 +1140,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1143,19 +1159,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1171,7 +1191,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
err = errRequestCanceled
}
timer.Stop()
@@ -1181,7 +1201,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1216,15 +1236,14 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
ctx := req.Context()
cc.wmu.Lock()
@@ -1246,14 +1265,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1272,7 +1291,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1283,10 +1301,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1401,7 +1421,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1416,13 +1436,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1474,12 +1494,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1510,16 +1525,26 @@
return nil
}
+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
+ cc.mu.Lock()
+ trailer := req.Trailer
+ err = cs.abortErr
+ cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
+
cc.wmu.Lock()
+ defer cc.wmu.Unlock()
var trls []byte
- if hasTrailers {
- trls, err = cc.encodeTrailers(req)
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
if err != nil {
- cc.wmu.Unlock()
return err
}
}
- defer cc.wmu.Unlock()
// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@@ -1540,23 +1565,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
return 0, errRequestCanceled
default:
}
@@ -1770,11 +1794,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1784,7 +1808,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2163,8 +2187,7 @@
}
streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
+ if !streamEnded || cs.isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
@@ -2179,7 +2202,7 @@
}
}
- if streamEnded || isHead {
+ if streamEnded || cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2227,8 +2250,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2311,6 +2333,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2325,9 +2349,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.cancelc:
return errRequestCanceled
}
return nil
@@ -2381,7 +2405,7 @@
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 3c02695..d3cbeb8 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -5623,3 +5623,27 @@
}
ct.run()
}
+
+func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
+ st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(200)
+ w.(http.Flusher).Flush()
+ io.Copy(io.Discard, r.Body)
+ }, optOnlyServer)
+ defer st.Close()
+
+ tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+ defer tr.CloseIdleConnections()
+
+ pr, pw := net.Pipe()
+ req, err := http.NewRequest("GET", st.ts.URL, pr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res.Body.Close()
+ pw.Close()
+}
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
Damien Neil uploaded patch set #2 to this change.
http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 143 insertions(+), 74 deletions(-)
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
Damien Neil uploaded patch set #3 to this change.
http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 148 insertions(+), 79 deletions(-)
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Damien Neil.
Patch set 3:Code-Review +2
Attention is currently required from: Damien Neil.
Damien Neil uploaded patch set #4 to this change.
http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 147 insertions(+), 77 deletions(-)
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
1 comment:
Patchset:
1 of 12 TryBots failed. […]
Unrelated flake. I see the problem and will follow up with a fix for the flake.
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
Damien Neil submitted this change.
Patch set 3 is the latest approved patch set.
The change was submitted with unreviewed changes in the following files:
```
The name of the file: http2/transport_test.go
Insertions: 53, Deletions: 0.
@@ -5624,6 +5624,59 @@
ct.run()
}
+func TestTransportContentLengthWithoutBody(t *testing.T) {
+ contentLength := ""
+ st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Length", contentLength)
+ }, optOnlyServer)
+ defer st.Close()
+ tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+ defer tr.CloseIdleConnections()
+
+ for _, test := range []struct {
+ name string
+ contentLength string
+ wantBody string
+ wantErr error
+ wantContentLength int64
+ }{
+ {
+ name: "non-zero content length",
+ contentLength: "42",
+ wantErr: io.ErrUnexpectedEOF,
+ wantContentLength: 42,
+ },
+ {
+ name: "zero content length",
+ contentLength: "0",
+ wantErr: nil,
+ wantContentLength: 0,
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ contentLength = test.contentLength
+
+ req, _ := http.NewRequest("GET", st.ts.URL, nil)
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer res.Body.Close()
+ body, err := io.ReadAll(res.Body)
+
+ if err != test.wantErr {
+ t.Errorf("Expected error %v, got: %v", test.wantErr, err)
+ }
+ if len(body) > 0 {
+ t.Errorf("Expected empty body, got: %v", body)
+ }
+ if res.ContentLength != test.wantContentLength {
+ t.Errorf("Expected content length %d, got: %d", test.wantContentLength, res.ContentLength)
+ }
+ })
+ }
+}
+
func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
```
```
The name of the file: http2/transport.go
Insertions: 6, Deletions: 12.
@@ -769,6 +769,61 @@
return true
}
+// ClientConnState describes the state of a ClientConn.
+type ClientConnState struct {
+ // Closed is whether the connection is closed.
+ Closed bool
+
+ // Closing is whether the connection is in the process of
+ // closing. It may be closing due to shutdown, being a
+ // single-use connection, being marked as DoNotReuse, or
+ // having received a GOAWAY frame.
+ Closing bool
+
+ // StreamsActive is how many streams are active.
+ StreamsActive int
+
+ // StreamsReserved is how many streams have been reserved via
+ // ClientConn.ReserveNewRequest.
+ StreamsReserved int
+
+ // StreamsPending is how many requests have been sent in excess
+ // of the peer's advertised MaxConcurrentStreams setting and
+ // are waiting for other streams to complete.
+ StreamsPending int
+
+ // MaxConcurrentStreams is how many concurrent streams the
+ // peer advertised as acceptable. Zero means no SETTINGS
+ // frame has been received yet.
+ MaxConcurrentStreams uint32
+
+ // LastIdle, if non-zero, is when the connection last
+ // transitioned to idle state.
+ LastIdle time.Time
+}
+
+// State returns a snapshot of cc's state.
+func (cc *ClientConn) State() ClientConnState {
+ cc.wmu.Lock()
+ maxConcurrent := cc.maxConcurrentStreams
+ if !cc.seenSettings {
+ maxConcurrent = 0
+ }
+ cc.wmu.Unlock()
+
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ return ClientConnState{
+ Closed: cc.closed,
+ Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
+ StreamsActive: len(cc.streams),
+ StreamsReserved: cc.streamsReserved,
+ StreamsPending: cc.pendingRequests,
+ LastIdle: cc.lastIdle,
+ MaxConcurrentStreams: maxConcurrent,
+ }
+}
+
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
@@ -2186,27 +2241,33 @@
return nil, nil
}
- streamEnded := f.StreamEnded()
- if !streamEnded || cs.isHead {
- res.ContentLength = -1
- if clens := res.Header["Content-Length"]; len(clens) == 1 {
- if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
- res.ContentLength = int64(cl)
- } else {
- // TODO: care? unlike http/1, it won't mess up our framing, so it's
- // more safe smuggling-wise to ignore.
- }
- } else if len(clens) > 1 {
+ res.ContentLength = -1
+ if clens := res.Header["Content-Length"]; len(clens) == 1 {
+ if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
+ res.ContentLength = int64(cl)
+ } else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
+ } else if len(clens) > 1 {
+ // TODO: care? unlike http/1, it won't mess up our framing, so it's
+ // more safe smuggling-wise to ignore.
}
- if streamEnded || cs.isHead {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
+ if f.StreamEnded() {
+ if res.ContentLength > 0 {
+ res.Body = missingBody{}
+ } else {
+ res.Body = noBody
+ }
+ return res, nil
+ }
+
cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
@@ -2755,6 +2816,11 @@
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
+type missingBody struct{}
+
+func (missingBody) Close() error { return nil }
+func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
+
func strSliceContains(ss []string, s string) bool {
for _, v := range ss {
if v == s {
```
http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 151 insertions(+), 77 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index a5ba742..2ff6544 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -300,12 +300,17 @@
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
- cc *ClientConn
- req *http.Request
+ cc *ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
@@ -1065,15 +1075,19 @@
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
+ cc: cc,
+ ctx: ctx,
+ reqCancel: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: actualContentLength(req),
+ trace: httptrace.ContextClientTrace(ctx),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
}
- go cs.doRequest()
+ go cs.doRequest(req)
waitDone := func() error {
select {
@@ -1081,7 +1095,7 @@
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
@@ -1100,7 +1114,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1117,8 +1131,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1127,8 +1144,8 @@
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1139,12 +1156,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1156,7 +1172,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1179,7 +1195,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1198,19 +1214,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1226,7 +1246,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1236,7 +1256,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1271,16 +1291,15 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1291,7 +1310,7 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1301,14 +1320,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1327,7 +1346,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1338,10 +1356,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1456,7 +1476,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1471,13 +1491,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1529,12 +1549,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1565,16 +1580,26 @@
@@ -1595,23 +1620,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1825,11 +1849,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1839,7 +1863,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2230,7 +2254,7 @@
// more safe smuggling-wise to ignore.
}
- if cs.req.Method == "HEAD" {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2287,8 +2311,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2371,6 +2394,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2385,9 +2410,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2441,7 +2466,7 @@
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
diff --git a/http2/transport_test.go b/http2/transport_test.go
index b250738..322a4c4 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -5676,3 +5676,28 @@
})
}
}
+
+func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
+ st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(200)
+ w.(http.Flusher).Flush()
+ io.Copy(io.Discard, r.Body)
+ }, optOnlyServer)
+ defer st.Close()
+
+ tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+ defer tr.CloseIdleConnections()
+
+ pr, pw := net.Pipe()
+ req, err := http.NewRequest("GET", st.ts.URL, pr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Closing the Response's Body interrupts the blocked body read.
+ res.Body.Close()
+ pw.Close()
+}
To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Brad Fitzpatrick.
Damien Neil would like Brad Fitzpatrick to review this change.
[internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Updates golang/go#49076
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 153 insertions(+), 77 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index 054e524..03663b9 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -294,12 +294,17 @@
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
- cc *ClientConn
- req *http.Request
+ cc *ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ reqCancel <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool
abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@@ -316,7 +321,10 @@
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu
// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@@ -356,6 +364,10 @@
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@@ -363,17 +375,15 @@
}
}
-func (cs *clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}
type stickyErrWriter struct {
@@ -1002,15 +1012,19 @@
@@ -1018,7 +1032,7 @@
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
@@ -1037,7 +1051,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1054,8 +1068,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1064,8 +1081,8 @@
// writeRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1076,12 +1093,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1093,7 +1109,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1116,7 +1132,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1135,19 +1151,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1163,7 +1183,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1173,7 +1193,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1208,16 +1228,15 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1228,7 +1247,7 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1238,14 +1257,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1264,7 +1283,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1275,10 +1293,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1393,7 +1413,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1408,13 +1428,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1466,12 +1486,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1502,16 +1517,26 @@
@@ -1532,23 +1557,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1762,11 +1786,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1776,7 +1800,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2139,7 +2163,7 @@
// more safe smuggling-wise to ignore.
}
- if cs.req.Method == "HEAD" {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2196,8 +2220,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2280,6 +2303,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2294,9 +2319,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2350,7 +2375,7 @@
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 1e0c982..f04a1b0 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -5549,3 +5549,28 @@
To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Brad Fitzpatrick.
Damien Neil uploaded patch set #2 to this change.
[internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#48908.
Updates golang/go#49076
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 153 insertions(+), 77 deletions(-)
To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Brad Fitzpatrick.
Damien Neil would like Brad Fitzpatrick to review this change.
[internal-branch.go1.17-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#49077
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 151 insertions(+), 77 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index 43029a8..e402650 100644
@@ -998,15 +1008,19 @@
@@ -1014,7 +1028,7 @@
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
@@ -1033,7 +1047,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1050,8 +1064,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1060,8 +1077,8 @@
// writeRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1072,12 +1089,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1089,7 +1105,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1112,7 +1128,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1131,19 +1147,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1159,7 +1179,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1169,7 +1189,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1204,16 +1224,15 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1224,7 +1243,7 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1234,14 +1253,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1260,7 +1279,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1271,10 +1289,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1389,7 +1409,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1404,13 +1424,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1462,12 +1482,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1498,16 +1513,26 @@
@@ -1528,23 +1553,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1758,11 +1782,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1772,7 +1796,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2135,7 +2159,7 @@
// more safe smuggling-wise to ignore.
}
- if cs.req.Method == "HEAD" {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2192,8 +2216,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2276,6 +2299,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2290,9 +2315,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2346,7 +2371,7 @@
To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Damien Neil.
1 comment:
Patchset:
1 of 9 TryBots failed. […]
Failure is not due to a deterministic problem introduced in this CL.
To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Dmitri Shuralyov.
Dmitri Shuralyov uploaded patch set #3 to the change originally created by Damien Neil.
[internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#49076
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 151 insertions(+), 77 deletions(-)
To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Damien Neil.
Patch set 1:Code-Review +2
To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.
Dmitri Shuralyov submitted this change.
[internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#49076
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
Reviewed-on: https://go-review.googlesource.com/c/net/+/357095
Reviewed-by: Dmitri Shuralyov <dmit...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 153 insertions(+), 77 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index 054e524..03663b9 100644
@@ -1002,15 +1012,19 @@
@@ -1018,7 +1032,7 @@
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
@@ -1037,7 +1051,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1054,8 +1068,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1064,8 +1081,8 @@
// writeRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1076,12 +1093,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1093,7 +1109,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1116,7 +1132,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1135,19 +1151,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1163,7 +1183,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1173,7 +1193,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1208,16 +1228,15 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1228,7 +1247,7 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1238,14 +1257,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1264,7 +1283,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1275,10 +1293,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1393,7 +1413,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1408,13 +1428,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1466,12 +1486,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1502,16 +1517,26 @@
@@ -1532,23 +1557,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1762,11 +1786,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1776,7 +1800,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2139,7 +2163,7 @@
// more safe smuggling-wise to ignore.
}
- if cs.req.Method == "HEAD" {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2196,8 +2220,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2280,6 +2303,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2294,9 +2319,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2350,7 +2375,7 @@
To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.
[internal-branch.go1.17-vendor] http2: close the Request's Body when aborting a stream
After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.
Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.
Updates golang/go#49077
Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
Trust: Damien Neil <dn...@google.com>
Run-TryBot: Damien Neil <dn...@google.com>
Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
Reviewed-on: https://go-review.googlesource.com/c/net/+/357690
Reviewed-by: Dmitri Shuralyov <dmit...@golang.org>
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 153 insertions(+), 77 deletions(-)
diff --git a/http2/transport.go b/http2/transport.go
index 43029a8..e402650 100644
@@ -998,15 +1008,19 @@
@@ -1014,7 +1028,7 @@
return nil
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
@@ -1033,7 +1047,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1050,8 +1064,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
- case <-req.Cancel:
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
+ case <-cs.reqCancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1060,8 +1077,8 @@
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}
@@ -1072,12 +1089,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}
@@ -1089,7 +1105,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1112,7 +1128,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1131,19 +1147,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}
- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}
- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1159,7 +1179,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
err = errRequestCanceled
}
timer.Stop()
@@ -1169,7 +1189,7 @@
}
}
- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1204,16 +1224,15 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
}
}
}
-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.wmu.Lock()
defer cc.wmu.Unlock()
@@ -1224,7 +1243,7 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return errRequestCanceled
default:
}
@@ -1234,14 +1253,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1260,7 +1279,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req
if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1271,10 +1289,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}
if err != nil && cs.sentEndStream {
@@ -1389,7 +1409,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1404,13 +1424,13 @@
var bufPool sync.Pool // of *[]byte
-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1
cc.mu.Lock()
@@ -1462,12 +1482,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1498,16 +1513,26 @@
@@ -1528,23 +1553,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.reqCancel:
return 0, errRequestCanceled
default:
}
@@ -1758,11 +1782,11 @@
}
// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()
hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1772,7 +1796,7 @@
return nil, errRequestHeaderListSize
}
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2135,7 +2159,7 @@
// more safe smuggling-wise to ignore.
}
- if cs.req.Method == "HEAD" {
+ if cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2192,8 +2216,7 @@
}
// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2276,6 +2299,8 @@
}
cc.mu.Unlock()
+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2290,9 +2315,9 @@
select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.reqCancel:
return errRequestCanceled
}
return nil
@@ -2346,7 +2371,7 @@
To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.