[net] http2: close the Request's Body when aborting a stream

305 views
Skip to first unread message

Damien Neil (Gerrit)

unread,
Oct 12, 2021, 7:29:17 PM10/12/21
to goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

Damien Neil has uploaded this change for review.

View Change

http2: close the Request's Body when aborting a stream

After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.

Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.

Updates golang/go#48908.

Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 142 insertions(+), 74 deletions(-)

diff --git a/http2/transport.go b/http2/transport.go
index 653a1a0..1fd954e 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -300,12 +300,17 @@
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
- cc *ClientConn
- req *http.Request
+ cc *ClientConn
+
+ // Fields of Request that we may access even after the response body is closed.
+ ctx context.Context
+ cancelc <-chan struct{}
+
trace *httptrace.ClientTrace // or nil
ID uint32
bufPipe pipe // buffered pipe with the flow-controlled response payload
requestedGzip bool
+ isHead bool

abortOnce sync.Once
abort chan struct{} // closed to signal stream should end immediately
@@ -322,7 +327,10 @@
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
- stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+
+ reqBody io.ReadCloser
+ reqBodyContentLength int64 // -1 means unknown
+ reqBodyClosed bool // body has been closed; guarded by cc.mu

// owned by writeRequest:
sentEndStream bool // sent an END_STREAM flag to the peer
@@ -362,6 +370,10 @@
cs.abortErr = err
close(cs.abort)
})
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
+ }
// TODO(dneil): Clean up tests where cs.cc.cond is nil.
if cs.cc.cond != nil {
// Wake up writeRequestBody if it is waiting on flow control.
@@ -369,17 +381,15 @@
}
}

-func (cs *clientStream) abortRequestBodyWrite(err error) {
- if err == nil {
- panic("nil error")
- }
+func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
- if cs.stopReqBody == nil {
- cs.stopReqBody = err
+ defer cc.mu.Unlock()
+ if cs.reqBody != nil && !cs.reqBodyClosed {
+ cs.reqBody.Close()
+ cs.reqBodyClosed = true
cc.cond.Broadcast()
}
- cc.mu.Unlock()
}

type stickyErrWriter struct {
@@ -1010,15 +1020,19 @@
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
ctx := req.Context()
cs := &clientStream{
- cc: cc,
- req: req,
- trace: httptrace.ContextClientTrace(req.Context()),
- peerClosed: make(chan struct{}),
- abort: make(chan struct{}),
- respHeaderRecv: make(chan struct{}),
- donec: make(chan struct{}),
+ cc: cc,
+ ctx: req.Context(),
+ cancelc: req.Cancel,
+ isHead: req.Method == "HEAD",
+ reqBody: req.Body,
+ reqBodyContentLength: actualContentLength(req),
+ trace: httptrace.ContextClientTrace(req.Context()),
+ peerClosed: make(chan struct{}),
+ abort: make(chan struct{}),
+ respHeaderRecv: make(chan struct{}),
+ donec: make(chan struct{}),
}
- go cs.doRequest()
+ go cs.doRequest(req)

waitDone := func() error {
select {
@@ -1045,7 +1059,7 @@
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
- cs.abortRequestBodyWrite(errStopReqBodyWrite)
+ cs.abortRequestBodyWrite()
}
res.Request = req
res.TLS = cc.tlsState
@@ -1062,8 +1076,11 @@
waitDone()
return nil, cs.abortErr
case <-ctx.Done():
- return nil, ctx.Err()
+ err := ctx.Err()
+ cs.abortStream(err)
+ return nil, err
case <-req.Cancel:
+ cs.abortStream(errRequestCanceled)
return nil, errRequestCanceled
}
}
@@ -1072,8 +1089,8 @@
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
-func (cs *clientStream) doRequest() {
- err := cs.writeRequest()
+func (cs *clientStream) doRequest(req *http.Request) {
+ err := cs.writeRequest(req)
cs.cleanupWriteRequest(err)
}

@@ -1084,12 +1101,11 @@
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
-func (cs *clientStream) writeRequest() (err error) {
+func (cs *clientStream) writeRequest(req *http.Request) (err error) {
cc := cs.cc
- req := cs.req
ctx := req.Context()

- if err := checkConnHeaders(cs.req); err != nil {
+ if err := checkConnHeaders(req); err != nil {
return err
}

@@ -1101,7 +1117,7 @@
}
select {
case cc.reqHeaderMu <- struct{}{}:
- case <-req.Cancel:
+ case <-cs.cancelc:
return errRequestCanceled
case <-ctx.Done():
return ctx.Err()
@@ -1124,7 +1140,7 @@
if !cc.t.disableCompression() &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
- req.Method != "HEAD" {
+ !cs.isHead {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
@@ -1143,19 +1159,23 @@
continueTimeout := cc.t.expectContinueTimeout()
if continueTimeout != 0 &&
!httpguts.HeaderValuesContainsToken(
- cs.req.Header["Expect"],
+ req.Header["Expect"],
"100-continue") {
continueTimeout = 0
cs.on100 = make(chan struct{}, 1)
}

- err = cs.encodeAndWriteHeaders()
+ // Past this point (where we send request headers), it is possible for
+ // RoundTrip to return successfully. Since the RoundTrip contract permits
+ // the caller to "mutate or reuse" the Request after closing the Response's Body,
+ // we must take care when referencing the Request from here on.
+ err = cs.encodeAndWriteHeaders(req)
<-cc.reqHeaderMu
if err != nil {
return err
}

- hasBody := actualContentLength(cs.req) != 0
+ hasBody := cs.reqBodyContentLength != 0
if !hasBody {
cs.sentEndStream = true
} else {
@@ -1171,7 +1191,7 @@
err = cs.abortErr
case <-ctx.Done():
err = ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
err = errRequestCanceled
}
timer.Stop()
@@ -1181,7 +1201,7 @@
}
}

- if err = cs.writeRequestBody(req.Body); err != nil {
+ if err = cs.writeRequestBody(req); err != nil {
if err != errStopReqBodyWrite {
traceWroteRequest(cs.trace, err)
return err
@@ -1216,15 +1236,14 @@
return cs.abortErr
case <-ctx.Done():
return ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
return errRequestCanceled
}
}
}

-func (cs *clientStream) encodeAndWriteHeaders() error {
+func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
cc := cs.cc
- req := cs.req
ctx := req.Context()

cc.wmu.Lock()
@@ -1246,14 +1265,14 @@
// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
// sent by writeRequestBody below, along with any Trailers,
// again in form HEADERS{1}, CONTINUATION{0,})
- trailers, err := commaSeparatedTrailers(cs.req)
+ trailers, err := commaSeparatedTrailers(req)
if err != nil {
return err
}
hasTrailers := trailers != ""
- contentLen := actualContentLength(cs.req)
+ contentLen := actualContentLength(req)
hasBody := contentLen != 0
- hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
+ hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
if err != nil {
return err
}
@@ -1272,7 +1291,6 @@
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
cc := cs.cc
- req := cs.req

if cs.ID == 0 {
// We were canceled before creating the stream, so return our reservation.
@@ -1283,10 +1301,12 @@
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
- if req.Body != nil {
- if e := req.Body.Close(); err == nil {
- err = e
- }
+ cc.mu.Lock()
+ bodyClosed := cs.reqBodyClosed
+ cs.reqBodyClosed = true
+ cc.mu.Unlock()
+ if !bodyClosed && cs.reqBody != nil {
+ cs.reqBody.Close()
}

if err != nil && cs.sentEndStream {
@@ -1401,7 +1421,7 @@
if n > max {
n = max
}
- if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
+ if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
// Add an extra byte past the declared content-length to
// give the caller's Request.Body io.Reader a chance to
// give us more bytes than they declared, so we can catch it
@@ -1416,13 +1436,13 @@

var bufPool sync.Pool // of *[]byte

-func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
+func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
cc := cs.cc
+ body := cs.reqBody
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

- req := cs.req
hasTrailers := req.Trailer != nil
- remainLen := actualContentLength(req)
+ remainLen := cs.reqBodyContentLength
hasContentLen := remainLen != -1

cc.mu.Lock()
@@ -1474,12 +1494,7 @@
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
- switch {
- case err == errStopReqBodyWrite:
- return err
- case err == errStopReqBodyWriteAndCancel:
- return err
- case err != nil:
+ if err != nil {
return err
}
cc.wmu.Lock()
@@ -1510,16 +1525,26 @@
return nil
}

+ // Since the RoundTrip contract permits the caller to "mutate or reuse"
+ // a request after the Response's Body is closed, verify that this hasn't
+ // happened before accessing the trailers.
+ cc.mu.Lock()
+ trailer := req.Trailer
+ err = cs.abortErr
+ cc.mu.Unlock()
+ if err != nil {
+ return err
+ }
+
cc.wmu.Lock()
+ defer cc.wmu.Unlock()
var trls []byte
- if hasTrailers {
- trls, err = cc.encodeTrailers(req)
+ if len(trailer) > 0 {
+ trls, err = cc.encodeTrailers(trailer)
if err != nil {
- cc.wmu.Unlock()
return err
}
}
- defer cc.wmu.Unlock()

// Two ways to send END_STREAM: either with trailers, or
// with an empty DATA frame.
@@ -1540,23 +1565,22 @@
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
- req := cs.req
- ctx := req.Context()
+ ctx := cs.ctx
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
- if cs.stopReqBody != nil {
- return 0, cs.stopReqBody
+ if cs.reqBodyClosed {
+ return 0, errStopReqBodyWrite
}
select {
case <-cs.abort:
return 0, cs.abortErr
case <-ctx.Done():
return 0, ctx.Err()
- case <-req.Cancel:
+ case <-cs.cancelc:
return 0, errRequestCanceled
default:
}
@@ -1770,11 +1794,11 @@
}

// requires cc.wmu be held.
-func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
+func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
cc.hbuf.Reset()

hlSize := uint64(0)
- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
for _, v := range vv {
hf := hpack.HeaderField{Name: k, Value: v}
hlSize += uint64(hf.Size())
@@ -1784,7 +1808,7 @@
return nil, errRequestHeaderListSize
}

- for k, vv := range req.Trailer {
+ for k, vv := range trailer {
lowKey, ascii := asciiToLower(k)
if !ascii {
// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
@@ -2163,8 +2187,7 @@
}

streamEnded := f.StreamEnded()
- isHead := cs.req.Method == "HEAD"
- if !streamEnded || isHead {
+ if !streamEnded || cs.isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
@@ -2179,7 +2202,7 @@
}
}

- if streamEnded || isHead {
+ if streamEnded || cs.isHead {
res.Body = noBody
return res, nil
}
@@ -2227,8 +2250,7 @@
}

// transportResponseBody is the concrete type of Transport.RoundTrip's
-// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
-// On Close it sends RST_STREAM if EOF wasn't already seen.
+// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
cs *clientStream
}
@@ -2311,6 +2333,8 @@
}
cc.mu.Unlock()

+ // TODO(dneil): Acquiring this mutex can block indefinitely.
+ // Move flow control return to a goroutine?
cc.wmu.Lock()
// Return connection-level flow control.
if unread > 0 {
@@ -2325,9 +2349,9 @@

select {
case <-cs.donec:
- case <-cs.req.Context().Done():
- return cs.req.Context().Err()
- case <-cs.req.Cancel:
+ case <-cs.ctx.Done():
+ return cs.ctx.Err()
+ case <-cs.cancelc:
return errRequestCanceled
}
return nil
@@ -2381,7 +2405,7 @@
return nil
}
if f.Length > 0 {
- if cs.req.Method == "HEAD" && len(data) > 0 {
+ if cs.isHead && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, StreamError{
StreamID: f.StreamID,
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 3c02695..d3cbeb8 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -5623,3 +5623,27 @@
}
ct.run()
}
+
+func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
+ st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(200)
+ w.(http.Flusher).Flush()
+ io.Copy(io.Discard, r.Body)
+ }, optOnlyServer)
+ defer st.Close()
+
+ tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+ defer tr.CloseIdleConnections()
+
+ pr, pw := net.Pipe()
+ req, err := http.NewRequest("GET", st.ts.URL, pr)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res.Body.Close()
+ pw.Close()
+}

To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: net
Gerrit-Branch: master
Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Gerrit-Change-Number: 355491
Gerrit-PatchSet: 1
Gerrit-Owner: Damien Neil <dn...@google.com>
Gerrit-Reviewer: Damien Neil <dn...@google.com>
Gerrit-MessageType: newchange

Damien Neil (Gerrit)

unread,
Oct 12, 2021, 7:29:56 PM10/12/21
to goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

Damien Neil uploaded patch set #2 to this change.

View Change

http2: close the Request's Body when aborting a stream

After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.

Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.

Updates golang/go#48908.

Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 143 insertions(+), 74 deletions(-)

To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: net
Gerrit-Branch: master
Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Gerrit-Change-Number: 355491
Gerrit-PatchSet: 2
Gerrit-Owner: Damien Neil <dn...@google.com>
Gerrit-Reviewer: Damien Neil <dn...@google.com>
Gerrit-CC: Go Bot <go...@golang.org>
Gerrit-MessageType: newpatchset

Damien Neil (Gerrit)

unread,
Oct 12, 2021, 8:08:35 PM10/12/21
to goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

Damien Neil uploaded patch set #3 to this change.

View Change

http2: close the Request's Body when aborting a stream

After RoundTrip returns, closing the Response's Body should
interrupt any ongoing write of the request body. Close the
Request's Body to unblock the body writer.

Take additional care around the use of a Request after
its Response's Body has been closed. The RoundTripper contract
permits the caller to modify the request after the Response's
body has been closed.

Updates golang/go#48908.

Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
---
M http2/transport_test.go
M http2/transport.go
2 files changed, 148 insertions(+), 79 deletions(-)

To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

Gerrit-Project: net
Gerrit-Branch: master
Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
Gerrit-Change-Number: 355491
Gerrit-PatchSet: 3
Gerrit-Owner: Damien Neil <dn...@google.com>
Gerrit-Reviewer: Damien Neil <dn...@google.com>
Gerrit-Reviewer: Go Bot <go...@golang.org>
Gerrit-MessageType: newpatchset

Brad Fitzpatrick (Gerrit)

unread,
Oct 13, 2021, 10:56:34 AM10/13/21
to Damien Neil, goph...@pubsubhelper.golang.org, Brad Fitzpatrick, Go Bot, golang-co...@googlegroups.com

Attention is currently required from: Damien Neil.

Patch set 3:Code-Review +2

View Change

    To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: master
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 355491
    Gerrit-PatchSet: 3
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-Attention: Damien Neil <dn...@google.com>
    Gerrit-Comment-Date: Wed, 13 Oct 2021 14:56:29 +0000
    Gerrit-HasComments: No
    Gerrit-Has-Labels: Yes
    Gerrit-MessageType: comment

    Damien Neil (Gerrit)

    unread,
    Oct 13, 2021, 12:56:13 PM10/13/21
    to goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

    Attention is currently required from: Damien Neil.

    Damien Neil uploaded patch set #4 to this change.

    View Change

    http2: close the Request's Body when aborting a stream

    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#48908.

    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 147 insertions(+), 77 deletions(-)

    To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: master
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 355491
    Gerrit-PatchSet: 4
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-Attention: Damien Neil <dn...@google.com>
    Gerrit-MessageType: newpatchset

    Damien Neil (Gerrit)

    unread,
    Oct 13, 2021, 1:12:53 PM10/13/21
    to goph...@pubsubhelper.golang.org, Go Bot, Brad Fitzpatrick, golang-co...@googlegroups.com

    View Change

    1 comment:

    • Patchset:

      • Patch Set #4:

        1 of 12 TryBots failed. […]

        Unrelated flake. I see the problem and will follow up with a fix for the flake.

    To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: master
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 355491
    Gerrit-PatchSet: 4
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-Comment-Date: Wed, 13 Oct 2021 17:12:51 +0000
    Gerrit-HasComments: Yes
    Gerrit-Has-Labels: No
    Comment-In-Reply-To: Go Bot <go...@golang.org>
    Gerrit-MessageType: comment

    Damien Neil (Gerrit)

    unread,
    Oct 13, 2021, 1:12:58 PM10/13/21
    to goph...@pubsubhelper.golang.org, golang-...@googlegroups.com, Go Bot, Brad Fitzpatrick, golang-co...@googlegroups.com

    Damien Neil submitted this change.

    View Change



    3 is the latest approved patch-set.
    The change was submitted with unreviewed changes in the following files:

    ```
    The name of the file: http2/transport_test.go
    Insertions: 53, Deletions: 0.

    @@ -5624,6 +5624,59 @@
    ct.run()
    }

    +func TestTransportContentLengthWithoutBody(t *testing.T) {
    + contentLength := ""

    + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
    +		w.Header().Set("Content-Length", contentLength)

    + }, optOnlyServer)
    + defer st.Close()
    + tr := &Transport{TLSClientConfig: tlsConfigInsecure}
    + defer tr.CloseIdleConnections()
    +
    +	for _, test := range []struct {
    + name string
    + contentLength string
    + wantBody string
    + wantErr error
    + wantContentLength int64
    + }{
    + {
    + name: "non-zero content length",
    + contentLength: "42",
    + wantErr: io.ErrUnexpectedEOF,
    + wantContentLength: 42,
    + },
    + {
    + name: "zero content length",
    + contentLength: "0",
    + wantErr: nil,
    + wantContentLength: 0,
    + },
    + } {
    + t.Run(test.name, func(t *testing.T) {
    + contentLength = test.contentLength
    +
    + req, _ := http.NewRequest("GET", st.ts.URL, nil)

    + res, err := tr.RoundTrip(req)
    + if err != nil {
    + t.Fatal(err)
    + }
    +			defer res.Body.Close()
    + body, err := io.ReadAll(res.Body)
    +
    + if err != test.wantErr {
    + t.Errorf("Expected error %v, got: %v", test.wantErr, err)
    + }
    + if len(body) > 0 {
    + t.Errorf("Expected empty body, got: %v", body)
    + }
    + if res.ContentLength != test.wantContentLength {
    + t.Errorf("Expected content length %d, got: %d", test.wantContentLength, res.ContentLength)
    + }
    + })
    + }
    +}
    +
    func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {

    st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
     		w.WriteHeader(200)
    ```
    ```
    The name of the file: http2/transport.go
    Insertions: 6, Deletions: 12.

    @@ -769,6 +769,61 @@
    return true
    }

    +// ClientConnState describes the state of a ClientConn.
    +type ClientConnState struct {
    + // Closed is whether the connection is closed.
    + Closed bool
    +
    + // Closing is whether the connection is in the process of
    + // closing. It may be closing due to shutdown, being a
    + // single-use connection, being marked as DoNotReuse, or
    + // having received a GOAWAY frame.
    + Closing bool
    +
    + // StreamsActive is how many streams are active.
    + StreamsActive int
    +
    + // StreamsReserved is how many streams have been reserved via
    + // ClientConn.ReserveNewRequest.
    + StreamsReserved int
    +
    + // StreamsPending is how many requests have been sent in excess
    + // of the peer's advertised MaxConcurrentStreams setting and
    + // are waiting for other streams to complete.
    + StreamsPending int
    +
    + // MaxConcurrentStreams is how many concurrent streams the
    + // peer advertised as acceptable. Zero means no SETTINGS
    + // frame has been received yet.
    + MaxConcurrentStreams uint32
    +
    + // LastIdle, if non-zero, is when the connection last
    + // transitioned to idle state.
    + LastIdle time.Time
    +}
    +
    +// State returns a snapshot of cc's state.
    +func (cc *ClientConn) State() ClientConnState {
    + cc.wmu.Lock()
    + maxConcurrent := cc.maxConcurrentStreams
    + if !cc.seenSettings {
    + maxConcurrent = 0
    + }
    + cc.wmu.Unlock()
    +
    + cc.mu.Lock()
    + defer cc.mu.Unlock()
    + return ClientConnState{
    + Closed: cc.closed,
    + Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
    + StreamsActive: len(cc.streams),
    + StreamsReserved: cc.streamsReserved,
    + StreamsPending: cc.pendingRequests,
    + LastIdle: cc.lastIdle,
    + MaxConcurrentStreams: maxConcurrent,
    + }
    +}
    +
    // clientConnIdleState describes the suitability of a client
    // connection to initiate a new RoundTrip request.
    type clientConnIdleState struct {
    @@ -2186,27 +2241,33 @@
    return nil, nil
    }

    - streamEnded := f.StreamEnded()
    - if !streamEnded || cs.isHead {
    - res.ContentLength = -1
    - if clens := res.Header["Content-Length"]; len(clens) == 1 {
    - if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
    - res.ContentLength = int64(cl)
    - } else {
    - // TODO: care? unlike http/1, it won't mess up our framing, so it's
    - // more safe smuggling-wise to ignore.
    - }
    - } else if len(clens) > 1 {
    + res.ContentLength = -1
    + if clens := res.Header["Content-Length"]; len(clens) == 1 {
    + if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
    + res.ContentLength = int64(cl)
    + } else {
    // TODO: care? unlike http/1, it won't mess up our framing, so it's
    // more safe smuggling-wise to ignore.
    }
    + } else if len(clens) > 1 {
    + // TODO: care? unlike http/1, it won't mess up our framing, so it's
    + // more safe smuggling-wise to ignore.
    }

    - if streamEnded || cs.isHead {
    + if cs.isHead {

    res.Body = noBody
    return res, nil
    }

    +	if f.StreamEnded() {
    + if res.ContentLength > 0 {
    + res.Body = missingBody{}
    + } else {
    + res.Body = noBody
    + }
    + return res, nil
    + }
    +
    cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
    cs.bytesRemain = res.ContentLength
    res.Body = transportResponseBody{cs}
    @@ -2755,6 +2816,11 @@

    var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))

    +type missingBody struct{}
    +
    +func (missingBody) Close() error { return nil }
    +func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
    +
    func strSliceContains(ss []string, s string) bool {
    for _, v := range ss {
    if v == s {
    ```

    Approvals: Brad Fitzpatrick: Looks good to me, approved Damien Neil: Trusted; Run TryBots Objections: Go Bot: TryBots failed
    http2: close the Request's Body when aborting a stream

    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#48908.

    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
    Trust: Damien Neil <dn...@google.com>
    Run-TryBot: Damien Neil <dn...@google.com>
    Reviewed-by: Brad Fitzpatrick <brad...@golang.org>

    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 151 insertions(+), 77 deletions(-)

    diff --git a/http2/transport.go b/http2/transport.go
    index a5ba742..2ff6544 100644

    --- a/http2/transport.go
    +++ b/http2/transport.go
    @@ -300,12 +300,17 @@
    // clientStream is the state for a single HTTP/2 stream. One of these
    // is created for each Transport.RoundTrip call.
    type clientStream struct {
    - cc *ClientConn
    - req *http.Request
    + cc *ClientConn
    +
    + // Fields of Request that we may access even after the response body is closed.
    + ctx context.Context
    +	reqCancel <-chan struct{}
    @@ -1065,15 +1075,19 @@

    func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
    ctx := req.Context()
    cs := &clientStream{
    - cc: cc,
    - req: req,
    - trace: httptrace.ContextClientTrace(req.Context()),
    - peerClosed: make(chan struct{}),
    - abort: make(chan struct{}),
    - respHeaderRecv: make(chan struct{}),
    - donec: make(chan struct{}),
    + cc: cc,
    +		ctx:                  ctx,
    + reqCancel: req.Cancel,

    + isHead: req.Method == "HEAD",
    + reqBody: req.Body,
    + reqBodyContentLength: actualContentLength(req),
    +		trace:                httptrace.ContextClientTrace(ctx),

    + peerClosed: make(chan struct{}),
    + abort: make(chan struct{}),
    + respHeaderRecv: make(chan struct{}),
    + donec: make(chan struct{}),
    }
    - go cs.doRequest()
    + go cs.doRequest(req)

    waitDone := func() error {
    select {
    @@ -1081,7 +1095,7 @@
    return nil

    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    +		case <-cs.reqCancel:
    return errRequestCanceled
    }
    }
    @@ -1100,7 +1114,7 @@

    // doesn't, they'll RST_STREAM us soon enough. This is a
    // heuristic to avoid adding knobs to Transport. Hopefully
    // we can keep it.
    - cs.abortRequestBodyWrite(errStopReqBodyWrite)
    + cs.abortRequestBodyWrite()
    }
    res.Request = req
    res.TLS = cc.tlsState
    @@ -1117,8 +1131,11 @@

    waitDone()
    return nil, cs.abortErr
    case <-ctx.Done():
    - return nil, ctx.Err()
    -		case <-req.Cancel:

    + err := ctx.Err()
    + cs.abortStream(err)
    + return nil, err
    +		case <-cs.reqCancel:

    + cs.abortStream(errRequestCanceled)
    return nil, errRequestCanceled
    }
    }
    @@ -1127,8 +1144,8 @@

    // doRequest runs for the duration of the request lifetime.
    //
    // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
    -func (cs *clientStream) doRequest() {
    - err := cs.writeRequest()
    +func (cs *clientStream) doRequest(req *http.Request) {
    + err := cs.writeRequest(req)
    cs.cleanupWriteRequest(err)
    }

    @@ -1139,12 +1156,11 @@

    //
    // It returns non-nil if the request ends otherwise.
    // If the returned error is StreamError, the error Code may be used in resetting the stream.
    -func (cs *clientStream) writeRequest() (err error) {
    +func (cs *clientStream) writeRequest(req *http.Request) (err error) {
    cc := cs.cc
    - req := cs.req
    -	ctx := req.Context()
    + ctx := cs.ctx

    -	if err := checkConnHeaders(cs.req); err != nil {
    + if err := checkConnHeaders(req); err != nil {
    return err
    }

    @@ -1156,7 +1172,7 @@

    }
    select {
    case cc.reqHeaderMu <- struct{}{}:
    - case <-req.Cancel:
    +	case <-cs.reqCancel:

    return errRequestCanceled
    case <-ctx.Done():
    return ctx.Err()
    @@ -1179,7 +1195,7 @@

    if !cc.t.disableCompression() &&
    req.Header.Get("Accept-Encoding") == "" &&
    req.Header.Get("Range") == "" &&
    - req.Method != "HEAD" {
    + !cs.isHead {
    // Request gzip only, not deflate. Deflate is ambiguous and
    // not as universally supported anyway.
    // See: https://zlib.net/zlib_faq.html#faq39
    @@ -1198,19 +1214,23 @@

    continueTimeout := cc.t.expectContinueTimeout()
    if continueTimeout != 0 &&
    !httpguts.HeaderValuesContainsToken(
    - cs.req.Header["Expect"],
    + req.Header["Expect"],
    "100-continue") {
    continueTimeout = 0
    cs.on100 = make(chan struct{}, 1)
    }

    - err = cs.encodeAndWriteHeaders()
    + // Past this point (where we send request headers), it is possible for
    + // RoundTrip to return successfully. Since the RoundTrip contract permits
    + // the caller to "mutate or reuse" the Request after closing the Response's Body,
    + // we must take care when referencing the Request from here on.
    + err = cs.encodeAndWriteHeaders(req)
    <-cc.reqHeaderMu
    if err != nil {
    return err
    }

    - hasBody := actualContentLength(cs.req) != 0
    + hasBody := cs.reqBodyContentLength != 0
    if !hasBody {
    cs.sentEndStream = true
    } else {
    @@ -1226,7 +1246,7 @@

    err = cs.abortErr
    case <-ctx.Done():
    err = ctx.Err()
    - case <-req.Cancel:
    +			case <-cs.reqCancel:
    err = errRequestCanceled
    }
    timer.Stop()
    @@ -1236,7 +1256,7 @@

    }
    }

    - if err = cs.writeRequestBody(req.Body); err != nil {
    + if err = cs.writeRequestBody(req); err != nil {
    if err != errStopReqBodyWrite {
    traceWroteRequest(cs.trace, err)
    return err
    @@ -1271,16 +1291,15 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    +		case <-cs.reqCancel:

    return errRequestCanceled
    }
    }
    }

    -func (cs *clientStream) encodeAndWriteHeaders() error {
    +func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
    cc := cs.cc
    - req := cs.req
    -	ctx := req.Context()
    + ctx := cs.ctx

     	cc.wmu.Lock()
    defer cc.wmu.Unlock()
    @@ -1291,7 +1310,7 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    +	case <-cs.reqCancel:
    return errRequestCanceled
    default:
    }
    @@ -1301,14 +1320,14 @@

    // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
    // sent by writeRequestBody below, along with any Trailers,
    // again in form HEADERS{1}, CONTINUATION{0,})
    - trailers, err := commaSeparatedTrailers(cs.req)
    + trailers, err := commaSeparatedTrailers(req)
    if err != nil {
    return err
    }
    hasTrailers := trailers != ""
    - contentLen := actualContentLength(cs.req)
    + contentLen := actualContentLength(req)
    hasBody := contentLen != 0
    - hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
    + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
    if err != nil {
    return err
    }
    @@ -1327,7 +1346,6 @@

    // cleanupWriteRequest will send a reset to the peer.
    func (cs *clientStream) cleanupWriteRequest(err error) {
    cc := cs.cc
    - req := cs.req

    if cs.ID == 0 {
    // We were canceled before creating the stream, so return our reservation.
    @@ -1338,10 +1356,12 @@

    // Request.Body is closed by the Transport,
    // and in multiple cases: server replies <=299 and >299
    // while still writing request body
    - if req.Body != nil {
    - if e := req.Body.Close(); err == nil {
    - err = e
    - }
    + cc.mu.Lock()
    + bodyClosed := cs.reqBodyClosed
    + cs.reqBodyClosed = true
    + cc.mu.Unlock()
    + if !bodyClosed && cs.reqBody != nil {
    + cs.reqBody.Close()
    }

    if err != nil && cs.sentEndStream {
    @@ -1456,7 +1476,7 @@

    if n > max {
    n = max
    }
    - if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
    + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
    // Add an extra byte past the declared content-length to
    // give the caller's Request.Body io.Reader a chance to
    // give us more bytes than they declared, so we can catch it
    @@ -1471,13 +1491,13 @@


    var bufPool sync.Pool // of *[]byte

    -func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
    +func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
    cc := cs.cc
    + body := cs.reqBody
    sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

    - req := cs.req
    hasTrailers := req.Trailer != nil
    - remainLen := actualContentLength(req)
    + remainLen := cs.reqBodyContentLength
    hasContentLen := remainLen != -1

    cc.mu.Lock()
    @@ -1529,12 +1549,7 @@

    for len(remain) > 0 && err == nil {
    var allowed int32
    allowed, err = cs.awaitFlowControl(len(remain))
    - switch {
    - case err == errStopReqBodyWrite:
    - return err
    - case err == errStopReqBodyWriteAndCancel:
    - return err
    - case err != nil:
    + if err != nil {
    return err
    }
    cc.wmu.Lock()
    @@ -1565,16 +1580,26 @@
    @@ -1595,23 +1620,22 @@

    // if the stream is dead.
    func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx
    cc.mu.Lock()
    defer cc.mu.Unlock()
    for {
    if cc.closed {
    return 0, errClientConnClosed
    }
    - if cs.stopReqBody != nil {
    - return 0, cs.stopReqBody
    + if cs.reqBodyClosed {
    + return 0, errStopReqBodyWrite
    }
    select {
    case <-cs.abort:
    return 0, cs.abortErr
    case <-ctx.Done():
    return 0, ctx.Err()
    - case <-req.Cancel:
    +		case <-cs.reqCancel:
    return 0, errRequestCanceled
    default:
    }
    @@ -1825,11 +1849,11 @@

    }

    // requires cc.wmu be held.
    -func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
    +func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
    cc.hbuf.Reset()

    hlSize := uint64(0)
    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    for _, v := range vv {
    hf := hpack.HeaderField{Name: k, Value: v}
    hlSize += uint64(hf.Size())
    @@ -1839,7 +1863,7 @@

    return nil, errRequestHeaderListSize
    }

    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    lowKey, ascii := asciiToLower(k)
    if !ascii {
    // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
    @@ -2230,7 +2254,7 @@
    // more safe smuggling-wise to ignore.
    }

    - if cs.req.Method == "HEAD" {
    + if cs.isHead {

    res.Body = noBody
    return res, nil
    }
    @@ -2287,8 +2311,7 @@

    }

    // transportResponseBody is the concrete type of Transport.RoundTrip's
    -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
    -// On Close it sends RST_STREAM if EOF wasn't already seen.
    +// Response.Body. It is an io.ReadCloser.
    type transportResponseBody struct {
    cs *clientStream
    }
    @@ -2371,6 +2394,8 @@

    }
    cc.mu.Unlock()

    + // TODO(dneil): Acquiring this mutex can block indefinitely.
    + // Move flow control return to a goroutine?
    cc.wmu.Lock()
    // Return connection-level flow control.
    if unread > 0 {
    @@ -2385,9 +2410,9 @@


    select {
    case <-cs.donec:
    - case <-cs.req.Context().Done():
    - return cs.req.Context().Err()
    - case <-cs.req.Cancel:
    + case <-cs.ctx.Done():
    + return cs.ctx.Err()
    +	case <-cs.reqCancel:
    return errRequestCanceled
    }
    return nil
    @@ -2441,7 +2466,7 @@

    return nil
    }
    if f.Length > 0 {
    - if cs.req.Method == "HEAD" && len(data) > 0 {
    + if cs.isHead && len(data) > 0 {
    cc.logf("protocol error: received DATA on a HEAD request")
    rl.endStreamError(cs, StreamError{
    StreamID: f.StreamID,
    diff --git a/http2/transport_test.go b/http2/transport_test.go
    index b250738..322a4c4 100644
    --- a/http2/transport_test.go
    +++ b/http2/transport_test.go
    @@ -5676,3 +5676,28 @@

    })
    }
    }
    +
    +func TestTransportCloseResponseBodyWhileRequestBodyHangs(t *testing.T) {
    + st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
    + w.WriteHeader(200)
    + w.(http.Flusher).Flush()
    + io.Copy(io.Discard, r.Body)
    + }, optOnlyServer)
    + defer st.Close()
    +
    + tr := &Transport{TLSClientConfig: tlsConfigInsecure}
    + defer tr.CloseIdleConnections()
    +
    + pr, pw := net.Pipe()
    + req, err := http.NewRequest("GET", st.ts.URL, pr)
    + if err != nil {
    + t.Fatal(err)
    + }
    + res, err := tr.RoundTrip(req)
    + if err != nil {
    + t.Fatal(err)
    + }
    +	// Closing the Response's Body interrupts the blocked body read.

    + res.Body.Close()
    + pw.Close()
    +}

    To view, visit change 355491. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: master
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 355491
    Gerrit-PatchSet: 5
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-MessageType: merged

    Damien Neil (Gerrit)

    unread,
    Oct 19, 2021, 6:00:22 PM10/19/21
    to Brad Fitzpatrick, goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

    Attention is currently required from: Brad Fitzpatrick.

    Damien Neil would like Brad Fitzpatrick to review this change.

    View Change

    [internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream


    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#48908.

    Updates golang/go#49076


    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
    Trust: Damien Neil <dn...@google.com>
    Run-TryBot: Damien Neil <dn...@google.com>
    Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 153 insertions(+), 77 deletions(-)

    diff --git a/http2/transport.go b/http2/transport.go
    index 054e524..03663b9 100644
    --- a/http2/transport.go
    +++ b/http2/transport.go
    @@ -294,12 +294,17 @@

    // clientStream is the state for a single HTTP/2 stream. One of these
    // is created for each Transport.RoundTrip call.
    type clientStream struct {
    - cc *ClientConn
    - req *http.Request
    + cc *ClientConn
    +
    + // Fields of Request that we may access even after the response body is closed.
    + ctx context.Context
    + reqCancel <-chan struct{}
    +
    trace *httptrace.ClientTrace // or nil
    ID uint32
    bufPipe pipe // buffered pipe with the flow-controlled response payload
    requestedGzip bool
    + isHead bool

    abortOnce sync.Once
    abort chan struct{} // closed to signal stream should end immediately
    @@ -316,7 +321,10 @@

    inflow flow // guarded by cc.mu
    bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
    readErr error // sticky read error; owned by transportResponseBody.Read
    - stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
    +
    + reqBody io.ReadCloser
    + reqBodyContentLength int64 // -1 means unknown
    + reqBodyClosed bool // body has been closed; guarded by cc.mu

    // owned by writeRequest:
    sentEndStream bool // sent an END_STREAM flag to the peer
    @@ -356,6 +364,10 @@

    cs.abortErr = err
    close(cs.abort)
    })
    + if cs.reqBody != nil && !cs.reqBodyClosed {
    + cs.reqBody.Close()
    + cs.reqBodyClosed = true
    + }
    // TODO(dneil): Clean up tests where cs.cc.cond is nil.
    if cs.cc.cond != nil {
    // Wake up writeRequestBody if it is waiting on flow control.
    @@ -363,17 +375,15 @@

    }
    }

    -func (cs *clientStream) abortRequestBodyWrite(err error) {
    - if err == nil {
    - panic("nil error")
    - }
    +func (cs *clientStream) abortRequestBodyWrite() {
    cc := cs.cc
    cc.mu.Lock()
    - if cs.stopReqBody == nil {
    - cs.stopReqBody = err
    + defer cc.mu.Unlock()
    + if cs.reqBody != nil && !cs.reqBodyClosed {
    + cs.reqBody.Close()
    + cs.reqBodyClosed = true
    cc.cond.Broadcast()
    }
    - cc.mu.Unlock()
    }

    type stickyErrWriter struct {
    @@ -1002,15 +1012,19 @@
    @@ -1018,7 +1032,7 @@

    return nil
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    }
    @@ -1037,7 +1051,7 @@

    // doesn't, they'll RST_STREAM us soon enough. This is a
    // heuristic to avoid adding knobs to Transport. Hopefully
    // we can keep it.
    - cs.abortRequestBodyWrite(errStopReqBodyWrite)
    + cs.abortRequestBodyWrite()
    }
    res.Request = req
    res.TLS = cc.tlsState
    @@ -1054,8 +1068,11 @@

    waitDone()
    return nil, cs.abortErr
    case <-ctx.Done():
    - return nil, ctx.Err()
    - case <-req.Cancel:
    + err := ctx.Err()
    + cs.abortStream(err)
    + return nil, err
    + case <-cs.reqCancel:
    + cs.abortStream(errRequestCanceled)
    return nil, errRequestCanceled
    }
    }
    @@ -1064,8 +1081,8 @@
    // writeRequest runs for the duration of the request lifetime.

    //
    // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
    -func (cs *clientStream) doRequest() {
    - err := cs.writeRequest()
    +func (cs *clientStream) doRequest(req *http.Request) {
    + err := cs.writeRequest(req)
    cs.cleanupWriteRequest(err)
    }

    @@ -1076,12 +1093,11 @@

    //
    // It returns non-nil if the request ends otherwise.
    // If the returned error is StreamError, the error Code may be used in resetting the stream.
    -func (cs *clientStream) writeRequest() (err error) {
    +func (cs *clientStream) writeRequest(req *http.Request) (err error) {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx

    - if err := checkConnHeaders(cs.req); err != nil {
    + if err := checkConnHeaders(req); err != nil {
    return err
    }

    @@ -1093,7 +1109,7 @@

    }
    select {
    case cc.reqHeaderMu <- struct{}{}:
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    case <-ctx.Done():
    return ctx.Err()
    @@ -1116,7 +1132,7 @@

    if !cc.t.disableCompression() &&
    req.Header.Get("Accept-Encoding") == "" &&
    req.Header.Get("Range") == "" &&
    - req.Method != "HEAD" {
    + !cs.isHead {
    // Request gzip only, not deflate. Deflate is ambiguous and
    // not as universally supported anyway.
    // See: https://zlib.net/zlib_faq.html#faq39
    @@ -1135,19 +1151,23 @@

    continueTimeout := cc.t.expectContinueTimeout()
    if continueTimeout != 0 &&
    !httpguts.HeaderValuesContainsToken(
    - cs.req.Header["Expect"],
    + req.Header["Expect"],
    "100-continue") {
    continueTimeout = 0
    cs.on100 = make(chan struct{}, 1)
    }

    - err = cs.encodeAndWriteHeaders()
    + // Past this point (where we send request headers), it is possible for
    + // RoundTrip to return successfully. Since the RoundTrip contract permits
    + // the caller to "mutate or reuse" the Request after closing the Response's Body,
    + // we must take care when referencing the Request from here on.
    + err = cs.encodeAndWriteHeaders(req)
    <-cc.reqHeaderMu
    if err != nil {
    return err
    }

    - hasBody := actualContentLength(cs.req) != 0
    + hasBody := cs.reqBodyContentLength != 0
    if !hasBody {
    cs.sentEndStream = true
    } else {
    @@ -1163,7 +1183,7 @@

    err = cs.abortErr
    case <-ctx.Done():
    err = ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    err = errRequestCanceled
    }
    timer.Stop()
    @@ -1173,7 +1193,7 @@

    }
    }

    - if err = cs.writeRequestBody(req.Body); err != nil {
    + if err = cs.writeRequestBody(req); err != nil {
    if err != errStopReqBodyWrite {
    traceWroteRequest(cs.trace, err)
    return err
    @@ -1208,16 +1228,15 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    }
    }

    -func (cs *clientStream) encodeAndWriteHeaders() error {
    +func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx

    cc.wmu.Lock()
    defer cc.wmu.Unlock()
    @@ -1228,7 +1247,7 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    default:
    }
    @@ -1238,14 +1257,14 @@

    // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
    // sent by writeRequestBody below, along with any Trailers,
    // again in form HEADERS{1}, CONTINUATION{0,})
    - trailers, err := commaSeparatedTrailers(cs.req)
    + trailers, err := commaSeparatedTrailers(req)
    if err != nil {
    return err
    }
    hasTrailers := trailers != ""
    - contentLen := actualContentLength(cs.req)
    + contentLen := actualContentLength(req)
    hasBody := contentLen != 0
    - hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
    + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
    if err != nil {
    return err
    }
    @@ -1264,7 +1283,6 @@

    // cleanupWriteRequest will send a reset to the peer.
    func (cs *clientStream) cleanupWriteRequest(err error) {
    cc := cs.cc
    - req := cs.req

    if cs.ID == 0 {
    // We were canceled before creating the stream, so return our reservation.
    @@ -1275,10 +1293,12 @@

    // Request.Body is closed by the Transport,
    // and in multiple cases: server replies <=299 and >299
    // while still writing request body
    - if req.Body != nil {
    - if e := req.Body.Close(); err == nil {
    - err = e
    - }
    + cc.mu.Lock()
    + bodyClosed := cs.reqBodyClosed
    + cs.reqBodyClosed = true
    + cc.mu.Unlock()
    + if !bodyClosed && cs.reqBody != nil {
    + cs.reqBody.Close()
    }

    if err != nil && cs.sentEndStream {
    @@ -1393,7 +1413,7 @@

    if n > max {
    n = max
    }
    - if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
    + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
    // Add an extra byte past the declared content-length to
    // give the caller's Request.Body io.Reader a chance to
    // give us more bytes than they declared, so we can catch it
    @@ -1408,13 +1428,13 @@


    var bufPool sync.Pool // of *[]byte

    -func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
    +func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
    cc := cs.cc
    + body := cs.reqBody
    sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

    - req := cs.req
    hasTrailers := req.Trailer != nil
    - remainLen := actualContentLength(req)
    + remainLen := cs.reqBodyContentLength
    hasContentLen := remainLen != -1

    cc.mu.Lock()
    @@ -1466,12 +1486,7 @@

    for len(remain) > 0 && err == nil {
    var allowed int32
    allowed, err = cs.awaitFlowControl(len(remain))
    - switch {
    - case err == errStopReqBodyWrite:
    - return err
    - case err == errStopReqBodyWriteAndCancel:
    - return err
    - case err != nil:
    + if err != nil {
    return err
    }
    cc.wmu.Lock()
    @@ -1502,16 +1517,26 @@
    @@ -1532,23 +1557,22 @@

    // if the stream is dead.
    func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx
    cc.mu.Lock()
    defer cc.mu.Unlock()
    for {
    if cc.closed {
    return 0, errClientConnClosed
    }
    - if cs.stopReqBody != nil {
    - return 0, cs.stopReqBody
    + if cs.reqBodyClosed {
    + return 0, errStopReqBodyWrite
    }
    select {
    case <-cs.abort:
    return 0, cs.abortErr
    case <-ctx.Done():
    return 0, ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return 0, errRequestCanceled
    default:
    }
    @@ -1762,11 +1786,11 @@

    }

    // requires cc.wmu be held.
    -func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
    +func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
    cc.hbuf.Reset()

    hlSize := uint64(0)
    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    for _, v := range vv {
    hf := hpack.HeaderField{Name: k, Value: v}
    hlSize += uint64(hf.Size())
    @@ -1776,7 +1800,7 @@

    return nil, errRequestHeaderListSize
    }

    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    lowKey, ascii := asciiToLower(k)
    if !ascii {
    // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
    @@ -2139,7 +2163,7 @@

    // more safe smuggling-wise to ignore.
    }

    - if cs.req.Method == "HEAD" {
    + if cs.isHead {
    res.Body = noBody
    return res, nil
    }
    @@ -2196,8 +2220,7 @@

    }

    // transportResponseBody is the concrete type of Transport.RoundTrip's
    -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
    -// On Close it sends RST_STREAM if EOF wasn't already seen.
    +// Response.Body. It is an io.ReadCloser.
    type transportResponseBody struct {
    cs *clientStream
    }
    @@ -2280,6 +2303,8 @@

    }
    cc.mu.Unlock()

    + // TODO(dneil): Acquiring this mutex can block indefinitely.
    + // Move flow control return to a goroutine?
    cc.wmu.Lock()
    // Return connection-level flow control.
    if unread > 0 {
    @@ -2294,9 +2319,9 @@


    select {
    case <-cs.donec:
    - case <-cs.req.Context().Done():
    - return cs.req.Context().Err()
    - case <-cs.req.Cancel:
    + case <-cs.ctx.Done():
    + return cs.ctx.Err()
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    return nil
    @@ -2350,7 +2375,7 @@

    return nil
    }
    if f.Length > 0 {
    - if cs.req.Method == "HEAD" && len(data) > 0 {
    + if cs.isHead && len(data) > 0 {
    cc.logf("protocol error: received DATA on a HEAD request")
    rl.endStreamError(cs, StreamError{
    StreamID: f.StreamID,
    diff --git a/http2/transport_test.go b/http2/transport_test.go
    index 1e0c982..f04a1b0 100644
    --- a/http2/transport_test.go
    +++ b/http2/transport_test.go
    @@ -5549,3 +5549,28 @@

    To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: internal-branch.go1.16-vendor
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 357095
    Gerrit-PatchSet: 1
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Attention: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-MessageType: newchange

    Damien Neil (Gerrit)

    unread,
    Oct 19, 2021, 6:11:11 PM10/19/21
    to goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

    Attention is currently required from: Brad Fitzpatrick.

    Damien Neil uploaded patch set #2 to this change.

    View Change

    [internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream

    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#48908.

    Updates golang/go#49076

    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
    Trust: Damien Neil <dn...@google.com>
    Run-TryBot: Damien Neil <dn...@google.com>
    Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 153 insertions(+), 77 deletions(-)

    To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: internal-branch.go1.16-vendor
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 357095
    Gerrit-PatchSet: 2
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-Attention: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-MessageType: newpatchset

    Damien Neil (Gerrit)

    unread,
    Oct 21, 2021, 1:08:58 PM10/21/21
    to Brad Fitzpatrick, goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

    Attention is currently required from: Brad Fitzpatrick.

    Damien Neil would like Brad Fitzpatrick to review this change.

    View Change

    [internal-branch.go1.17-vendor] http2: close the Request's Body when aborting a stream


    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#49077


    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
    Trust: Damien Neil <dn...@google.com>
    Run-TryBot: Damien Neil <dn...@google.com>
    Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 151 insertions(+), 77 deletions(-)

    diff --git a/http2/transport.go b/http2/transport.go
    index 43029a8..e402650 100644
    @@ -998,15 +1008,19 @@
    @@ -1014,7 +1028,7 @@

    return nil
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    }
    @@ -1033,7 +1047,7 @@

    // doesn't, they'll RST_STREAM us soon enough. This is a
    // heuristic to avoid adding knobs to Transport. Hopefully
    // we can keep it.
    - cs.abortRequestBodyWrite(errStopReqBodyWrite)
    + cs.abortRequestBodyWrite()
    }
    res.Request = req
    res.TLS = cc.tlsState
    @@ -1050,8 +1064,11 @@

    waitDone()
    return nil, cs.abortErr
    case <-ctx.Done():
    - return nil, ctx.Err()
    - case <-req.Cancel:
    + err := ctx.Err()
    + cs.abortStream(err)
    + return nil, err
    + case <-cs.reqCancel:
    + cs.abortStream(errRequestCanceled)
    return nil, errRequestCanceled
    }
    }
    @@ -1060,8 +1077,8 @@

    // writeRequest runs for the duration of the request lifetime.
    //
    // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
    -func (cs *clientStream) doRequest() {
    - err := cs.writeRequest()
    +func (cs *clientStream) doRequest(req *http.Request) {
    + err := cs.writeRequest(req)
    cs.cleanupWriteRequest(err)
    }

    @@ -1072,12 +1089,11 @@

    //
    // It returns non-nil if the request ends otherwise.
    // If the returned error is StreamError, the error Code may be used in resetting the stream.
    -func (cs *clientStream) writeRequest() (err error) {
    +func (cs *clientStream) writeRequest(req *http.Request) (err error) {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx

    - if err := checkConnHeaders(cs.req); err != nil {
    + if err := checkConnHeaders(req); err != nil {
    return err
    }

    @@ -1089,7 +1105,7 @@

    }
    select {
    case cc.reqHeaderMu <- struct{}{}:
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    case <-ctx.Done():
    return ctx.Err()
    @@ -1112,7 +1128,7 @@

    if !cc.t.disableCompression() &&
    req.Header.Get("Accept-Encoding") == "" &&
    req.Header.Get("Range") == "" &&
    - req.Method != "HEAD" {
    + !cs.isHead {
    // Request gzip only, not deflate. Deflate is ambiguous and
    // not as universally supported anyway.
    // See: https://zlib.net/zlib_faq.html#faq39
    @@ -1131,19 +1147,23 @@

    continueTimeout := cc.t.expectContinueTimeout()
    if continueTimeout != 0 &&
    !httpguts.HeaderValuesContainsToken(
    - cs.req.Header["Expect"],
    + req.Header["Expect"],
    "100-continue") {
    continueTimeout = 0
    cs.on100 = make(chan struct{}, 1)
    }

    - err = cs.encodeAndWriteHeaders()
    + // Past this point (where we send request headers), it is possible for
    + // RoundTrip to return successfully. Since the RoundTrip contract permits
    + // the caller to "mutate or reuse" the Request after closing the Response's Body,
    + // we must take care when referencing the Request from here on.
    + err = cs.encodeAndWriteHeaders(req)
    <-cc.reqHeaderMu
    if err != nil {
    return err
    }

    - hasBody := actualContentLength(cs.req) != 0
    + hasBody := cs.reqBodyContentLength != 0
    if !hasBody {
    cs.sentEndStream = true
    } else {
    @@ -1159,7 +1179,7 @@

    err = cs.abortErr
    case <-ctx.Done():
    err = ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    err = errRequestCanceled
    }
    timer.Stop()
    @@ -1169,7 +1189,7 @@

    }
    }

    - if err = cs.writeRequestBody(req.Body); err != nil {
    + if err = cs.writeRequestBody(req); err != nil {
    if err != errStopReqBodyWrite {
    traceWroteRequest(cs.trace, err)
    return err
    @@ -1204,16 +1224,15 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    }
    }

    -func (cs *clientStream) encodeAndWriteHeaders() error {
    +func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx

    cc.wmu.Lock()
    defer cc.wmu.Unlock()
    @@ -1224,7 +1243,7 @@

    return cs.abortErr
    case <-ctx.Done():
    return ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return errRequestCanceled
    default:
    }
    @@ -1234,14 +1253,14 @@

    // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
    // sent by writeRequestBody below, along with any Trailers,
    // again in form HEADERS{1}, CONTINUATION{0,})
    - trailers, err := commaSeparatedTrailers(cs.req)
    + trailers, err := commaSeparatedTrailers(req)
    if err != nil {
    return err
    }
    hasTrailers := trailers != ""
    - contentLen := actualContentLength(cs.req)
    + contentLen := actualContentLength(req)
    hasBody := contentLen != 0
    - hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
    + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
    if err != nil {
    return err
    }
    @@ -1260,7 +1279,6 @@

    // cleanupWriteRequest will send a reset to the peer.
    func (cs *clientStream) cleanupWriteRequest(err error) {
    cc := cs.cc
    - req := cs.req

    if cs.ID == 0 {
    // We were canceled before creating the stream, so return our reservation.
    @@ -1271,10 +1289,12 @@

    // Request.Body is closed by the Transport,
    // and in multiple cases: server replies <=299 and >299
    // while still writing request body
    - if req.Body != nil {
    - if e := req.Body.Close(); err == nil {
    - err = e
    - }
    + cc.mu.Lock()
    + bodyClosed := cs.reqBodyClosed
    + cs.reqBodyClosed = true
    + cc.mu.Unlock()
    + if !bodyClosed && cs.reqBody != nil {
    + cs.reqBody.Close()
    }

    if err != nil && cs.sentEndStream {
    @@ -1389,7 +1409,7 @@

    if n > max {
    n = max
    }
    - if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
    + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
    // Add an extra byte past the declared content-length to
    // give the caller's Request.Body io.Reader a chance to
    // give us more bytes than they declared, so we can catch it
    @@ -1404,13 +1424,13 @@


    var bufPool sync.Pool // of *[]byte

    -func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
    +func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
    cc := cs.cc
    + body := cs.reqBody
    sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

    - req := cs.req
    hasTrailers := req.Trailer != nil
    - remainLen := actualContentLength(req)
    + remainLen := cs.reqBodyContentLength
    hasContentLen := remainLen != -1

    cc.mu.Lock()
    @@ -1462,12 +1482,7 @@

    for len(remain) > 0 && err == nil {
    var allowed int32
    allowed, err = cs.awaitFlowControl(len(remain))
    - switch {
    - case err == errStopReqBodyWrite:
    - return err
    - case err == errStopReqBodyWriteAndCancel:
    - return err
    - case err != nil:
    + if err != nil {
    return err
    }
    cc.wmu.Lock()
    @@ -1498,16 +1513,26 @@
    @@ -1528,23 +1553,22 @@

    // if the stream is dead.
    func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
    cc := cs.cc
    - req := cs.req
    - ctx := req.Context()
    + ctx := cs.ctx
    cc.mu.Lock()
    defer cc.mu.Unlock()
    for {
    if cc.closed {
    return 0, errClientConnClosed
    }
    - if cs.stopReqBody != nil {
    - return 0, cs.stopReqBody
    + if cs.reqBodyClosed {
    + return 0, errStopReqBodyWrite
    }
    select {
    case <-cs.abort:
    return 0, cs.abortErr
    case <-ctx.Done():
    return 0, ctx.Err()
    - case <-req.Cancel:
    + case <-cs.reqCancel:
    return 0, errRequestCanceled
    default:
    }
    @@ -1758,11 +1782,11 @@

    }

    // requires cc.wmu be held.
    -func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
    +func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
    cc.hbuf.Reset()

    hlSize := uint64(0)
    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    for _, v := range vv {
    hf := hpack.HeaderField{Name: k, Value: v}
    hlSize += uint64(hf.Size())
    @@ -1772,7 +1796,7 @@

    return nil, errRequestHeaderListSize
    }

    - for k, vv := range req.Trailer {
    + for k, vv := range trailer {
    lowKey, ascii := asciiToLower(k)
    if !ascii {
    // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
    @@ -2135,7 +2159,7 @@

    // more safe smuggling-wise to ignore.
    }

    - if cs.req.Method == "HEAD" {
    + if cs.isHead {
    res.Body = noBody
    return res, nil
    }
    @@ -2192,8 +2216,7 @@

    }

    // transportResponseBody is the concrete type of Transport.RoundTrip's
    -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
    -// On Close it sends RST_STREAM if EOF wasn't already seen.
    +// Response.Body. It is an io.ReadCloser.
    type transportResponseBody struct {
    cs *clientStream
    }
    @@ -2276,6 +2299,8 @@

    }
    cc.mu.Unlock()

    + // TODO(dneil): Acquiring this mutex can block indefinitely.
    + // Move flow control return to a goroutine?
    cc.wmu.Lock()
    // Return connection-level flow control.
    if unread > 0 {
    @@ -2290,9 +2315,9 @@


    select {
    case <-cs.donec:
    - case <-cs.req.Context().Done():
    - return cs.req.Context().Err()
    - case <-cs.req.Cancel:
    + case <-cs.ctx.Done():
    + return cs.ctx.Err()
    + case <-cs.reqCancel:
    return errRequestCanceled
    }
    return nil
    @@ -2346,7 +2371,7 @@

    To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: internal-branch.go1.17-vendor
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 357690
    Gerrit-PatchSet: 1
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Attention: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-MessageType: newchange

    Dmitri Shuralyov (Gerrit)

    unread,
    Oct 29, 2021, 12:24:29 PM10/29/21
    to Damien Neil, goph...@pubsubhelper.golang.org, Go Bot, Dmitri Shuralyov, Brad Fitzpatrick, golang-co...@googlegroups.com

    Attention is currently required from: Damien Neil.

    View Change

    1 comment:

    • Patchset:

      • Patch Set #1:

        1 of 9 TryBots failed. […]

        Failure is not due to a deterministic problem introduced in this CL.

    To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: internal-branch.go1.17-vendor
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 357690
    Gerrit-PatchSet: 1
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Dmitri Shuralyov <dmit...@golang.org>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-CC: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Attention: Damien Neil <dn...@google.com>
    Gerrit-Comment-Date: Fri, 29 Oct 2021 16:24:24 +0000

    Dmitri Shuralyov (Gerrit)

    unread,
    Oct 29, 2021, 12:24:51 PM10/29/21
    to Damien Neil, Dmitri Shuralyov, goph...@pubsubhelper.golang.org, golang-co...@googlegroups.com

    Attention is currently required from: Dmitri Shuralyov.

    Dmitri Shuralyov uploaded patch set #3 to the change originally created by Damien Neil.

    View Change

    [internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream


    After RoundTrip returns, closing the Response's Body should
    interrupt any ongoing write of the request body. Close the
    Request's Body to unblock the body writer.

    Take additional care around the use of a Request after
    its Response's Body has been closed. The RoundTripper contract
    permits the caller to modify the request after the Response's
    body has been closed.

    Updates golang/go#49076


    Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
    Trust: Damien Neil <dn...@google.com>
    Run-TryBot: Damien Neil <dn...@google.com>
    Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
    ---
    M http2/transport_test.go
    M http2/transport.go
    2 files changed, 151 insertions(+), 77 deletions(-)

    To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.

    Gerrit-Project: net
    Gerrit-Branch: internal-branch.go1.16-vendor
    Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
    Gerrit-Change-Number: 357095
    Gerrit-PatchSet: 3
    Gerrit-Owner: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Damien Neil <dn...@google.com>
    Gerrit-Reviewer: Dmitri Shuralyov <dmit...@golang.org>
    Gerrit-Reviewer: Go Bot <go...@golang.org>
    Gerrit-CC: Brad Fitzpatrick <brad...@golang.org>
    Gerrit-Attention: Dmitri Shuralyov <dmit...@golang.org>
    Gerrit-MessageType: newpatchset

    Dmitri Shuralyov (Gerrit)

    unread,
    Oct 29, 2021, 12:24:57 PM10/29/21
    to Damien Neil, Dmitri Shuralyov, goph...@pubsubhelper.golang.org, Brad Fitzpatrick, Go Bot, golang-co...@googlegroups.com

    Attention is currently required from: Damien Neil.

    Patch set 3:Code-Review +2

    View Change

      To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.

      Gerrit-Project: net
      Gerrit-Branch: internal-branch.go1.16-vendor
      Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
      Gerrit-Change-Number: 357095
      Gerrit-PatchSet: 3
      Gerrit-Owner: Damien Neil <dn...@google.com>
      Gerrit-Reviewer: Damien Neil <dn...@google.com>
      Gerrit-Reviewer: Dmitri Shuralyov <dmit...@golang.org>
      Gerrit-Reviewer: Go Bot <go...@golang.org>
      Gerrit-CC: Brad Fitzpatrick <brad...@golang.org>
      Gerrit-Attention: Damien Neil <dn...@google.com>
      Gerrit-Comment-Date: Fri, 29 Oct 2021 16:24:53 +0000

      Dmitri Shuralyov (Gerrit)

      unread,
      Oct 29, 2021, 12:24:59 PM10/29/21
      to Damien Neil, goph...@pubsubhelper.golang.org, Dmitri Shuralyov, Go Bot, Brad Fitzpatrick, golang-co...@googlegroups.com

      Attention is currently required from: Damien Neil.

      Patch set 1:Code-Review +2

      View Change

        To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.

        Gerrit-Project: net
        Gerrit-Branch: internal-branch.go1.17-vendor
        Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
        Gerrit-Change-Number: 357690
        Gerrit-PatchSet: 1
        Gerrit-Owner: Damien Neil <dn...@google.com>
        Gerrit-Reviewer: Damien Neil <dn...@google.com>
        Gerrit-Reviewer: Dmitri Shuralyov <dmit...@golang.org>
        Gerrit-Reviewer: Go Bot <go...@golang.org>
        Gerrit-CC: Brad Fitzpatrick <brad...@golang.org>
        Gerrit-Attention: Damien Neil <dn...@google.com>
        Gerrit-Comment-Date: Fri, 29 Oct 2021 16:24:55 +0000

        Dmitri Shuralyov (Gerrit)

        unread,
        Oct 29, 2021, 12:25:09 PM10/29/21
        to Damien Neil, Dmitri Shuralyov, goph...@pubsubhelper.golang.org, golang-...@googlegroups.com, Brad Fitzpatrick, Go Bot, golang-co...@googlegroups.com

        Dmitri Shuralyov submitted this change.

        View Change


        Approvals: Dmitri Shuralyov: Looks good to me, approved Damien Neil: Trusted
        [internal-branch.go1.16-vendor] http2: close the Request's Body when aborting a stream

        After RoundTrip returns, closing the Response's Body should
        interrupt any ongoing write of the request body. Close the
        Request's Body to unblock the body writer.

        Take additional care around the use of a Request after
        its Response's Body has been closed. The RoundTripper contract
        permits the caller to modify the request after the Response's
        body has been closed.

        Updates golang/go#49076

        Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
        Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
        Trust: Damien Neil <dn...@google.com>
        Run-TryBot: Damien Neil <dn...@google.com>
        Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
        Reviewed-on: https://go-review.googlesource.com/c/net/+/357095
        Reviewed-by: Dmitri Shuralyov <dmit...@golang.org>

        ---
        M http2/transport_test.go
        M http2/transport.go
        2 files changed, 153 insertions(+), 77 deletions(-)

        diff --git a/http2/transport.go b/http2/transport.go
        index 054e524..03663b9 100644
        @@ -1002,15 +1012,19 @@
        @@ -1018,7 +1032,7 @@

        return nil
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        }
        @@ -1037,7 +1051,7 @@

        // doesn't, they'll RST_STREAM us soon enough. This is a
        // heuristic to avoid adding knobs to Transport. Hopefully
        // we can keep it.
        - cs.abortRequestBodyWrite(errStopReqBodyWrite)
        + cs.abortRequestBodyWrite()
        }
        res.Request = req
        res.TLS = cc.tlsState
        @@ -1054,8 +1068,11 @@

        waitDone()
        return nil, cs.abortErr
        case <-ctx.Done():
        - return nil, ctx.Err()
        - case <-req.Cancel:
        + err := ctx.Err()
        + cs.abortStream(err)
        + return nil, err
        + case <-cs.reqCancel:
        + cs.abortStream(errRequestCanceled)
        return nil, errRequestCanceled
        }
        }
        @@ -1064,8 +1081,8 @@

        // writeRequest runs for the duration of the request lifetime.
        //
        // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
        -func (cs *clientStream) doRequest() {
        - err := cs.writeRequest()
        +func (cs *clientStream) doRequest(req *http.Request) {
        + err := cs.writeRequest(req)
        cs.cleanupWriteRequest(err)
        }

        @@ -1076,12 +1093,11 @@

        //
        // It returns non-nil if the request ends otherwise.
        // If the returned error is StreamError, the error Code may be used in resetting the stream.
        -func (cs *clientStream) writeRequest() (err error) {
        +func (cs *clientStream) writeRequest(req *http.Request) (err error) {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx

        - if err := checkConnHeaders(cs.req); err != nil {
        + if err := checkConnHeaders(req); err != nil {
        return err
        }

        @@ -1093,7 +1109,7 @@

        }
        select {
        case cc.reqHeaderMu <- struct{}{}:
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        case <-ctx.Done():
        return ctx.Err()
        @@ -1116,7 +1132,7 @@

        if !cc.t.disableCompression() &&
        req.Header.Get("Accept-Encoding") == "" &&
        req.Header.Get("Range") == "" &&
        - req.Method != "HEAD" {
        + !cs.isHead {
        // Request gzip only, not deflate. Deflate is ambiguous and
        // not as universally supported anyway.
        // See: https://zlib.net/zlib_faq.html#faq39
        @@ -1135,19 +1151,23 @@

        continueTimeout := cc.t.expectContinueTimeout()
        if continueTimeout != 0 &&
        !httpguts.HeaderValuesContainsToken(
        - cs.req.Header["Expect"],
        + req.Header["Expect"],
        "100-continue") {
        continueTimeout = 0
        cs.on100 = make(chan struct{}, 1)
        }

        - err = cs.encodeAndWriteHeaders()
        + // Past this point (where we send request headers), it is possible for
        + // RoundTrip to return successfully. Since the RoundTrip contract permits
        + // the caller to "mutate or reuse" the Request after closing the Response's Body,
        + // we must take care when referencing the Request from here on.
        + err = cs.encodeAndWriteHeaders(req)
        <-cc.reqHeaderMu
        if err != nil {
        return err
        }

        - hasBody := actualContentLength(cs.req) != 0
        + hasBody := cs.reqBodyContentLength != 0
        if !hasBody {
        cs.sentEndStream = true
        } else {
        @@ -1163,7 +1183,7 @@

        err = cs.abortErr
        case <-ctx.Done():
        err = ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        err = errRequestCanceled
        }
        timer.Stop()
        @@ -1173,7 +1193,7 @@

        }
        }

        - if err = cs.writeRequestBody(req.Body); err != nil {
        + if err = cs.writeRequestBody(req); err != nil {
        if err != errStopReqBodyWrite {
        traceWroteRequest(cs.trace, err)
        return err
        @@ -1208,16 +1228,15 @@

        return cs.abortErr
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        }
        }

        -func (cs *clientStream) encodeAndWriteHeaders() error {
        +func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx

        cc.wmu.Lock()
        defer cc.wmu.Unlock()
        @@ -1228,7 +1247,7 @@

        return cs.abortErr
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        default:
        }
        @@ -1238,14 +1257,14 @@

        // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
        // sent by writeRequestBody below, along with any Trailers,
        // again in form HEADERS{1}, CONTINUATION{0,})
        - trailers, err := commaSeparatedTrailers(cs.req)
        + trailers, err := commaSeparatedTrailers(req)
        if err != nil {
        return err
        }
        hasTrailers := trailers != ""
        - contentLen := actualContentLength(cs.req)
        + contentLen := actualContentLength(req)
        hasBody := contentLen != 0
        - hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
        + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
        if err != nil {
        return err
        }
        @@ -1264,7 +1283,6 @@

        // cleanupWriteRequest will send a reset to the peer.
        func (cs *clientStream) cleanupWriteRequest(err error) {
        cc := cs.cc
        - req := cs.req

        if cs.ID == 0 {
        // We were canceled before creating the stream, so return our reservation.
        @@ -1275,10 +1293,12 @@

        // Request.Body is closed by the Transport,
        // and in multiple cases: server replies <=299 and >299
        // while still writing request body
        - if req.Body != nil {
        - if e := req.Body.Close(); err == nil {
        - err = e
        - }
        + cc.mu.Lock()
        + bodyClosed := cs.reqBodyClosed
        + cs.reqBodyClosed = true
        + cc.mu.Unlock()
        + if !bodyClosed && cs.reqBody != nil {
        + cs.reqBody.Close()
        }

        if err != nil && cs.sentEndStream {
        @@ -1393,7 +1413,7 @@

        if n > max {
        n = max
        }
        - if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
        + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
        // Add an extra byte past the declared content-length to
        // give the caller's Request.Body io.Reader a chance to
        // give us more bytes than they declared, so we can catch it
        @@ -1408,13 +1428,13 @@


        var bufPool sync.Pool // of *[]byte

        -func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
        +func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
        cc := cs.cc
        + body := cs.reqBody
        sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

        - req := cs.req
        hasTrailers := req.Trailer != nil
        - remainLen := actualContentLength(req)
        + remainLen := cs.reqBodyContentLength
        hasContentLen := remainLen != -1

        cc.mu.Lock()
        @@ -1466,12 +1486,7 @@

        for len(remain) > 0 && err == nil {
        var allowed int32
        allowed, err = cs.awaitFlowControl(len(remain))
        - switch {
        - case err == errStopReqBodyWrite:
        - return err
        - case err == errStopReqBodyWriteAndCancel:
        - return err
        - case err != nil:
        + if err != nil {
        return err
        }
        cc.wmu.Lock()
        @@ -1502,16 +1517,26 @@
        @@ -1532,23 +1557,22 @@

        // if the stream is dead.
        func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx
        cc.mu.Lock()
        defer cc.mu.Unlock()
        for {
        if cc.closed {
        return 0, errClientConnClosed
        }
        - if cs.stopReqBody != nil {
        - return 0, cs.stopReqBody
        + if cs.reqBodyClosed {
        + return 0, errStopReqBodyWrite
        }
        select {
        case <-cs.abort:
        return 0, cs.abortErr
        case <-ctx.Done():
        return 0, ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return 0, errRequestCanceled
        default:
        }
        @@ -1762,11 +1786,11 @@

        }

        // requires cc.wmu be held.
        -func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
        +func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
        cc.hbuf.Reset()

        hlSize := uint64(0)
        - for k, vv := range req.Trailer {
        + for k, vv := range trailer {
        for _, v := range vv {
        hf := hpack.HeaderField{Name: k, Value: v}
        hlSize += uint64(hf.Size())
        @@ -1776,7 +1800,7 @@

        return nil, errRequestHeaderListSize
        }

        - for k, vv := range req.Trailer {
        + for k, vv := range trailer {
        lowKey, ascii := asciiToLower(k)
        if !ascii {
        // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
        @@ -2139,7 +2163,7 @@

        // more safe smuggling-wise to ignore.
        }

        - if cs.req.Method == "HEAD" {
        + if cs.isHead {
        res.Body = noBody
        return res, nil
        }
        @@ -2196,8 +2220,7 @@

        }

        // transportResponseBody is the concrete type of Transport.RoundTrip's
        -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
        -// On Close it sends RST_STREAM if EOF wasn't already seen.
        +// Response.Body. It is an io.ReadCloser.
        type transportResponseBody struct {
        cs *clientStream
        }
        @@ -2280,6 +2303,8 @@

        }
        cc.mu.Unlock()

        + // TODO(dneil): Acquiring this mutex can block indefinitely.
        + // Move flow control return to a goroutine?
        cc.wmu.Lock()
        // Return connection-level flow control.
        if unread > 0 {
        @@ -2294,9 +2319,9 @@


        select {
        case <-cs.donec:
        - case <-cs.req.Context().Done():
        - return cs.req.Context().Err()
        - case <-cs.req.Cancel:
        + case <-cs.ctx.Done():
        + return cs.ctx.Err()
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        return nil
        @@ -2350,7 +2375,7 @@

        To view, visit change 357095. To unsubscribe, or for help writing mail filters, visit settings.

        Gerrit-Project: net
        Gerrit-Branch: internal-branch.go1.16-vendor
        Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
        Gerrit-Change-Number: 357095
        Gerrit-PatchSet: 4
        Gerrit-Owner: Damien Neil <dn...@google.com>
        Gerrit-Reviewer: Damien Neil <dn...@google.com>
        Gerrit-Reviewer: Dmitri Shuralyov <dmit...@golang.org>
        Gerrit-Reviewer: Go Bot <go...@golang.org>
        Gerrit-CC: Brad Fitzpatrick <brad...@golang.org>
        Gerrit-MessageType: merged

        Dmitri Shuralyov (Gerrit)

        unread,
        Oct 29, 2021, 12:25:11 PM10/29/21
        to Damien Neil, Dmitri Shuralyov, goph...@pubsubhelper.golang.org, golang-...@googlegroups.com, Go Bot, Brad Fitzpatrick, golang-co...@googlegroups.com

        Dmitri Shuralyov submitted this change.

        View Change


        Approvals: Dmitri Shuralyov: Looks good to me, approved Damien Neil: Trusted; Run TryBots Objections: Go Bot: TryBots failed
        [internal-branch.go1.17-vendor] http2: close the Request's Body when aborting a stream


        After RoundTrip returns, closing the Response's Body should
        interrupt any ongoing write of the request body. Close the
        Request's Body to unblock the body writer.

        Take additional care around the use of a Request after
        its Response's Body has been closed. The RoundTripper contract
        permits the caller to modify the request after the Response's
        body has been closed.

        Updates golang/go#49077


        Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
        Reviewed-on: https://go-review.googlesource.com/c/net/+/355491
        Trust: Damien Neil <dn...@google.com>
        Run-TryBot: Damien Neil <dn...@google.com>
        Reviewed-by: Brad Fitzpatrick <brad...@golang.org>
        Reviewed-on: https://go-review.googlesource.com/c/net/+/357690

        Reviewed-by: Dmitri Shuralyov <dmit...@golang.org>
        ---
        M http2/transport_test.go
        M http2/transport.go
        2 files changed, 153 insertions(+), 77 deletions(-)

        
        
        diff --git a/http2/transport.go b/http2/transport.go
        index 43029a8..e402650 100644
        @@ -998,15 +1008,19 @@
        @@ -1014,7 +1028,7 @@

        return nil
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        }
        @@ -1033,7 +1047,7 @@

        // doesn't, they'll RST_STREAM us soon enough. This is a
        // heuristic to avoid adding knobs to Transport. Hopefully
        // we can keep it.
        - cs.abortRequestBodyWrite(errStopReqBodyWrite)
        + cs.abortRequestBodyWrite()
        }
        res.Request = req
        res.TLS = cc.tlsState
        @@ -1050,8 +1064,11 @@

        waitDone()
        return nil, cs.abortErr
        case <-ctx.Done():
        - return nil, ctx.Err()
        - case <-req.Cancel:
        + err := ctx.Err()
        + cs.abortStream(err)
        + return nil, err
        + case <-cs.reqCancel:
        + cs.abortStream(errRequestCanceled)
        return nil, errRequestCanceled
        }
        }
        @@ -1060,8 +1077,8 @@

        // writeRequest runs for the duration of the request lifetime.
        //
        // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
        -func (cs *clientStream) doRequest() {
        - err := cs.writeRequest()
        +func (cs *clientStream) doRequest(req *http.Request) {
        + err := cs.writeRequest(req)
        cs.cleanupWriteRequest(err)
        }

        @@ -1072,12 +1089,11 @@

        //
        // It returns non-nil if the request ends otherwise.
        // If the returned error is StreamError, the error Code may be used in resetting the stream.
        -func (cs *clientStream) writeRequest() (err error) {
        +func (cs *clientStream) writeRequest(req *http.Request) (err error) {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx

        - if err := checkConnHeaders(cs.req); err != nil {
        + if err := checkConnHeaders(req); err != nil {
        return err
        }

        @@ -1089,7 +1105,7 @@

        }
        select {
        case cc.reqHeaderMu <- struct{}{}:
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        case <-ctx.Done():
        return ctx.Err()
        @@ -1112,7 +1128,7 @@

        if !cc.t.disableCompression() &&
        req.Header.Get("Accept-Encoding") == "" &&
        req.Header.Get("Range") == "" &&
        - req.Method != "HEAD" {
        + !cs.isHead {
        // Request gzip only, not deflate. Deflate is ambiguous and
        // not as universally supported anyway.
        // See: https://zlib.net/zlib_faq.html#faq39
        @@ -1131,19 +1147,23 @@

        continueTimeout := cc.t.expectContinueTimeout()
        if continueTimeout != 0 &&
        !httpguts.HeaderValuesContainsToken(
        - cs.req.Header["Expect"],
        + req.Header["Expect"],
        "100-continue") {
        continueTimeout = 0
        cs.on100 = make(chan struct{}, 1)
        }

        - err = cs.encodeAndWriteHeaders()
        + // Past this point (where we send request headers), it is possible for
        + // RoundTrip to return successfully. Since the RoundTrip contract permits
        + // the caller to "mutate or reuse" the Request after closing the Response's Body,
        + // we must take care when referencing the Request from here on.
        + err = cs.encodeAndWriteHeaders(req)
        <-cc.reqHeaderMu
        if err != nil {
        return err
        }

        - hasBody := actualContentLength(cs.req) != 0
        + hasBody := cs.reqBodyContentLength != 0
        if !hasBody {
        cs.sentEndStream = true
        } else {
        @@ -1159,7 +1179,7 @@

        err = cs.abortErr
        case <-ctx.Done():
        err = ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        err = errRequestCanceled
        }
        timer.Stop()
        @@ -1169,7 +1189,7 @@

        }
        }

        - if err = cs.writeRequestBody(req.Body); err != nil {
        + if err = cs.writeRequestBody(req); err != nil {
        if err != errStopReqBodyWrite {
        traceWroteRequest(cs.trace, err)
        return err
        @@ -1204,16 +1224,15 @@

        return cs.abortErr
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        }
        }

        -func (cs *clientStream) encodeAndWriteHeaders() error {
        +func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx

        cc.wmu.Lock()
        defer cc.wmu.Unlock()
        @@ -1224,7 +1243,7 @@

        return cs.abortErr
        case <-ctx.Done():
        return ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return errRequestCanceled
        default:
        }
        @@ -1234,14 +1253,14 @@

        // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
        // sent by writeRequestBody below, along with any Trailers,
        // again in form HEADERS{1}, CONTINUATION{0,})
        - trailers, err := commaSeparatedTrailers(cs.req)
        + trailers, err := commaSeparatedTrailers(req)
        if err != nil {
        return err
        }
        hasTrailers := trailers != ""
        - contentLen := actualContentLength(cs.req)
        + contentLen := actualContentLength(req)
        hasBody := contentLen != 0
        - hdrs, err := cc.encodeHeaders(cs.req, cs.requestedGzip, trailers, contentLen)
        + hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
        if err != nil {
        return err
        }
        @@ -1260,7 +1279,6 @@

        // cleanupWriteRequest will send a reset to the peer.
        func (cs *clientStream) cleanupWriteRequest(err error) {
        cc := cs.cc
        - req := cs.req

        if cs.ID == 0 {
        // We were canceled before creating the stream, so return our reservation.
        @@ -1271,10 +1289,12 @@

        // Request.Body is closed by the Transport,
        // and in multiple cases: server replies <=299 and >299
        // while still writing request body
        - if req.Body != nil {
        - if e := req.Body.Close(); err == nil {
        - err = e
        - }
        + cc.mu.Lock()
        + bodyClosed := cs.reqBodyClosed
        + cs.reqBodyClosed = true
        + cc.mu.Unlock()
        + if !bodyClosed && cs.reqBody != nil {
        + cs.reqBody.Close()
        }

        if err != nil && cs.sentEndStream {
        @@ -1389,7 +1409,7 @@

        if n > max {
        n = max
        }
        - if cl := actualContentLength(cs.req); cl != -1 && cl+1 < n {
        + if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
        // Add an extra byte past the declared content-length to
        // give the caller's Request.Body io.Reader a chance to
        // give us more bytes than they declared, so we can catch it
        @@ -1404,13 +1424,13 @@


        var bufPool sync.Pool // of *[]byte

        -func (cs *clientStream) writeRequestBody(body io.Reader) (err error) {
        +func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
        cc := cs.cc
        + body := cs.reqBody
        sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

        - req := cs.req
        hasTrailers := req.Trailer != nil
        - remainLen := actualContentLength(req)
        + remainLen := cs.reqBodyContentLength
        hasContentLen := remainLen != -1

        cc.mu.Lock()
        @@ -1462,12 +1482,7 @@

        for len(remain) > 0 && err == nil {
        var allowed int32
        allowed, err = cs.awaitFlowControl(len(remain))
        - switch {
        - case err == errStopReqBodyWrite:
        - return err
        - case err == errStopReqBodyWriteAndCancel:
        - return err
        - case err != nil:
        + if err != nil {
        return err
        }
        cc.wmu.Lock()
        @@ -1498,16 +1513,26 @@
        @@ -1528,23 +1553,22 @@

        // if the stream is dead.
        func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
        cc := cs.cc
        - req := cs.req
        - ctx := req.Context()
        + ctx := cs.ctx
        cc.mu.Lock()
        defer cc.mu.Unlock()
        for {
        if cc.closed {
        return 0, errClientConnClosed
        }
        - if cs.stopReqBody != nil {
        - return 0, cs.stopReqBody
        + if cs.reqBodyClosed {
        + return 0, errStopReqBodyWrite
        }
        select {
        case <-cs.abort:
        return 0, cs.abortErr
        case <-ctx.Done():
        return 0, ctx.Err()
        - case <-req.Cancel:
        + case <-cs.reqCancel:
        return 0, errRequestCanceled
        default:
        }
        @@ -1758,11 +1782,11 @@

        }

        // requires cc.wmu be held.
        -func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) {
        +func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
        cc.hbuf.Reset()

        hlSize := uint64(0)
        - for k, vv := range req.Trailer {
        + for k, vv := range trailer {
        for _, v := range vv {
        hf := hpack.HeaderField{Name: k, Value: v}
        hlSize += uint64(hf.Size())
        @@ -1772,7 +1796,7 @@

        return nil, errRequestHeaderListSize
        }

        - for k, vv := range req.Trailer {
        + for k, vv := range trailer {
        lowKey, ascii := asciiToLower(k)
        if !ascii {
        // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
        @@ -2135,7 +2159,7 @@

        // more safe smuggling-wise to ignore.
        }

        - if cs.req.Method == "HEAD" {
        + if cs.isHead {
        res.Body = noBody
        return res, nil
        }
        @@ -2192,8 +2216,7 @@

        }

        // transportResponseBody is the concrete type of Transport.RoundTrip's
        -// Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body.
        -// On Close it sends RST_STREAM if EOF wasn't already seen.
        +// Response.Body. It is an io.ReadCloser.
        type transportResponseBody struct {
        cs *clientStream
        }
        @@ -2276,6 +2299,8 @@

        }
        cc.mu.Unlock()

        + // TODO(dneil): Acquiring this mutex can block indefinitely.
        + // Move flow control return to a goroutine?
        cc.wmu.Lock()
        // Return connection-level flow control.
        if unread > 0 {
        @@ -2290,9 +2315,9 @@


        select {
        case <-cs.donec:
        - case <-cs.req.Context().Done():
        - return cs.req.Context().Err()
        - case <-cs.req.Cancel:
        + case <-cs.ctx.Done():
        + return cs.ctx.Err()
        + case <-cs.reqCancel:
        return errRequestCanceled
        }
        return nil
        @@ -2346,7 +2371,7 @@

        To view, visit change 357690. To unsubscribe, or for help writing mail filters, visit settings.

        Gerrit-Project: net
        Gerrit-Branch: internal-branch.go1.17-vendor
        Gerrit-Change-Id: I261e08eb5d70016b49942d72833f46b2ae83962a
        Gerrit-Change-Number: 357690
        Gerrit-PatchSet: 2
        Reply all
        Reply to author
        Forward
        0 new messages