Source file src/net/http/internal/http2/transport.go

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code.
     6  
     7  package http2
     8  
     9  import (
    10  	"bufio"
    11  	"bytes"
    12  	"compress/flate"
    13  	"compress/gzip"
    14  	"context"
    15  	"crypto/rand"
    16  	"crypto/tls"
    17  	"errors"
    18  	"fmt"
    19  	"io"
    20  	"io/fs"
    21  	"log"
    22  	"math"
    23  	"math/bits"
    24  	mathrand "math/rand"
    25  	"net"
    26  	"net/http/httptrace"
    27  	"net/http/internal"
    28  	"net/http/internal/httpcommon"
    29  	"net/textproto"
    30  	"slices"
    31  	"strconv"
    32  	"strings"
    33  	"sync"
    34  	"sync/atomic"
    35  	"time"
    36  
    37  	"golang.org/x/net/http/httpguts"
    38  	"golang.org/x/net/http2/hpack"
    39  	"golang.org/x/net/idna"
    40  )
    41  
const (
	// transportDefaultConnFlow is how many connection-level flow control
	// tokens we give the server at start-up, past the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is how many stream-level flow
	// control tokens we announce to the peer, and how many bytes
	// we buffer per stream.
	transportDefaultStreamFlow = 4 << 20

	// defaultUserAgent is the Go HTTP/2 client's User-Agent string.
	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
	// it has received the server's initial SETTINGS frame, which corresponds
	// with the spec's minimum recommended value.
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
	// if the server doesn't include one in its initial SETTINGS frame.
	defaultMaxConcurrentStreams = 1000
)
    63  
    64  // Transport is an HTTP/2 Transport.
    65  //
    66  // A Transport internally caches connections to servers. It is safe
    67  // for concurrent use by multiple goroutines.
// Transport is an HTTP/2 Transport.
//
// A Transport internally caches connections to servers. It is safe
// for concurrent use by multiple goroutines.
type Transport struct {
	t1       TransportConfig      // configuration provided by the net/http layer; may be nil (see disableCompression)
	connPool noDialClientConnPool // cache of ClientConns, keyed by authority address
	*transportTestHooks           // nil outside of tests
}
    73  
// Hook points used for testing.
// Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
// Inside tests, see the testSyncHooks function docs.

type transportTestHooks struct {
	// newclientconn is called by newClientConn with the partially
	// initialized conn; tests may substitute cc.tconn before use.
	newclientconn func(*ClientConn)
}
    81  
    82  func (t *Transport) maxHeaderListSize() uint32 {
    83  	n := t.t1.MaxHeaderListSize()
    84  	if b := t.t1.MaxResponseHeaderBytes(); b != 0 {
    85  		n = b
    86  		if n > 0 {
    87  			n = adjustHTTP1MaxHeaderSize(n)
    88  		}
    89  	}
    90  	if n <= 0 {
    91  		return 10 << 20
    92  	}
    93  	if n >= 0xffffffff {
    94  		return 0
    95  	}
    96  	return uint32(n)
    97  }
    98  
    99  func (t *Transport) disableCompression() bool {
   100  	return t.t1 != nil && t.t1.DisableCompression()
   101  }
   102  
   103  func NewTransport(t1 TransportConfig) *Transport {
   104  	connPool := new(clientConnPool)
   105  	t2 := &Transport{
   106  		connPool: noDialClientConnPool{connPool},
   107  		t1:       t1,
   108  	}
   109  	connPool.t = t2
   110  	return t2
   111  }
   112  
   113  func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
   114  	addr := authorityAddr(scheme, authority)
   115  	used, err := t.connPool.addConnIfNeeded(addr, t, c)
   116  	if !used {
   117  		go c.Close()
   118  	}
   119  	return err
   120  }
   121  
// unencryptedTransport is a Transport with a RoundTrip method that
// always permits http:// URLs.
type unencryptedTransport Transport

// RoundTrip sends the request via the underlying Transport's
// RoundTripOpt with default options.
func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{})
}
   129  
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn, except specialized impls
	tlsState      *tls.ConnectionState // nil only for specialized impls
	atomicReused  uint32               // whether conn is being reused; atomic
	singleUse     bool                 // whether being used for a single http.Request
	getConnCalled bool                 // used by clientConnPool

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never
	idleTimer   *time.Timer

	mu               sync.Mutex // guards following
	cond             *sync.Cond // hold mu; broadcast on flow/closed changes
	flow             outflow    // our conn-level flow control quota (cs.outflow is per stream)
	inflow           inflow     // peer's conn-level flow control
	doNotReuse       bool       // whether conn is marked to not be reused for any future requests
	closing          bool       // NOTE(review): appears to mean a close is in progress — confirm against shutdown paths
	closed           bool       // conn has been closed
	closedOnIdle     bool                     // true if conn was closed for idleness
	seenSettings     bool                     // true if we've seen a settings frame, false otherwise
	seenSettingsChan chan struct{}            // closed when seenSettings is true or frame reading fails
	wantSettingsAck  bool                     // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame             // if non-nil, the GoAwayFrame we received
	goAwayDebug      string                   // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated
	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
	br               *bufio.Reader             // buffered reader over tconn
	lastActive       time.Time                 // time of last activity; guarded by mu
	lastIdle         time.Time // time last idle
	// Settings from peer: (also guarded by wmu)
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool
	strictMaxConcurrentStreams  bool

	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
	// gRPC strictly limits the number of PING frames that it will receive.
	// The default is two pings per two hours, but the limit resets every time
	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
	//
	// rstStreamPingsBlocked is set after receiving a response to a PING frame
	// bundled with an RST_STREAM (see pendingResets below), and cleared after
	// receiving a HEADERS or DATA frame.
	rstStreamPingsBlocked bool

	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
	// without confirming that the peer has received them. When we send a RST_STREAM,
	// we bundle it with a PING frame, unless a PING is already in flight. We count
	// the reset stream against the connection's concurrency limit until we get
	// a PING response. This limits the number of requests we'll try to send to a
	// completely unresponsive connection.
	pendingResets int

	// readBeforeStreamID is the smallest stream ID that has not been followed by
	// a frame read from the peer. We use this to determine when a request may
	// have been sent to a completely unresponsive connection:
	// If the request ID is less than readBeforeStreamID, then we have had some
	// indication of life on the connection since sending the request.
	readBeforeStreamID uint32

	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
	// Write to reqHeaderMu to lock it, read from it to unlock.
	// Lock reqmu BEFORE mu or wmu.
	reqHeaderMu chan struct{}

	// internalStateHook reports state changes back to the net/http.ClientConn.
	// Note that this is different from the user state hook registered by
	// net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
	// which calls the user hook.
	internalStateHook func()

	// wmu is held while writing.
	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
	// Only acquire both at the same time when changing peer settings.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that has occurred
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}
   226  
// clientStream is the state for a single HTTP/2 stream. One of these
// is created for each Transport.RoundTrip call.
type clientStream struct {
	cc *ClientConn // the connection this stream belongs to

	// Fields of Request that we may access even after the response body is closed.
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe with the flow-controlled response payload
	requestedGzip bool // whether gzip was requested for this stream
	isHead        bool // request method is HEAD

	abortOnce sync.Once
	abort     chan struct{} // closed to signal stream should end immediately
	abortErr  error         // set if abort is closed

	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
	donec      chan struct{} // closed after the stream is in the closed state
	on100      chan struct{} // buffered; written to if a 100 is received

	respHeaderRecv chan struct{}   // closed when headers are received
	res            *ClientResponse // set if respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
	readErr     error   // sticky read error; owned by transportResponseBody.Read

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done

	// owned by writeRequest:
	sentEndStream bool // sent an END_STREAM flag to the peer
	sentHeaders   bool // sent the HEADERS frame(s) for the request

	// owned by clientConnReadLoop:
	firstByte       bool  // got the first response byte
	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
	readClosed      bool  // peer sent an END_STREAM flag
	readAborted     bool  // read loop reset the stream
	totalHeaderSize int64 // total size of 1xx headers seen

	trailer    Header  // accumulated trailers
	resTrailer *Header // client's Response.Trailer

	// staticResp: NOTE(review): looks like preallocated storage for the
	// response object to avoid a heap allocation — confirm at use site.
	staticResp ClientResponse
}
   279  
// got1xxFuncForTests, if non-nil, overrides the trace-derived 1xx response
// handler returned by get1xxTraceFunc. Set only by tests.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error
   281  
   282  // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
   283  // if any. It returns nil if not set or if the Go version is too old.
   284  func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
   285  	if fn := got1xxFuncForTests; fn != nil {
   286  		return fn
   287  	}
   288  	return traceGot1xxResponseFunc(cs.trace)
   289  }
   290  
   291  func (cs *clientStream) abortStream(err error) {
   292  	cs.cc.mu.Lock()
   293  	defer cs.cc.mu.Unlock()
   294  	cs.abortStreamLocked(err)
   295  }
   296  
// abortStreamLocked aborts the stream with err: it records err and closes
// cs.abort (at most once), closes the request body if present, and wakes
// any goroutine blocked on the connection's flow-control condition.
// The caller must hold cc.mu.
func (cs *clientStream) abortStreamLocked(err error) {
	cs.abortOnce.Do(func() {
		cs.abortErr = err
		close(cs.abort)
	})
	if cs.reqBody != nil {
		cs.closeReqBodyLocked()
	}
	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
	if cs.cc.cond != nil {
		// Wake up writeRequestBody if it is waiting on flow control.
		cs.cc.cond.Broadcast()
	}
}
   311  
// abortRequestBodyWrite closes the request body if it is present and not
// already being closed, and wakes any writer blocked on flow control.
// Unlike abortStream, it does not abort the stream itself.
func (cs *clientStream) abortRequestBodyWrite() {
	cc := cs.cc
	cc.mu.Lock()
	defer cc.mu.Unlock()
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		cs.closeReqBodyLocked()
		cc.cond.Broadcast()
	}
}
   321  
   322  func (cs *clientStream) closeReqBodyLocked() {
   323  	if cs.reqBodyClosed != nil {
   324  		return
   325  	}
   326  	cs.reqBodyClosed = make(chan struct{})
   327  	reqBodyClosed := cs.reqBodyClosed
   328  	go func() {
   329  		cs.reqBody.Close()
   330  		close(reqBodyClosed)
   331  	}()
   332  }
   333  
// stickyErrWriter is an io.Writer that records the first write error it
// sees and fails all subsequent writes with that same error.
type stickyErrWriter struct {
	conn    net.Conn      // underlying connection being written to
	timeout time.Duration // per-byte write timeout passed to writeWithByteTimeout
	err     *error        // sticky error slot (points at ClientConn.werr)
}
   339  
   340  func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
   341  	if *sew.err != nil {
   342  		return 0, *sew.err
   343  	}
   344  	n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
   345  	*sew.err = err
   346  	return n, err
   347  }
   348  
// noCachedConnError is the concrete type of ErrNoCachedConn, which
// needs to be detected by net/http regardless of whether it's its
// bundled version (in h2_bundle.go with a rewritten type name) or
// from a user's x/net/http2. As such, as it has a unique method name
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
// isNoCachedConnError.
type noCachedConnError struct{}

// IsHTTP2NoCachedConnError is the marker method sniffed for by net/http.
func (noCachedConnError) IsHTTP2NoCachedConnError() {}
func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
   359  
   360  // isNoCachedConnError reports whether err is of type noCachedConnError
   361  // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
   362  // may coexist in the same running program.
   363  func isNoCachedConnError(err error) bool {
   364  	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
   365  	return ok
   366  }
   367  
// ErrNoCachedConn is returned by RoundTripOpt when OnlyCachedConn is set
// and no cached connection is available.
var ErrNoCachedConn error = noCachedConnError{}
   369  
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may
	// create a new TCP connection. If set true and
	// no cached connection is available, RoundTripOpt
	// will return ErrNoCachedConn.
	OnlyCachedConn bool
}
   378  
// RoundTrip sends req and returns its response, using default
// round-trip options.
func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return t.RoundTripOpt(req, RoundTripOpt{})
}
   382  
   383  // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
   384  // and returns a host:port. The port 443 is added if needed.
   385  func authorityAddr(scheme string, authority string) (addr string) {
   386  	host, port, err := net.SplitHostPort(authority)
   387  	if err != nil { // authority didn't have a port
   388  		host = authority
   389  		port = ""
   390  	}
   391  	if port == "" { // authority's port was empty
   392  		port = "443"
   393  		if scheme == "http" {
   394  			port = "80"
   395  		}
   396  	}
   397  	if a, err := idna.ToASCII(host); err == nil {
   398  		host = a
   399  	}
   400  	// IPv6 address literal, without a port:
   401  	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
   402  		return host + ":" + port
   403  	}
   404  	return net.JoinHostPort(host, port)
   405  }
   406  
// RoundTripOpt is like RoundTrip, but takes options.
//
// On retryable failures (see shouldRetryRequest) it retries the request
// up to 6 more times, with exponential backoff and 10% jitter after the
// first retry.
func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
	// Only http and https URLs are supported.
	switch req.URL.Scheme {
	case "https":
	case "http":
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool.GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		// The first request on a conn leaves atomicReused at 0; every
		// subsequent one observes the swap failing and counts as reuse.
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			if req, err = shouldRetryRequest(req, err); err == nil {
				// After the first retry, do exponential backoff with 10% jitter.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				// Sleep 2^(retry-1) seconds plus up to 10% jitter, unless
				// the request's context is canceled first.
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := time.NewTimer(d)
				select {
				case <-tm.C:
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context.Done():
					tm.Stop()
					err = req.Context.Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently,
			// this is the first request to use it,
			// and the connection is closed and not usable.
			//
			// In this state, cc.idleTimer will remove the conn from the pool
			// when it fires. Stop the timer and remove it here so future requests
			// won't try to use this connection.
			//
			// If the timer has already fired and we're racing it, the redundant
			// call to MarkDead is harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool.MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}
   471  
   472  func (t *Transport) IdleConnStrsForTesting() []string {
   473  	var ret []string
   474  	t.connPool.mu.Lock()
   475  	defer t.connPool.mu.Unlock()
   476  	for k, ccs := range t.connPool.conns {
   477  		for _, cc := range ccs {
   478  			if cc.idleState().canTakeNewRequest {
   479  				ret = append(ret, k)
   480  			}
   481  		}
   482  	}
   483  	slices.Sort(ret)
   484  	return ret
   485  }
   486  
// CloseIdleConnections closes any connections which were previously
// connected from previous requests but are now sitting idle.
// It does not interrupt any connections currently in use.
func (t *Transport) CloseIdleConnections() {
	t.connPool.closeIdleConnections()
}
   493  
// Sentinel errors describing why a client connection or request failed;
// canRetryError and RoundTripOpt use them to decide retry behavior.
var (
	errClientConnClosed         = errors.New("http2: client conn is closed")
	errClientConnUnusable       = errors.New("http2: client conn not usable")
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	errClientConnForceClosed    = errors.New("http2: client connection force closed via ClientConn.Close")
)
   501  
   502  // shouldRetryRequest is called by RoundTrip when a request fails to get
   503  // response headers. It is always called with a non-nil error.
   504  // It returns either a request to retry or an error if the request can't be replayed.
   505  // If the request is retried, it always clones the request (since requests
   506  // contain an unreusable clientStream).
   507  func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
   508  	if !canRetryError(err) {
   509  		return nil, err
   510  	}
   511  	// If the Body is nil (or http.NoBody), it's safe to reuse this request's Body.
   512  	if req.Body == nil || req.Body == NoBody {
   513  		return req.Clone(), nil
   514  	}
   515  
   516  	// If the request body can be reset back to its original
   517  	// state via the optional req.GetBody, do that.
   518  	if req.GetBody != nil {
   519  		body, err := req.GetBody()
   520  		if err != nil {
   521  			return nil, err
   522  		}
   523  		newReq := req.Clone()
   524  		newReq.Body = body
   525  		return newReq, nil
   526  	}
   527  
   528  	// The Request.Body can't reset back to the beginning, but we
   529  	// don't seem to have started to read from it yet, so reuse the body.
   530  	if err == errClientConnUnusable {
   531  		return req.Clone(), nil
   532  	}
   533  
   534  	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
   535  }
   536  
   537  func canRetryError(err error) bool {
   538  	if err == errClientConnUnusable || err == errClientConnGotGoAway {
   539  		return true
   540  	}
   541  	if se, ok := err.(StreamError); ok {
   542  		return se.Code == ErrCodeRefusedStream
   543  	}
   544  	return false
   545  }
   546  
// dialClientConn dials addr (a host:port) over TLS and wraps the
// resulting connection in a new *ClientConn. Under test hooks, no real
// connection is dialed.
func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
	if t.transportTestHooks != nil {
		// Tests supply the conn via the newclientconn hook.
		return t.newClientConn(nil, singleUse, nil)
	}
	host, _, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}
	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
	if err != nil {
		return nil, err
	}
	return t.newClientConn(tconn, singleUse, nil)
}
   561  
   562  func (t *Transport) newTLSConfig(host string) *tls.Config {
   563  	cfg := new(tls.Config)
   564  	if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
   565  		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
   566  	}
   567  	if cfg.ServerName == "" {
   568  		cfg.ServerName = host
   569  	}
   570  	return cfg
   571  }
   572  
// dialTLS dials a TLS connection and verifies that HTTP/2 was
// negotiated via ALPN, returning an error otherwise.
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	state := tlsCn.ConnectionState()
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
	}
	// NOTE(review): NegotiatedProtocolIsMutual is deprecated in crypto/tls
	// and documented as always true; this check is retained for parity with
	// the existing behavior.
	if !state.NegotiatedProtocolIsMutual {
		return nil, errors.New("http2: could not negotiate protocol mutually")
	}
	return tlsCn, nil
}
   587  
   588  // disableKeepAlives reports whether connections should be closed as
   589  // soon as possible after handling the first request.
   590  func (t *Transport) disableKeepAlives() bool {
   591  	return t.t1 != nil && t.t1.DisableKeepAlives()
   592  }
   593  
   594  func (t *Transport) expectContinueTimeout() time.Duration {
   595  	if t.t1 == nil {
   596  		return 0
   597  	}
   598  	return t.t1.ExpectContinueTimeout()
   599  }
   600  
// NewClientConn wraps c in a new client connection for use by net/http.
// internalStateHook, if non-nil, is invoked on connection state changes.
func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
	cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
	if err != nil {
		return NetHTTPClientConn{}, err
	}

	// RoundTrip should block when the conn is at its concurrency limit,
	// not return an error. Setting strictMaxConcurrentStreams enables this.
	cc.strictMaxConcurrentStreams = true

	return NetHTTPClientConn{cc}, nil
}
   613  
// newClientConn initializes a *ClientConn over c: it sets up framing,
// HPACK state, and flow control, writes the client preface and initial
// SETTINGS, and starts the read loop. singleUse marks the conn as good
// for only one request.
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1),
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	if t.transportTestHooks != nil {
		// Tests may replace cc.tconn inside the hook.
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	cc.flow.add(int32(initialWindowSize))

	// TODO: adjust this writer size to account for frame size +
	// MTU + crypto/tls record padding.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if conf.CountError != nil {
		cc.fr.countError = conf.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	// Capture TLS state when the conn can report it.
	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	// Announce our settings: push disabled, our stream receive window,
	// our max frame size, and (when non-default) header limits.
	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0},
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Client preface, SETTINGS, and the connection-level window update
	// must go out before anything else on the wire.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
   706  
   707  func (cc *ClientConn) healthCheck() {
   708  	pingTimeout := cc.pingTimeout
   709  	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
   710  	// trigger the healthCheck again if there is no frame received.
   711  	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
   712  	defer cancel()
   713  	cc.vlogf("http2: Transport sending health check")
   714  	err := cc.Ping(ctx)
   715  	if err != nil {
   716  		cc.vlogf("http2: Transport health check failure: %v", err)
   717  		cc.closeForLostPing()
   718  	} else {
   719  		cc.vlogf("http2: Transport health check success")
   720  	}
   721  }
   722  
   723  // SetDoNotReuse marks cc as not reusable for future HTTP requests.
   724  func (cc *ClientConn) SetDoNotReuse() {
   725  	cc.mu.Lock()
   726  	defer cc.mu.Unlock()
   727  	cc.doNotReuse = true
   728  }
   729  
// setGoAway records a GOAWAY frame received from the peer, merging it
// with any previously received one, and aborts all streams whose IDs are
// above the frame's LastStreamID.
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	old := cc.goAway
	cc.goAway = f

	// Merge the previous and current GoAway error frames.
	if cc.goAwayDebug == "" {
		cc.goAwayDebug = string(f.DebugData())
	}
	if old != nil && old.ErrCode != ErrCodeNo {
		cc.goAway.ErrCode = old.ErrCode
	}
	last := f.LastStreamID
	for streamID, cs := range cc.streams {
		if streamID <= last {
			// The server's GOAWAY indicates that it received this stream.
			// It will either finish processing it, or close the connection
			// without doing so. Either way, leave the stream alone for now.
			continue
		}
		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
			// Don't retry the first stream on a connection if we get a non-NO error.
			// If the server is sending an error on a new connection,
			// retrying the request on a new one probably isn't going to work.
			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
		} else {
			// Aborting the stream with errClientConnGotGoAway indicates that
			// the request should be retried on a new connection.
			cs.abortStreamLocked(errClientConnGotGoAway)
		}
	}
}
   764  
   765  // CanTakeNewRequest reports whether the connection can take a new request,
   766  // meaning it has not been closed or received or sent a GOAWAY.
   767  //
   768  // If the caller is going to immediately make a new request on this
   769  // connection, use ReserveNewRequest instead.
   770  func (cc *ClientConn) CanTakeNewRequest() bool {
   771  	cc.mu.Lock()
   772  	defer cc.mu.Unlock()
   773  	return cc.canTakeNewRequestLocked()
   774  }
   775  
   776  // ReserveNewRequest is like CanTakeNewRequest but also reserves a
   777  // concurrent stream in cc. The reservation is decremented on the
   778  // next call to RoundTrip.
   779  func (cc *ClientConn) ReserveNewRequest() bool {
   780  	cc.mu.Lock()
   781  	defer cc.mu.Unlock()
   782  	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
   783  		return false
   784  	}
   785  	cc.streamsReserved++
   786  	return true
   787  }
   788  
// ClientConnState describes the state of a ClientConn.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of
	// closing. It may be closing due to shutdown, being a
	// single-use connection, being marked as DoNotReuse, or
	// having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active. This includes
	// streams which have been reset but whose RST_STREAM has not
	// yet been acknowledged (pending resets).
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests have been sent in excess
	// of the peer's advertised MaxConcurrentStreams setting and
	// are waiting for other streams to complete.
	StreamsPending int

	// MaxConcurrentStreams is how many concurrent streams the
	// peer advertised as acceptable. Zero means no SETTINGS
	// frame has been received yet.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last
	// transitioned to idle state.
	LastIdle time.Time
}
   821  
   822  // State returns a snapshot of cc's state.
   823  func (cc *ClientConn) State() ClientConnState {
   824  	cc.wmu.Lock()
   825  	maxConcurrent := cc.maxConcurrentStreams
   826  	if !cc.seenSettings {
   827  		maxConcurrent = 0
   828  	}
   829  	cc.wmu.Unlock()
   830  
   831  	cc.mu.Lock()
   832  	defer cc.mu.Unlock()
   833  	return ClientConnState{
   834  		Closed:               cc.closed,
   835  		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
   836  		StreamsActive:        len(cc.streams) + cc.pendingResets,
   837  		StreamsReserved:      cc.streamsReserved,
   838  		StreamsPending:       cc.pendingRequests,
   839  		LastIdle:             cc.lastIdle,
   840  		MaxConcurrentStreams: maxConcurrent,
   841  	}
   842  }
   843  
// clientConnIdleState describes the suitability of a client
// connection to initiate a new RoundTrip request.
type clientConnIdleState struct {
	canTakeNewRequest bool // whether the conn may accept another request now
}
   849  
   850  func (cc *ClientConn) idleState() clientConnIdleState {
   851  	cc.mu.Lock()
   852  	defer cc.mu.Unlock()
   853  	return cc.idleStateLocked()
   854  }
   855  
   856  func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
   857  	if cc.singleUse && cc.nextStreamID > 1 {
   858  		return
   859  	}
   860  	var maxConcurrentOkay bool
   861  	if cc.strictMaxConcurrentStreams {
   862  		// We'll tell the caller we can take a new request to
   863  		// prevent the caller from dialing a new TCP
   864  		// connection, but then we'll block later before
   865  		// writing it.
   866  		maxConcurrentOkay = true
   867  	} else {
   868  		// We can take a new request if the total of
   869  		//   - active streams;
   870  		//   - reservation slots for new streams; and
   871  		//   - streams for which we have sent a RST_STREAM and a PING,
   872  		//     but received no subsequent frame
   873  		// is less than the concurrency limit.
   874  		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
   875  	}
   876  
   877  	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
   878  
   879  	// If this connection has never been used for a request and is closed,
   880  	// then let it take a request (which will fail).
   881  	// If the conn was closed for idleness, we're racing the idle timer;
   882  	// don't try to use the conn. (Issue #70515.)
   883  	//
   884  	// This avoids a situation where an error early in a connection's lifetime
   885  	// goes unreported.
   886  	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
   887  		st.canTakeNewRequest = true
   888  	}
   889  
   890  	return
   891  }
   892  
   893  func (cc *ClientConn) isUsableLocked() bool {
   894  	return cc.goAway == nil &&
   895  		!cc.closed &&
   896  		!cc.closing &&
   897  		!cc.doNotReuse &&
   898  		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
   899  		!cc.tooIdleLocked()
   900  }
   901  
   902  // canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
   903  //
   904  // This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
   905  // We only permit reservations up to the conn's concurrency limit.
   906  // This differs from ClientConn.ReserveNewRequest, which permits reservations
   907  // past the limit when StrictMaxConcurrentStreams is set.
   908  func (cc *ClientConn) canReserveLocked() bool {
   909  	if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
   910  		return false
   911  	}
   912  	if !cc.isUsableLocked() {
   913  		return false
   914  	}
   915  	return true
   916  }
   917  
   918  // currentRequestCountLocked reports the number of concurrency slots currently in use,
   919  // including active streams, reserved slots, and reset streams waiting for acknowledgement.
   920  func (cc *ClientConn) currentRequestCountLocked() int {
   921  	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
   922  }
   923  
   924  func (cc *ClientConn) canTakeNewRequestLocked() bool {
   925  	st := cc.idleStateLocked()
   926  	return st.canTakeNewRequest
   927  }
   928  
   929  // availableLocked reports the number of concurrency slots available.
   930  func (cc *ClientConn) availableLocked() int {
   931  	if !cc.canTakeNewRequestLocked() {
   932  		return 0
   933  	}
   934  	return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
   935  }
   936  
// tooIdleLocked reports whether this connection has been sitting idle
// for too much wall time.
func (cc *ClientConn) tooIdleLocked() bool {
	// The Round(0) strips the monotonic clock reading so the
	// times are compared based on their wall time. We don't want
	// to reuse a connection that's been sitting idle during
	// VM/laptop suspend if monotonic time was also frozen.
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
   946  
// onIdleTimeout is called from a time.AfterFunc goroutine. It will
// only be called when we're idle, but because we're coming from a new
// goroutine, there could be a new request coming in at the same time,
// so this simply calls the synchronized closeIfIdle to shut down this
// connection. The timer could just call closeIfIdle, but this is more
// clear.
// Must be called without cc.mu held (closeIfIdle acquires it).
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}
   956  
// closeConn closes the underlying connection. If the close has not
// completed within 250ms, forceCloseConn tears down the transport
// beneath a hung tls.Conn.Close.
func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
	cc.maybeCallStateHook()
}
   963  
   964  // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
   965  // Try to shut it down more aggressively.
   966  func (cc *ClientConn) forceCloseConn() {
   967  	tc, ok := cc.tconn.(*tls.Conn)
   968  	if !ok {
   969  		return
   970  	}
   971  	if nc := tc.NetConn(); nc != nil {
   972  		nc.Close()
   973  	}
   974  }
   975  
// closeIfIdle closes cc if it has no active or reserved streams.
// It is the body of the idle timer (see onIdleTimeout).
func (cc *ClientConn) closeIfIdle() {
	cc.mu.Lock()
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
		// Not idle: a request arrived before the timer fired.
		cc.mu.Unlock()
		return
	}
	cc.closed = true
	cc.closedOnIdle = true
	nextID := cc.nextStreamID
	// TODO: do clients send GOAWAY too? maybe? Just Close:
	cc.mu.Unlock()

	if VerboseLogs {
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
	}
	cc.closeConn()
}
   993  
   994  func (cc *ClientConn) isDoNotReuseAndIdle() bool {
   995  	cc.mu.Lock()
   996  	defer cc.mu.Unlock()
   997  	return cc.doNotReuse && len(cc.streams) == 0
   998  }
   999  
// shutdownEnterWaitStateHook is invoked when Shutdown begins waiting for
// in-flight streams to finish; it is a package-level variable so it can be
// overridden (e.g. by tests).
var shutdownEnterWaitStateHook = func() {}
  1001  
// Shutdown gracefully closes the client connection, waiting for running streams to complete.
// It first sends a GOAWAY frame (which also blocks new requests), then waits
// until all streams finish, the connection closes, or ctx is done.
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}
	// Wait for all in-flight streams to complete or connection to close
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			// Woken via cc.cond when stream state changes.
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the goroutine above
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
  1039  
  1040  func (cc *ClientConn) sendGoAway() error {
  1041  	cc.mu.Lock()
  1042  	closing := cc.closing
  1043  	cc.closing = true
  1044  	maxStreamID := cc.nextStreamID
  1045  	cc.mu.Unlock()
  1046  	if closing {
  1047  		// GOAWAY sent already
  1048  		return nil
  1049  	}
  1050  
  1051  	cc.wmu.Lock()
  1052  	defer cc.wmu.Unlock()
  1053  	// Send a graceful shutdown frame to server
  1054  	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
  1055  		return err
  1056  	}
  1057  	if err := cc.bw.Flush(); err != nil {
  1058  		return err
  1059  	}
  1060  	// Prevent new requests
  1061  	return nil
  1062  }
  1063  
// closeForError closes the client connection immediately. In-flight requests
// are interrupted. err is sent to streams.
func (cc *ClientConn) closeForError(err error) {
	cc.mu.Lock()
	cc.closed = true
	// Abort every active stream with err.
	for _, cs := range cc.streams {
		cs.abortStreamLocked(err)
	}
	// Wake any goroutines blocked on cc.cond (e.g. waiting for a stream slot).
	cc.cond.Broadcast()
	// Unlock before closing the conn; closeConn does not require cc.mu.
	cc.mu.Unlock()
	cc.closeConn()
}
  1076  
// Close closes the client connection immediately.
//
// In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
// It always returns nil.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
  1084  
  1085  // closes the client connection immediately. In-flight requests are interrupted.
  1086  func (cc *ClientConn) closeForLostPing() {
  1087  	err := errors.New("http2: client connection lost")
  1088  	if f := cc.fr.countError; f != nil {
  1089  		f("conn_close_lost_ping")
  1090  	}
  1091  	cc.closeForError(err)
  1092  }
  1093  
// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
// exported. At least they'll be DeepEqual for h1-vs-h2 comparison tests.
var errRequestCanceled = internal.ErrRequestCanceled
  1097  
  1098  func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1099  	if cc.t.t1 != nil {
  1100  		return cc.t.t1.ResponseHeaderTimeout()
  1101  	}
  1102  	// No way to do this (yet?) with just an http2.Transport. Probably
  1103  	// no need. Request.Cancel this is the new way. We only need to support
  1104  	// this for compatibility with the old http.Transport fields when
  1105  	// we're doing transparent http2.
  1106  	return 0
  1107  }
  1108  
  1109  // actualContentLength returns a sanitized version of
  1110  // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1111  // means unknown.
  1112  func actualContentLength(req *ClientRequest) int64 {
  1113  	if req.Body == nil || req.Body == NoBody {
  1114  		return 0
  1115  	}
  1116  	if req.ContentLength != 0 {
  1117  		return req.ContentLength
  1118  	}
  1119  	return -1
  1120  }
  1121  
  1122  func (cc *ClientConn) decrStreamReservations() {
  1123  	cc.mu.Lock()
  1124  	defer cc.mu.Unlock()
  1125  	cc.decrStreamReservationsLocked()
  1126  }
  1127  
  1128  func (cc *ClientConn) decrStreamReservationsLocked() {
  1129  	if cc.streamsReserved > 0 {
  1130  		cc.streamsReserved--
  1131  	}
  1132  }
  1133  
// RoundTrip sends req on this connection and returns its response.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
  1137  
// roundTrip sends req on the connection and waits for the response headers.
// streamf, if non-nil, is called with the clientStream once the request has
// been assigned to it.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	// Write the request on its own goroutine; doRequest also performs
	// post-request cleanup and runs for the lifetime of the stream.
	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream completes, ctx is done, or the
	// request is canceled.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On error or status code 3xx, 4xx, 5xx, etc abort any
			// ongoing write, assuming that the server doesn't care
			// about our request body. If the server replied with 1xx or
			// 2xx, however, then assume the server DOES potentially
			// want our body (e.g. full-duplex streaming:
			// golang.org/issue/13444). If it turns out the server
			// doesn't, they'll RST_STREAM us soon enough. This is a
			// heuristic to avoid adding knobs to Transport. Hopefully
			// we can keep it.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, then wait for the stream to be closed before
			// RoundTrip returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked
		// will have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race condition
		// with net/http checking its readTrackingBody to see if the
		// body was read from or closed. See golang/go#60041.
		//
		// The body is closed in a separate goroutine without the
		// connection mutex held, but dropping the mutex before waiting
		// will keep us from holding it indefinitely if the body
		// close is slow for some reason.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both cs.respHeaderRecv and cs.abort are signaling,
				// pick respHeaderRecv. The server probably wrote the
				// response and immediately reset the stream.
				// golang.org/issue/49645
				return handleResponseHeaders()
			default:
				// Wait for the stream to finish; the abort error is
				// reported regardless of how the wait ends.
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
  1246  
// doRequest runs for the duration of the request lifetime.
//
// It sends the request and performs post-request cleanup (closing Request.Body, etc.).
// It runs on its own goroutine, started by roundTrip.
func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}
  1254  
// errExtendedConnectNotSupported is returned for an extended CONNECT request
// (one carrying a ":protocol" header) when the peer's SETTINGS did not
// enable extended CONNECT.
var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1256  
// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise.
// If the returned error is StreamError, the error Code may be used in resetting the stream.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// wait for setting frames to be received, a server can change this value later,
	// but we just wait for the first settings frame
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	// Acquire the new-request lock by writing to reqHeaderMu.
	// This lock guards the critical section covering allocating a new stream ID
	// (requires mu) and creating the stream (requires wmu).
	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn") // for tests
	}
	if isExtendedConnect {
		// Extended CONNECT requires server opt-in via SETTINGS, so block
		// until the server's first SETTINGS frame has been seen.
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		cc.mu.Unlock()
		<-cc.reqHeaderMu
		return err
	}
	cc.addStreamLocked(cs) // assigns stream ID
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers), it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract permits
	// the caller to "mutate or reuse" the Request after closing the Response's Body,
	// we must take care when referencing the Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		// With "Expect: 100-continue", wait for the server's interim
		// response (cs.on100) or the timeout before writing the body.
		if continueTimeout != 0 {
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}
	// Wait until the peer half-closes its end of the stream,
	// or until the request is aborted (via context, error, or otherwise),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			respHeaderRecv = nil
			respHeaderTimer = nil // keep waiting for END_STREAM
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
  1407  
// encodeAndWriteHeaders encodes req's headers into cc.hbuf and writes them
// to the connection as a HEADERS frame plus any CONTINUATION frames.
// The caller holds cc.reqHeaderMu; this method acquires cc.wmu.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for cc.mu, just quit.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers.
	//
	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
	// sent by writeRequestBody below, along with any Trailers,
	// again in form HEADERS{1}, CONTINUATION{0,})
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request.
	// END_STREAM is set only when there is neither a body nor trailers.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
  1447  
  1448  func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
  1449  	return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
  1450  		Request: httpcommon.Request{
  1451  			Header:              req.Header,
  1452  			Trailer:             req.Trailer,
  1453  			URL:                 req.URL,
  1454  			Host:                req.Host,
  1455  			Method:              req.Method,
  1456  			ActualContentLength: actualContentLength(req),
  1457  		},
  1458  		AddGzipHeader:         addGzipHeader,
  1459  		PeerMaxHeaderListSize: peerMaxHeaderListSize,
  1460  		DefaultUserAgent:      defaultUserAgent,
  1461  	}, headerf)
  1462  }
  1463  
// cleanupWriteRequest performs post-request tasks.
//
// If err (the result of writeRequest) is non-nil and the stream is not closed,
// cleanupWriteRequest will send a reset to the peer.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether
	// Request.Body is closed by the Transport,
	// and in multiple cases: server replies <=299 and >299
	// while still writing request body
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Have we read any frames from the connection since sending this request?
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	// Close the body (outside the lock) if no one else has, then wait
	// for whoever is closing it to finish.
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is read,
		// we may be aborted before finishing up here. If the stream was closed
		// cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request.
				//
				// This could be due to the server becoming unresponsive.
				// To avoid sending too many requests on a dead connection,
				// if we haven't read any frames from the connection since
				// sending this request, we let it continue to consume
				// a concurrency slot until we can confirm the server is
				// still responding.
				// We do this by sending a PING frame along with the RST_STREAM
				// (unless a ping is already in flight).
				//
				// For simplicity, we don't bother tracking the PING payload:
				// We reset cc.pendingResets any time we receive a PING ACK.
				//
				// We skip this if the conn is going to be closed on idle,
				// because it's short lived and will probably be closed before
				// we get the ping response.
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					// rstStreamPingsBlocked works around a gRPC behavior:
					// see comment on the field for details.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		// No error: if we wrote headers but never half-closed our side,
		// reset the stream with ErrCodeNo.
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	// If a connection-level write error was recorded, the conn is
	// unusable: close it.
	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		cc.Close()
	}

	close(cs.donec)
	cc.maybeCallStateHook()
}
  1571  
// awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
// Must hold cc.mu.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// This is the very first request sent to this connection.
			// Return a fatal error which aborts the retry loop.
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			// A concurrency slot is free.
			return nil
		}
		// At the concurrency limit: wait (on cc.cond) for a slot to free
		// up, then re-check from the top.
		cc.pendingRequests++
		cc.cond.Wait()
		cc.pendingRequests--
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
  1599  
// writeHeaders writes hdrs as a HEADERS frame followed by as many
// CONTINUATION frames as needed, each carrying at most maxFrameSize bytes
// of the header block, then flushes the write buffer. It returns the
// connection's pending write error, if any.
// requires cc.wmu be held
func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
	first := true // first frame written (HEADERS is first, then CONTINUATION)
	for len(hdrs) > 0 && cc.werr == nil {
		chunk := hdrs
		if len(chunk) > maxFrameSize {
			chunk = chunk[:maxFrameSize]
		}
		hdrs = hdrs[len(chunk):]
		endHeaders := len(hdrs) == 0
		if first {
			cc.fr.WriteHeaders(HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: chunk,
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
			first = false
		} else {
			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
		}
	}
	cc.bw.Flush()
	return cc.werr
}
  1625  
// internal error values; they don't escape to callers
var (
	// errStopReqBodyWrite aborts the request body write; no cancel is sent.
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel aborts the request body write and also
	// sends a stream reset (cancel).
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong reports a request body that yielded more bytes
	// than its declared content length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
  1636  
  1637  // frameScratchBufferLen returns the length of a buffer to use for
  1638  // outgoing request bodies to read/write to/from.
  1639  //
  1640  // It returns max(1, min(peer's advertised max frame size,
  1641  // Request.ContentLength+1, 512KB)).
  1642  func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1643  	const max = 512 << 10
  1644  	n := min(int64(maxFrameSize), max)
  1645  	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1646  		// Add an extra byte past the declared content-length to
  1647  		// give the caller's Request.Body io.Reader a chance to
  1648  		// give us more bytes than they declared, so we can catch it
  1649  		// early.
  1650  		n = cl + 1
  1651  	}
  1652  	if n < 1 {
  1653  		return 1
  1654  	}
  1655  	return int(n) // doesn't truncate; max is 512K
  1656  }
  1657  
  1658  // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1659  // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1660  // requests needing big buffers. The size ranges are as follows:
  1661  // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
  1662  // {256 KB, 512 KB], {512 KB, infinity}
  1663  // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1664  // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1665  // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1666  var bufPools [7]sync.Pool // of *[]byte
  1667  func bufPoolIndex(size int) int {
  1668  	if size <= 16384 {
  1669  		return 0
  1670  	}
  1671  	size -= 1
  1672  	bits := bits.Len(uint(size))
  1673  	index := bits - 14
  1674  	if index >= len(bufPools) {
  1675  		return len(bufPools) - 1
  1676  	}
  1677  	return index
  1678  }
  1679  
// writeRequestBody streams the request body to the peer in DATA frames,
// honoring stream- and connection-level flow control, and finishes the
// stream with either trailers or an empty DATA frame carrying END_STREAM.
// It returns errReqBodyTooLong if the body yields more bytes than its
// declared Content-Length, and errStopReqBodyWrite if the body was
// closed underneath it.
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Scratch buffer for reading into & writing from.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	// Reuse a pooled buffer if one of sufficient size is available;
	// either way the buffer is returned to its pool on exit.
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The request body's Content-Length was predeclared and
				// we just finished reading it all, but the underlying io.Reader
				// returned the final chunk with a nil error (which is one of
				// the two valid things a Reader can do at EOF). Because we'd prefer
				// to send the END_STREAM bit early, double-check that we're actually
				// at EOF. Subsequent reads should return (0, EOF) at this point.
				// If either value is different, we return an error in one of two ways below.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			// Distinguish a concurrently-closed body (stop silently),
			// normal EOF (finish up), and real read errors (propagate).
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		// Write the chunk out in flow-control-sized pieces.
		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			// END_STREAM can ride on the last DATA frame only when no
			// trailers will follow.
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// TODO(bradfitz): this flush is for latency, not bandwidth.
				// Most requests won't need this. Make this opt-in or
				// opt-out?  Use some heuristic on the body type? Nagel-like
				// timers?  Based on 'n'? Only last chunk of this for loop,
				// unless flow control tokens are low? For now, always.
				// If we change this, see comment below.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// Already sent END_STREAM (which implies we have no
		// trailers) and flushed, because currently all
		// WriteData frames above get a flush. So we're done.
		return nil
	}

	// Since the RoundTrip contract permits the caller to "mutate or reuse"
	// a request after the Response's Body is closed, verify that this hasn't
	// happened before accessing the trailers.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: either with trailers, or
	// with an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
  1811  
// awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
// control tokens from the server.
// It returns either the non-zero number of tokens taken or an error
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
	cc := cs.cc
	ctx := cs.ctx
	cc.mu.Lock()
	defer cc.mu.Unlock()
	for {
		// Fail fast if the connection or stream can no longer make progress.
		if cc.closed {
			return 0, errClientConnClosed
		}
		if cs.reqBodyClosed != nil {
			return 0, errStopReqBodyWrite
		}
		// Non-blocking check for stream abort / context cancellation /
		// request cancellation before (re-)checking the window.
		select {
		case <-cs.abort:
			return 0, cs.abortErr
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-cs.reqCancel:
			return 0, errRequestCanceled
		default:
		}
		if a := cs.flow.available(); a > 0 {
			take := a
			if int(take) > maxBytes {

				take = int32(maxBytes) // can't truncate int; take is int32
			}
			// Never hand out more than the peer's max frame size.
			if take > int32(cc.maxFrameSize) {
				take = int32(cc.maxFrameSize)
			}
			cs.flow.take(take)
			return take, nil
		}
		// No tokens available; sleep until a WINDOW_UPDATE or a
		// connection state change broadcasts on cc.cond.
		cc.cond.Wait()
	}
}
  1852  
  1853  // requires cc.wmu be held.
  1854  func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
  1855  	cc.hbuf.Reset()
  1856  
  1857  	hlSize := uint64(0)
  1858  	for k, vv := range trailer {
  1859  		for _, v := range vv {
  1860  			hf := hpack.HeaderField{Name: k, Value: v}
  1861  			hlSize += uint64(hf.Size())
  1862  		}
  1863  	}
  1864  	if hlSize > cc.peerMaxHeaderListSize {
  1865  		return nil, errRequestHeaderListSize
  1866  	}
  1867  
  1868  	for k, vv := range trailer {
  1869  		lowKey, ascii := httpcommon.LowerHeader(k)
  1870  		if !ascii {
  1871  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  1872  			// field names have to be ASCII characters (just as in HTTP/1.x).
  1873  			continue
  1874  		}
  1875  		// Transfer-Encoding, etc.. have already been filtered at the
  1876  		// start of RoundTrip
  1877  		for _, v := range vv {
  1878  			cc.writeHeader(lowKey, v)
  1879  		}
  1880  	}
  1881  	return cc.hbuf.Bytes(), nil
  1882  }
  1883  
  1884  func (cc *ClientConn) writeHeader(name, value string) {
  1885  	if VerboseLogs {
  1886  		log.Printf("http2: Transport encoding header %q = %q", name, value)
  1887  	}
  1888  	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  1889  }
  1890  
// resAndError pairs a response with the error (if any) that produced it.
type resAndError struct {
	_   incomparable
	res *ClientResponse
	err error
}
  1896  
// addStreamLocked registers cs on the connection: it seeds the stream's
// send flow-control window from the peer's initial window size, links it
// to the connection-level window, initializes its receive window, and
// assigns the next client-initiated stream ID.
//
// requires cc.mu be held.
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.init(cc.initialStreamRecvWindowSize)
	cs.ID = cc.nextStreamID
	cc.nextStreamID += 2 // client-initiated stream IDs are odd
	cc.streams[cs.ID] = cs
	if cs.ID == 0 {
		panic("assigned stream ID 0")
	}
}
  1909  
// forgetStreamID removes the stream with the given id from the
// connection's stream map, restarts the idle timer if this was the last
// active stream, wakes goroutines blocked on connection state, and may
// schedule the connection to close if it cannot be reused.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
	// wake up RoundTrip if there is a pending request.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// Deferred so the conn is closed after cc.mu is released below.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
  1937  
// clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
type clientConnReadLoop struct {
	_  incomparable
	cc *ClientConn // the connection whose frames this loop reads
}
  1943  
  1944  // readLoop runs in its own goroutine and reads and dispatches frames.
  1945  func (cc *ClientConn) readLoop() {
  1946  	rl := &clientConnReadLoop{cc: cc}
  1947  	defer rl.cleanup()
  1948  	cc.readerErr = rl.run()
  1949  	if ce, ok := cc.readerErr.(ConnectionError); ok {
  1950  		cc.wmu.Lock()
  1951  		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  1952  		cc.wmu.Unlock()
  1953  	}
  1954  }
  1955  
// GoAwayError is returned by the Transport when the server closes the
// TCP connection after sending a GOAWAY frame.
type GoAwayError struct {
	LastStreamID uint32  // last stream ID reported in the GOAWAY frame
	ErrCode      ErrCode // error code from the GOAWAY frame
	DebugData    string  // opaque debug data carried by the GOAWAY frame
}
  1963  
  1964  func (e GoAwayError) Error() string {
  1965  	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
  1966  		e.LastStreamID, e.ErrCode, e.DebugData)
  1967  }
  1968  
  1969  func isEOFOrNetReadError(err error) bool {
  1970  	if err == io.EOF {
  1971  		return true
  1972  	}
  1973  	ne, ok := err.(*net.OpError)
  1974  	return ok && ne.Op == "read"
  1975  }
  1976  
  1977  func (rl *clientConnReadLoop) cleanup() {
  1978  	cc := rl.cc
  1979  	defer cc.closeConn()
  1980  	defer close(cc.readerDone)
  1981  
  1982  	if cc.idleTimer != nil {
  1983  		cc.idleTimer.Stop()
  1984  	}
  1985  
  1986  	// Close any response bodies if the server closes prematurely.
  1987  	// TODO: also do this if we've written the headers but not
  1988  	// gotten a response yet.
  1989  	err := cc.readerErr
  1990  	cc.mu.Lock()
  1991  	if cc.goAway != nil && isEOFOrNetReadError(err) {
  1992  		err = GoAwayError{
  1993  			LastStreamID: cc.goAway.LastStreamID,
  1994  			ErrCode:      cc.goAway.ErrCode,
  1995  			DebugData:    cc.goAwayDebug,
  1996  		}
  1997  	} else if err == io.EOF {
  1998  		err = io.ErrUnexpectedEOF
  1999  	}
  2000  	cc.closed = true
  2001  
  2002  	// If the connection has never been used, and has been open for only a short time,
  2003  	// leave it in the connection pool for a little while.
  2004  	//
  2005  	// This avoids a situation where new connections are constantly created,
  2006  	// added to the pool, fail, and are removed from the pool, without any error
  2007  	// being surfaced to the user.
  2008  	unusedWaitTime := 5 * time.Second
  2009  	if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
  2010  		unusedWaitTime = cc.idleTimeout
  2011  	}
  2012  	idleTime := time.Now().Sub(cc.lastActive)
  2013  	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
  2014  		cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
  2015  			cc.t.connPool.MarkDead(cc)
  2016  		})
  2017  	} else {
  2018  		cc.mu.Unlock() // avoid any deadlocks in MarkDead
  2019  		cc.t.connPool.MarkDead(cc)
  2020  		cc.mu.Lock()
  2021  	}
  2022  
  2023  	for _, cs := range cc.streams {
  2024  		select {
  2025  		case <-cs.peerClosed:
  2026  			// The server closed the stream before closing the conn,
  2027  			// so no need to interrupt it.
  2028  		default:
  2029  			cs.abortStreamLocked(err)
  2030  		}
  2031  	}
  2032  	cc.cond.Broadcast()
  2033  	cc.mu.Unlock()
  2034  
  2035  	if !cc.seenSettings {
  2036  		// If we have a pending request that wants extended CONNECT,
  2037  		// let it continue and fail with the connection error.
  2038  		cc.extendedConnectAllowed = true
  2039  		close(cc.seenSettingsChan)
  2040  	}
  2041  }
  2042  
  2043  // countReadFrameError calls ClientConn.fr.countError with a string
  2044  // representing err.
  2045  func (cc *ClientConn) countReadFrameError(err error) {
  2046  	f := cc.fr.countError
  2047  	if f == nil || err == nil {
  2048  		return
  2049  	}
  2050  	if ce, ok := err.(ConnectionError); ok {
  2051  		errCode := ErrCode(ce)
  2052  		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2053  		return
  2054  	}
  2055  	if errors.Is(err, io.EOF) {
  2056  		f("read_frame_eof")
  2057  		return
  2058  	}
  2059  	if errors.Is(err, io.ErrUnexpectedEOF) {
  2060  		f("read_frame_unexpected_eof")
  2061  		return
  2062  	}
  2063  	if errors.Is(err, ErrFrameTooLarge) {
  2064  		f("read_frame_too_large")
  2065  		return
  2066  	}
  2067  	f("read_frame_other")
  2068  }
  2069  
// run reads frames from the connection until a terminal error occurs,
// dispatching each frame type to its process* handler. It also arms an
// optional read-idle timer that fires cc.healthCheck when no frames
// arrive for readIdleTimeout.
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			// Any frame counts as activity; push the health check out.
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// A stream-level error kills only that stream; keep the
			// connection alive and continue reading.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// The server's first frame must be SETTINGS.
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
  2137  
// processHeaders handles a merged HEADERS(+CONTINUATION) frame. The
// first HEADERS for a stream becomes the response (via handleResponse);
// a subsequent one is treated as trailers. Frames for unknown or
// canceled streams are ignored.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// We'd get here if we canceled a request while the
		// server had its response still in flight. So if this
		// was just something we canceled, ignore it.
		return nil
	}
	if cs.readClosed {
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// TODO(bradfitz): move first response byte earlier,
			// when we first read the 9 byte header, not waiting
			// until all the HEADERS+CONTINUATION frames have been
			// merged. This works for now.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		// A second HEADERS frame carries the trailers.
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Any other error type is a stream error.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil // return nil from process* funcs to keep conn alive
	}
	if res == nil {
		// (nil, nil) special case. See handleResponse docs.
		return nil
	}
	cs.res = res
	close(cs.respHeaderRecv)
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2194  
// may return error types nil, or ConnectionError. Any other error value
// is a StreamError of type ErrCodeProtocol. The returned error in that case
// is the detail.
//
// As a special case, handleResponse may return (nil, nil) to skip the
// frame (currently only used for 1xx responses).
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	// The :status pseudo header is mandatory and must be numeric.
	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	regularFields := f.RegularFields()
	// strs is one shared backing array handed out a slot at a time for
	// single-valued headers, avoiding a per-header allocation.
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	// Reuse the stream's statically-allocated response value.
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			// Record announced trailer names with nil values; the
			// values arrive later in the trailing HEADERS frame.
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// More than likely this will be a single-element key.
				// Most headers aren't multi-valued.
				// Set the capacity on strs[0] to 1, so any future append
				// won't extend the slice into the other strings.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// If the 1xx response is being delivered to the user,
			// then they're responsible for limiting the number
			// of responses.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// If the user didn't examine the 1xx response, then we
			// limit the size of all 1xx headers.
			//
			// This differs a bit from the HTTP/1 implementation, which
			// limits the size of all 1xx headers plus the final response.
			// Use the larger limit of MaxHeaderListSize and
			// net/http.Transport.MaxResponseHeaderBytes.
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			// Unblock any writer waiting on "Expect: 100-continue".
			traceGot100Continue(cs.trace)
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		cs.pastHeaders = false // do it all again
		return nil, nil
	}

	// Derive ContentLength; -1 means unknown.
	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
			// more safe smuggling-wise to ignore.
		}
	} else if len(clens) > 1 {
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
		// more safe smuggling-wise to ignore.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		if res.ContentLength > 0 {
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	// Transparently decompress gzip responses we asked for ourselves.
	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
  2337  
  2338  func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
  2339  	if cs.pastTrailers {
  2340  		// Too many HEADERS frames for this stream.
  2341  		return ConnectionError(ErrCodeProtocol)
  2342  	}
  2343  	cs.pastTrailers = true
  2344  	if !f.StreamEnded() {
  2345  		// We expect that any headers for trailers also
  2346  		// has END_STREAM.
  2347  		return ConnectionError(ErrCodeProtocol)
  2348  	}
  2349  	if len(f.PseudoFields()) > 0 {
  2350  		// No pseudo header fields are defined for trailers.
  2351  		// TODO: ConnectionError might be overly harsh? Check.
  2352  		return ConnectionError(ErrCodeProtocol)
  2353  	}
  2354  
  2355  	trailer := make(Header)
  2356  	for _, hf := range f.RegularFields() {
  2357  		key := httpcommon.CanonicalHeader(hf.Name)
  2358  		trailer[key] = append(trailer[key], hf.Value)
  2359  	}
  2360  	cs.trailer = trailer
  2361  
  2362  	rl.endStream(cs)
  2363  	return nil
  2364  }
  2365  
// transportResponseBody is the concrete type of Transport.RoundTrip's
// Response.Body. It is an io.ReadCloser.
type transportResponseBody struct {
	cs *clientStream // the stream whose buffered body this reads
}
  2371  
// Read copies buffered response body bytes into p, enforcing any
// declared Content-Length, and returns flow-control tokens for the
// consumed bytes to the peer via WINDOW_UPDATE frames.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	if cs.bytesRemain != -1 {
		// A Content-Length was declared; police it in both directions.
		if int64(n) > cs.bytesRemain {
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			// Body ended before the declared length was delivered.
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// No flow control tokens to send back.
		return
	}

	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil { // No need to refresh if the stream is over or failed.
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
  2423  
// errClosedResponseBody is delivered to readers after the caller closes
// the response body.
var errClosedResponseBody = errors.New("http2: response body closed")

// Close aborts the stream, returns connection-level flow control for any
// unread buffered bytes, and waits for the request's write side to
// finish cleaning up (or for cancellation).
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
  2463  
// processData handles a DATA frame: it validates the stream state,
// accounts for connection- and stream-level receive flow control,
// writes the payload into the stream's buffer, and refunds padding (or
// the whole payload, for reset streams) via WINDOW_UPDATE.
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// We never asked for this.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// TODO: be stricter here? only silently ignore things which
		// we canceled, but not things which were closed normally
		// by the peer? Tough without accumulating too much state.

		// But at least return their flow control:
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Return any padded flow control now, since we won't
		// refund it later on body reads.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// Return len(data) now if the stream is already closed,
				// since data will never be read.
				didReset = true
				refund += len(data)
			}
		}

		// Send any refund as WINDOW_UPDATEs; the stream-level refund is
		// skipped once the stream has been reset.
		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
  2579  
// endStream marks cs's read side as cleanly finished after the peer sent
// END_STREAM: the body pipe is closed with io.EOF (copying trailers into
// the response in the process) and cs.peerClosed is closed.
// It is a no-op if the read side was already closed.
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
	// TODO: check that any declared content-length matches, like
	// server.go's (*stream).endStream method.
	if !cs.readClosed {
		cs.readClosed = true
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
		// race condition: The caller can read io.EOF from Response.Body
		// and close the body before we close cs.peerClosed, causing
		// cleanupWriteRequest to send a RST_STREAM.
		rl.cc.mu.Lock()
		defer rl.cc.mu.Unlock()
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
		close(cs.peerClosed)
	}
}
  2595  
// endStreamError aborts cs's read side with err.
// readAborted is set before aborting so that streamByID stops returning
// this stream for any subsequently received frames.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
  2600  
// endStreamErrorLocked is like endStreamError, but for callers that
// already hold the lock required by abortStreamLocked.
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
  2605  
// Constants passed to streamByID for documentation purposes:
// they give the boolean headerOrData argument a readable name at call sites.
const (
	headerOrDataFrame    = true
	notHeaderOrDataFrame = false
)
  2611  
// streamByID returns the stream with the given id, or nil if no stream has
// that id or the stream's read side has already been aborted.
// If headerOrData is true, it clears cc.rstStreamPingsBlocked.
func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
	rl.cc.mu.Lock()
	defer rl.cc.mu.Unlock()
	if headerOrData {
		// Work around an unfortunate gRPC behavior.
		// See comment on ClientConn.rstStreamPingsBlocked for details.
		rl.cc.rstStreamPingsBlocked = false
	}
	// Record the next stream ID as of the time this frame was read.
	rl.cc.readBeforeStreamID = rl.cc.nextStreamID
	cs := rl.cc.streams[id]
	if cs != nil && !cs.readAborted {
		return cs
	}
	return nil
}
  2629  
  2630  func (cs *clientStream) copyTrailers() {
  2631  	for k, vv := range cs.trailer {
  2632  		t := cs.resTrailer
  2633  		if *t == nil {
  2634  			*t = make(Header)
  2635  		}
  2636  		(*t)[k] = vv
  2637  	}
  2638  }
  2639  
  2640  func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  2641  	cc := rl.cc
  2642  	cc.t.connPool.MarkDead(cc)
  2643  	if f.ErrCode != 0 {
  2644  		// TODO: deal with GOAWAY more. particularly the error code
  2645  		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  2646  		if fn := cc.fr.countError; fn != nil {
  2647  			fn("recv_goaway_" + f.ErrCode.stringToken())
  2648  		}
  2649  	}
  2650  	cc.setGoAway(f)
  2651  	return nil
  2652  }
  2653  
// processSettings applies a SETTINGS frame from the server and, unless the
// frame is itself an ack, writes our SETTINGS ack back to the peer.
func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
	cc := rl.cc
	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	if err := rl.processSettingsNoWrite(f); err != nil {
		return err
	}
	if !f.IsAck() {
		cc.fr.WriteSettingsAck()
		cc.bw.Flush()
	}
	return nil
}
  2670  
// processSettingsNoWrite validates and applies each setting in f to the
// connection's state without writing anything to the peer.
// An unexpected SETTINGS ack is a connection-level PROTOCOL_ERROR.
// The caller (processSettings) holds cc.wmu.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		if err := s.Valid(); err != nil {
			return err
		}
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// Values above the maximum flow-control
			// window size of 2^31-1 MUST be treated as a
			// connection error (Section 5.4.1) of type
			// FLOW_CONTROL_ERROR.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Adjust flow control of currently-open
			// streams by the difference of the old initial
			// window size and this one.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			// Wake any writers blocked waiting for stream flow.
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			// If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
			// we require that it do so in the first SETTINGS frame.
			//
			// When we attempt to use extended CONNECT, we wait for the first
			// SETTINGS frame to see if the server supports it. If we let the
			// server enable the feature with a later SETTINGS frame, then
			// users will see inconsistent results depending on whether we've
			// seen that frame or not.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// This was the server's initial SETTINGS frame and it
			// didn't contain a MAX_CONCURRENT_STREAMS field so
			// increase the number of concurrent streams this
			// connection can establish to our default.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
  2755  
// processWindowUpdate handles a WINDOW_UPDATE frame, crediting either the
// connection-level flow window (stream 0) or the target stream's window.
// An increment that would overflow the window is FLOW_CONTROL_ERROR: fatal
// to the stream if stream-level, fatal to the connection if connection-level.
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	// Updates for unknown/aborted streams are ignored.
	if f.StreamID != 0 && cs == nil {
		return nil
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()

	fl := &cc.flow
	if cs != nil {
		fl = &cs.flow
	}
	if !fl.add(int32(f.Increment)) {
		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
		if cs != nil {
			rl.endStreamErrorLocked(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeFlowControl,
			})
			return nil
		}

		return ConnectionError(ErrCodeFlowControl)
	}
	// Wake writers that may be blocked waiting for flow-control tokens.
	cc.cond.Broadcast()
	return nil
}
  2785  
  2786  func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  2787  	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
  2788  	if cs == nil {
  2789  		// TODO: return error if server tries to RST_STREAM an idle stream
  2790  		return nil
  2791  	}
  2792  	serr := streamError(cs.ID, f.ErrCode)
  2793  	serr.Cause = errFromPeer
  2794  	if f.ErrCode == ErrCodeProtocol {
  2795  		rl.cc.SetDoNotReuse()
  2796  	}
  2797  	if fn := cs.cc.fr.countError; fn != nil {
  2798  		fn("recv_rststream_" + f.ErrCode.stringToken())
  2799  	}
  2800  	cs.abortStream(serr)
  2801  
  2802  	cs.bufPipe.CloseWithError(serr)
  2803  	return nil
  2804  }
  2805  
  2806  // Ping sends a PING frame to the server and waits for the ack.
  2807  func (cc *ClientConn) Ping(ctx context.Context) error {
  2808  	c := make(chan struct{})
  2809  	// Generate a random payload
  2810  	var p [8]byte
  2811  	for {
  2812  		if _, err := rand.Read(p[:]); err != nil {
  2813  			return err
  2814  		}
  2815  		cc.mu.Lock()
  2816  		// check for dup before insert
  2817  		if _, found := cc.pings[p]; !found {
  2818  			cc.pings[p] = c
  2819  			cc.mu.Unlock()
  2820  			break
  2821  		}
  2822  		cc.mu.Unlock()
  2823  	}
  2824  	var pingError error
  2825  	errc := make(chan struct{})
  2826  	go func() {
  2827  		cc.wmu.Lock()
  2828  		defer cc.wmu.Unlock()
  2829  		if pingError = cc.fr.WritePing(false, p); pingError != nil {
  2830  			close(errc)
  2831  			return
  2832  		}
  2833  		if pingError = cc.bw.Flush(); pingError != nil {
  2834  			close(errc)
  2835  			return
  2836  		}
  2837  	}()
  2838  	select {
  2839  	case <-c:
  2840  		return nil
  2841  	case <-errc:
  2842  		return pingError
  2843  	case <-ctx.Done():
  2844  		return ctx.Err()
  2845  	case <-cc.readerDone:
  2846  		// connection closed
  2847  		return cc.readerErr
  2848  	}
  2849  }
  2850  
// processPing handles a PING frame. For an ack it wakes the Ping call
// registered under the matching payload (if any) and clears the
// pending-resets state; for a non-ack it echoes the payload back as an ack.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}
  2878  
// processPushPromise rejects any PUSH_PROMISE frame as a connection error.
// We told the peer we don't want them.
// Spec says:
// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
// setting of the peer endpoint is set to 0. An endpoint that
// has set this setting and has received acknowledgement MUST
// treat the receipt of a PUSH_PROMISE frame as a connection
// error (Section 5.4.1) of type PROTOCOL_ERROR."
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	return ConnectionError(ErrCodeProtocol)
}
  2889  
  2890  // writeStreamReset sends a RST_STREAM frame.
  2891  // When ping is true, it also sends a PING frame with a random payload.
  2892  func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  2893  	// TODO: map err to more interesting error codes, once the
  2894  	// HTTP community comes up with some. But currently for
  2895  	// RST_STREAM there's no equivalent to GOAWAY frame's debug
  2896  	// data, and the error codes are all pretty vague ("cancel").
  2897  	cc.wmu.Lock()
  2898  	cc.fr.WriteRSTStream(streamID, code)
  2899  	if ping {
  2900  		var payload [8]byte
  2901  		rand.Read(payload[:])
  2902  		cc.fr.WritePing(false, payload)
  2903  	}
  2904  	cc.bw.Flush()
  2905  	cc.wmu.Unlock()
  2906  }
  2907  
// errResponseHeaderListSize: the server's response headers exceeded the
// limit we advertised. errRequestHeaderListSize: our request headers
// exceeded the limit the server advertised.
var (
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	errRequestHeaderListSize  = httpcommon.ErrRequestHeaderListSize
)
  2912  
// logf logs unconditionally via the connection's Transport.
func (cc *ClientConn) logf(format string, args ...any) {
	cc.t.logf(format, args...)
}
  2916  
// vlogf logs via the connection's Transport only when verbose logging is on.
func (cc *ClientConn) vlogf(format string, args ...any) {
	cc.t.vlogf(format, args...)
}
  2920  
// vlogf logs only when the package-level VerboseLogs flag is set.
func (t *Transport) vlogf(format string, args ...any) {
	if VerboseLogs {
		t.logf(format, args...)
	}
}
  2926  
// logf logs to the standard library's default logger.
func (t *Transport) logf(format string, args ...any) {
	log.Printf(format, args...)
}
  2930  
  2931  type missingBody struct{}
  2932  
  2933  func (missingBody) Close() error             { return nil }
  2934  func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  2935  
// erringRoundTripper is a RoundTripper that always fails with a fixed error;
// RoundTripErr exposes that error without performing a round trip.
type erringRoundTripper struct{ err error }

func (rt erringRoundTripper) RoundTripErr() error                               { return rt.err }
func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
  2940  
// errConcurrentReadOnResBody is the sentinel stored in gzipReader.zerr while
// a Read is in flight; a second concurrent Read observes it and fails.
var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
  2942  
// gzipReader wraps a response body so it can lazily
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
// zerr doubles as a state flag: errConcurrentReadOnResBody while a Read
// holds the reader, fs.ErrClosed once Close has run.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying Response.Body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // stores gzip reader from the pool between reads
	zerr error         // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}
  2954  
  2955  type eofReader struct{}
  2956  
  2957  func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
  2958  func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }
  2959  
// gzipPool reuses gzip.Readers across response bodies so their internal
// buffers are not reallocated for every gzip-encoded response.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
  2961  
  2962  // gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
  2963  func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
  2964  	zr := gzipPool.Get().(*gzip.Reader)
  2965  	if err := zr.Reset(r); err != nil {
  2966  		gzipPoolPut(zr)
  2967  		return nil, err
  2968  	}
  2969  	return zr, nil
  2970  }
  2971  
  2972  // gzipPoolPut puts a gzip.Reader back into the pool.
  2973  func gzipPoolPut(zr *gzip.Reader) {
  2974  	// Reset will allocate bufio.Reader if we pass it anything
  2975  	// other than a flate.Reader, so ensure that it's getting one.
  2976  	var r flate.Reader = eofReader{}
  2977  	zr.Reset(r)
  2978  	gzipPool.Put(zr)
  2979  }
  2980  
  2981  // acquire returns a gzip.Reader for reading response body.
  2982  // The reader must be released after use.
  2983  func (gz *gzipReader) acquire() (*gzip.Reader, error) {
  2984  	gz.mu.Lock()
  2985  	defer gz.mu.Unlock()
  2986  	if gz.zerr != nil {
  2987  		return nil, gz.zerr
  2988  	}
  2989  	if gz.zr == nil {
  2990  		gz.zr, gz.zerr = gzipPoolGet(gz.body)
  2991  		if gz.zerr != nil {
  2992  			return nil, gz.zerr
  2993  		}
  2994  	}
  2995  	ret := gz.zr
  2996  	gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
  2997  	return ret, nil
  2998  }
  2999  
  3000  // release returns the gzip.Reader to the pool if Close was called during Read.
  3001  func (gz *gzipReader) release(zr *gzip.Reader) {
  3002  	gz.mu.Lock()
  3003  	defer gz.mu.Unlock()
  3004  	if gz.zerr == errConcurrentReadOnResBody {
  3005  		gz.zr, gz.zerr = zr, nil
  3006  	} else { // fs.ErrClosed
  3007  		gzipPoolPut(zr)
  3008  	}
  3009  }
  3010  
  3011  // close returns the gzip.Reader to the pool immediately or
  3012  // signals release to do so after Read completes.
  3013  func (gz *gzipReader) close() {
  3014  	gz.mu.Lock()
  3015  	defer gz.mu.Unlock()
  3016  	if gz.zerr == nil && gz.zr != nil {
  3017  		gzipPoolPut(gz.zr)
  3018  		gz.zr = nil
  3019  	}
  3020  	gz.zerr = fs.ErrClosed
  3021  }
  3022  
  3023  func (gz *gzipReader) Read(p []byte) (n int, err error) {
  3024  	zr, err := gz.acquire()
  3025  	if err != nil {
  3026  		return 0, err
  3027  	}
  3028  	defer gz.release(zr)
  3029  
  3030  	return zr.Read(p)
  3031  }
  3032  
// Close recycles the gzip.Reader (immediately or once any in-flight Read
// finishes) and then closes the underlying response body.
func (gz *gzipReader) Close() error {
	gz.close()

	return gz.body.Close()
}
  3038  
  3039  // isConnectionCloseRequest reports whether req should use its own
  3040  // connection for a single request and then close the connection.
  3041  func isConnectionCloseRequest(req *ClientRequest) bool {
  3042  	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
  3043  }
  3044  
// NetHTTPClientConn wraps ClientConn and implements the interface net/http
// expects from the RoundTripper returned by NewClientConn.
type NetHTTPClientConn struct {
	cc *ClientConn
}
  3050  
// RoundTrip issues req on the wrapped ClientConn.
func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.cc.RoundTrip(req)
}
  3054  
// Close closes the wrapped ClientConn.
func (cc NetHTTPClientConn) Close() error {
	return cc.cc.Close()
}
  3058  
  3059  func (cc NetHTTPClientConn) Err() error {
  3060  	cc.cc.mu.Lock()
  3061  	defer cc.cc.mu.Unlock()
  3062  	if cc.cc.closed {
  3063  		return errors.New("connection closed")
  3064  	}
  3065  	return nil
  3066  }
  3067  
// Reserve marks the connection as expecting an upcoming request, failing
// if the connection cannot take on a new stream.
func (cc NetHTTPClientConn) Reserve() error {
	// Deferred before Lock, so (LIFO) the hook runs after cc.mu is released.
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	if !cc.cc.canReserveLocked() {
		return errors.New("connection is unavailable")
	}
	cc.cc.streamsReserved++
	return nil
}
  3078  
// Release gives back a reservation made with Reserve, if one exists.
func (cc NetHTTPClientConn) Release() {
	// Deferred before Lock, so (LIFO) the hook runs after cc.mu is released.
	defer cc.cc.maybeCallStateHook()
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	// We don't complain if streamsReserved is 0.
	//
	// This is consistent with RoundTrip: both Release and RoundTrip will
	// consume a reservation iff one exists.
	if cc.cc.streamsReserved > 0 {
		cc.cc.streamsReserved--
	}
}
  3091  
// Available reports how many additional streams the connection can take on.
func (cc NetHTTPClientConn) Available() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.availableLocked()
}
  3097  
// InFlight reports the connection's current request count.
func (cc NetHTTPClientConn) InFlight() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.currentRequestCountLocked()
}
  3103  
// Ping sends a PING on the wrapped connection and waits for the ack.
func (cc NetHTTPClientConn) Ping(ctx context.Context) error {
	return cc.cc.Ping(ctx)
}
  3107  
// maybeCallStateHook invokes the connection's internal state hook, if one
// is registered. Callers arrange (via defer ordering) to run it after
// cc.mu has been released.
func (cc *ClientConn) maybeCallStateHook() {
	if cc.internalStateHook != nil {
		cc.internalStateHook()
	}
}
  3113  
  3114  func (t *Transport) idleConnTimeout() time.Duration {
  3115  	if t.t1 != nil {
  3116  		return t.t1.IdleConnTimeout()
  3117  	}
  3118  
  3119  	return 0
  3120  }
  3121  
  3122  func traceGetConn(req *ClientRequest, hostPort string) {
  3123  	trace := httptrace.ContextClientTrace(req.Context)
  3124  	if trace == nil || trace.GetConn == nil {
  3125  		return
  3126  	}
  3127  	trace.GetConn(hostPort)
  3128  }
  3129  
// traceGotConn invokes the request's httptrace GotConn hook, if any,
// reporting the underlying conn, whether it was reused, and — when reused
// with no active streams — how long it has been idle.
func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
	trace := httptrace.ContextClientTrace(req.Context)
	if trace == nil || trace.GotConn == nil {
		return
	}
	ci := httptrace.GotConnInfo{Conn: cc.tconn}
	ci.Reused = reused
	// Snapshot stream count and last-active time under cc.mu.
	cc.mu.Lock()
	ci.WasIdle = len(cc.streams) == 0 && reused
	if ci.WasIdle && !cc.lastActive.IsZero() {
		ci.IdleTime = time.Since(cc.lastActive)
	}
	cc.mu.Unlock()

	trace.GotConn(ci)
}
  3146  
  3147  func traceWroteHeaders(trace *httptrace.ClientTrace) {
  3148  	if trace != nil && trace.WroteHeaders != nil {
  3149  		trace.WroteHeaders()
  3150  	}
  3151  }
  3152  
  3153  func traceGot100Continue(trace *httptrace.ClientTrace) {
  3154  	if trace != nil && trace.Got100Continue != nil {
  3155  		trace.Got100Continue()
  3156  	}
  3157  }
  3158  
  3159  func traceWait100Continue(trace *httptrace.ClientTrace) {
  3160  	if trace != nil && trace.Wait100Continue != nil {
  3161  		trace.Wait100Continue()
  3162  	}
  3163  }
  3164  
  3165  func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  3166  	if trace != nil && trace.WroteRequest != nil {
  3167  		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  3168  	}
  3169  }
  3170  
  3171  func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  3172  	if trace != nil && trace.GotFirstResponseByte != nil {
  3173  		trace.GotFirstResponseByte()
  3174  	}
  3175  }
  3176  
  3177  func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  3178  	if trace != nil {
  3179  		return trace.Got1xxResponse
  3180  	}
  3181  	return nil
  3182  }
  3183  
  3184  // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  3185  // connection.
  3186  func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  3187  	dialer := &tls.Dialer{
  3188  		Config: cfg,
  3189  	}
  3190  	cn, err := dialer.DialContext(ctx, network, addr)
  3191  	if err != nil {
  3192  		return nil, err
  3193  	}
  3194  	tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  3195  	return tlsCn, nil
  3196  }
  3197  

View as plain text