1
2
3
4
5
6
7 package http2
8
9 import (
10 "bufio"
11 "bytes"
12 "compress/flate"
13 "compress/gzip"
14 "context"
15 "crypto/rand"
16 "crypto/tls"
17 "errors"
18 "fmt"
19 "io"
20 "io/fs"
21 "log"
22 "math"
23 "math/bits"
24 mathrand "math/rand"
25 "net"
26 "net/http/httptrace"
27 "net/http/internal"
28 "net/http/internal/httpcommon"
29 "net/textproto"
30 "slices"
31 "strconv"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "time"
36
37 "golang.org/x/net/http/httpguts"
38 "golang.org/x/net/http2/hpack"
39 "golang.org/x/net/idna"
40 )
41
const (
	// transportDefaultConnFlow is the connection-level flow-control
	// window size (in bytes) used by the transport.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is the per-stream flow-control window
	// size (in bytes) used by the transport.
	transportDefaultStreamFlow = 4 << 20

	// defaultUserAgent is sent when the request does not set one.
	defaultUserAgent = "Go-http-client/2.0"

	// initialMaxConcurrentStreams is the concurrency limit assumed for a
	// new connection until the server's first SETTINGS frame arrives
	// (see newClientConn, which sets maxConcurrentStreams to this).
	initialMaxConcurrentStreams = 100

	// defaultMaxConcurrentStreams is the limit used when the server's
	// SETTINGS frame omits SETTINGS_MAX_CONCURRENT_STREAMS.
	defaultMaxConcurrentStreams = 1000
)
63
64
65
66
67
// Transport is an HTTP/2 transport. It combines an HTTP/1 transport
// configuration (t1) with a pool of HTTP/2 client connections keyed by
// authority.
type Transport struct {
	t1       TransportConfig
	connPool noDialClientConnPool
	// transportTestHooks is non-nil only in tests (see newClientConn,
	// dialClientConn).
	*transportTestHooks
}
73
74
75
76
77
// transportTestHooks holds test-only hooks embedded in Transport.
// newclientconn is invoked with each ClientConn created by
// newClientConn before any frames are written, allowing tests to
// substitute cc.tconn.
type transportTestHooks struct {
	newclientconn func(*ClientConn)
}
81
82 func (t *Transport) maxHeaderListSize() uint32 {
83 n := t.t1.MaxHeaderListSize()
84 if b := t.t1.MaxResponseHeaderBytes(); b != 0 {
85 n = b
86 if n > 0 {
87 n = adjustHTTP1MaxHeaderSize(n)
88 }
89 }
90 if n <= 0 {
91 return 10 << 20
92 }
93 if n >= 0xffffffff {
94 return 0
95 }
96 return uint32(n)
97 }
98
99 func (t *Transport) disableCompression() bool {
100 return t.t1 != nil && t.t1.DisableCompression()
101 }
102
103 func NewTransport(t1 TransportConfig) *Transport {
104 connPool := new(clientConnPool)
105 t2 := &Transport{
106 connPool: noDialClientConnPool{connPool},
107 t1: t1,
108 }
109 connPool.t = t2
110 return t2
111 }
112
113 func (t *Transport) AddConn(scheme, authority string, c net.Conn) error {
114 addr := authorityAddr(scheme, authority)
115 used, err := t.connPool.addConnIfNeeded(addr, t, c)
116 if !used {
117 go c.Close()
118 }
119 return err
120 }
121
122
123
// unencryptedTransport is a Transport whose RoundTrip serves
// unencrypted ("http" scheme) HTTP/2 requests.
type unencryptedTransport Transport

// RoundTrip sends the request via the underlying Transport with
// default options.
func (t *unencryptedTransport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{})
}
129
130
131
// ClientConn is the state of a single HTTP/2 client connection to an
// HTTP/2 server.
type ClientConn struct {
	t             *Transport
	tconn         net.Conn             // usually *tls.Conn (see forceCloseConn)
	tlsState      *tls.ConnectionState // nil if tconn is not a TLS connection
	atomicReused  uint32               // whether conn is being reused; accessed atomically
	singleUse     bool                 // whether being used for a single request only
	getConnCalled bool                 // NOTE(review): presumably maintained by the conn pool — confirm

	// readLoop goroutine fields:
	readerDone chan struct{} // closed on error
	readerErr  error         // set before readerDone is closed

	idleTimeout time.Duration // or 0 for never closing
	idleTimer   *time.Timer

	mu               sync.Mutex   // guards the following fields
	cond             *sync.Cond   // hold mu; broadcast on stream abort/close/flow changes
	flow             outflow      // our conn-level flow control quota
	inflow           inflow       // peer's conn-level flow control
	doNotReuse       bool         // conn must not be reused for future requests
	closing          bool         // set by sendGoAway; graceful shutdown in progress
	closed           bool
	closedOnIdle     bool          // conn was closed by closeIfIdle (idle timeout)
	seenSettings     bool          // we have received the server's SETTINGS frame
	seenSettingsChan chan struct{} // closed once seenSettings is set (or reading fails)
	wantSettingsAck  bool          // we sent a SETTINGS frame and haven't heard back
	goAway           *GoAwayFrame  // if non-nil, the GoAwayFrame we received
	goAwayDebug      string        // goAway frame's debug data, retained as a string
	streams          map[uint32]*clientStream // client-initiated streams
	streamsReserved  int           // incremented by ReserveNewRequest; released in writeRequest/cleanup
	nextStreamID     uint32
	pendingRequests  int                       // requests blocked in awaitOpenSlotForStreamLocked
	pings            map[[8]byte]chan struct{} // in-flight PING payload -> notification channel
	br               *bufio.Reader
	lastActive       time.Time
	lastIdle         time.Time // time the conn last became idle (zero while active)

	// Settings received from the peer (maxConcurrentStreams is also
	// read under wmu; see State):
	maxFrameSize                uint32
	maxConcurrentStreams        uint32
	peerMaxHeaderListSize       uint64
	peerMaxHeaderTableSize      uint32
	initialWindowSize           uint32
	initialStreamRecvWindowSize int32
	readIdleTimeout             time.Duration
	pingTimeout                 time.Duration
	extendedConnectAllowed      bool
	strictMaxConcurrentStreams  bool // queue (rather than fail) requests above the concurrency limit

	// rstStreamPingsBlocked, when set, suppresses sending a PING frame
	// alongside RST_STREAM (see cleanupWriteRequest).
	rstStreamPingsBlocked bool

	// pendingResets counts streams we have RST_STREAM'ed without yet
	// observing a subsequent frame from the peer; they still consume
	// concurrency slots (see currentRequestCountLocked).
	pendingResets int

	// readBeforeStreamID: streams with an ID below this value have had
	// frames read since they were created (see cleanupWriteRequest).
	// NOTE(review): maintained by the read loop, which is outside this
	// chunk — confirm the exact update rule there.
	readBeforeStreamID uint32

	// reqHeaderMu is a 1-element semaphore channel serializing request
	// header writes, which share the HPACK encoder state below.
	reqHeaderMu chan struct{}

	// internalStateHook, if non-nil, is invoked (via maybeCallStateHook)
	// after state transitions such as stream cleanup and conn close.
	internalStateHook func()

	// wmu guards the write path (bw, fr, werr, hbuf, henc); hold it when
	// writing frames. It is acquired independently of mu.
	wmu  sync.Mutex
	bw   *bufio.Writer
	fr   *Framer
	werr error        // first write error that occurred (sticky; see stickyErrWriter)
	hbuf bytes.Buffer // HPACK encoder writes into this
	henc *hpack.Encoder
}
226
227
228
// clientStream is the state for a single HTTP/2 stream; one is created
// per RoundTrip call.
type clientStream struct {
	cc *ClientConn

	// Fields of the request that may be accessed even after the
	// response body is closed:
	ctx       context.Context
	reqCancel <-chan struct{}

	trace         *httptrace.ClientTrace // or nil
	ID            uint32
	bufPipe       pipe // buffered pipe carrying the response payload
	requestedGzip bool // we added Accept-Encoding: gzip transparently
	isHead        bool

	abortOnce sync.Once
	abort     chan struct{} // closed to signal the stream should end immediately
	abortErr  error         // set before abort is closed

	peerClosed chan struct{} // closed when the peer half-closes the stream
	donec      chan struct{} // closed by cleanupWriteRequest when the stream is fully done
	on100      chan struct{} // buffered(1); signaled when a 100-continue arrives

	respHeaderRecv chan struct{}   // closed when response headers are received
	res            *ClientResponse // set before respHeaderRecv is closed

	flow        outflow // guarded by cc.mu
	inflow      inflow  // guarded by cc.mu
	bytesRemain int64   // -1 means unknown
	readErr     error   // sticky read error

	reqBody              io.ReadCloser
	reqBodyContentLength int64         // -1 means unknown (see actualContentLength)
	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil once closing begins, closed when done

	// owned by writeRequest:
	sentEndStream bool // we sent an END_STREAM flag to the peer
	sentHeaders   bool

	// owned by the connection read loop:
	firstByte       bool // got the first response byte
	pastHeaders     bool // got the first (non-1xx) MetaHeadersFrame
	pastTrailers    bool // got the optional trailers MetaHeadersFrame
	readClosed      bool // peer sent END_STREAM
	readAborted     bool // read loop reset the stream
	totalHeaderSize int64 // accumulated size of received headers

	trailer    Header  // accumulated trailers
	resTrailer *Header // destination for trailers (from req.ResTrailer; see roundTrip)

	// staticResp is a preallocated response value.
	// NOTE(review): presumably used by the read loop to avoid an
	// allocation — confirm where it is populated.
	staticResp ClientResponse
}
279
// got1xxFuncForTests, if non-nil, overrides the httptrace-based 1xx
// response handler in tests.
var got1xxFuncForTests func(int, textproto.MIMEHeader) error

// get1xxTraceFunc returns the function to call when the client receives
// a 1xx response, preferring the test override when set.
func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
	if fn := got1xxFuncForTests; fn != nil {
		return fn
	}
	return traceGot1xxResponseFunc(cs.trace)
}
290
291 func (cs *clientStream) abortStream(err error) {
292 cs.cc.mu.Lock()
293 defer cs.cc.mu.Unlock()
294 cs.abortStreamLocked(err)
295 }
296
// abortStreamLocked aborts the stream: the first caller's err is
// recorded and cs.abort closed; the request body (if any) is closed;
// and goroutines waiting on the connection condvar (e.g. blocked on
// flow control) are woken. cc.mu must be held.
func (cs *clientStream) abortStreamLocked(err error) {
	cs.abortOnce.Do(func() {
		cs.abortErr = err
		close(cs.abort)
	})
	if cs.reqBody != nil {
		cs.closeReqBodyLocked()
	}
	// cond may be nil in some tests that build a bare clientStream.
	if cs.cc.cond != nil {
		// Wake up writers blocked on flow control or slot availability.
		cs.cc.cond.Broadcast()
	}
}
311
312 func (cs *clientStream) abortRequestBodyWrite() {
313 cc := cs.cc
314 cc.mu.Lock()
315 defer cc.mu.Unlock()
316 if cs.reqBody != nil && cs.reqBodyClosed == nil {
317 cs.closeReqBodyLocked()
318 cc.cond.Broadcast()
319 }
320 }
321
// closeReqBodyLocked closes cs.reqBody (at most once) on a fresh
// goroutine, and arranges for cs.reqBodyClosed to be closed when the
// body's Close returns. Closing on a separate goroutine avoids blocking
// while holding cc.mu, since Body.Close may take arbitrary time or
// acquire its own locks. cc.mu must be held; cs.reqBody must be non-nil.
func (cs *clientStream) closeReqBodyLocked() {
	if cs.reqBodyClosed != nil {
		return // already closing/closed
	}
	cs.reqBodyClosed = make(chan struct{})
	// Capture the channel: cs.reqBodyClosed is guarded by cc.mu, which
	// the goroutine below does not hold.
	reqBodyClosed := cs.reqBodyClosed
	go func() {
		cs.reqBody.Close()
		close(reqBodyClosed)
	}()
}
333
// stickyErrWriter wraps a net.Conn and remembers the first write error:
// once *err is non-nil, every subsequent Write returns it without
// touching the connection. The error pointer is shared with
// ClientConn.werr so the frame-writing path can observe failures.
type stickyErrWriter struct {
	conn    net.Conn
	timeout time.Duration // per-write byte timeout passed to writeWithByteTimeout
	err     *error        // latched first error; shared with the owning ClientConn
}

// Write writes p to the connection with the configured byte timeout,
// latching the first error encountered into *sew.err.
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
	if *sew.err != nil {
		return 0, *sew.err
	}
	n, err = writeWithByteTimeout(sew.conn, sew.timeout, p)
	*sew.err = err
	return n, err
}
348
349
350
351
352
353
354
// noCachedConnError is the concrete type of ErrNoCachedConn. It carries
// the IsHTTP2NoCachedConnError marker method that isNoCachedConnError
// looks for, so wrappers in other packages can implement the same
// marker without importing this one.
type noCachedConnError struct{}

func (noCachedConnError) IsHTTP2NoCachedConnError() {}

func (noCachedConnError) Error() string {
	return "http2: no cached connection was available"
}

// isNoCachedConnError reports whether err's type carries the
// IsHTTP2NoCachedConnError marker method.
func isNoCachedConnError(err error) bool {
	if _, ok := err.(interface{ IsHTTP2NoCachedConnError() }); ok {
		return true
	}
	return false
}

// ErrNoCachedConn is returned when RoundTripOpt is asked to use only a
// cached connection and none is available.
var ErrNoCachedConn error = noCachedConnError{}
369
370
// RoundTripOpt are options for the Transport.RoundTripOpt method.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may create a new
	// TCP connection. If set and no cached connection is available,
	// RoundTripOpt returns ErrNoCachedConn.
	// NOTE(review): this field is not read in RoundTripOpt itself;
	// presumably the connection pool consults it — confirm.
	OnlyCachedConn bool
}
378
// RoundTrip sends req and returns its response, delegating to
// RoundTripOpt with default options.
func (t *Transport) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return t.RoundTripOpt(req, RoundTripOpt{})
}
382
383
384
385 func authorityAddr(scheme string, authority string) (addr string) {
386 host, port, err := net.SplitHostPort(authority)
387 if err != nil {
388 host = authority
389 port = ""
390 }
391 if port == "" {
392 port = "443"
393 if scheme == "http" {
394 port = "80"
395 }
396 }
397 if a, err := idna.ToASCII(host); err == nil {
398 host = a
399 }
400
401 if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
402 return host + ":" + port
403 }
404 return net.JoinHostPort(host, port)
405 }
406
407
// RoundTripOpt is like RoundTrip, but takes options. It retries
// transparently (up to 6 times) on errors that indicate the request
// never reached the server (see shouldRetryRequest), using exponential
// backoff with 10% jitter after the first retry.
func (t *Transport) RoundTripOpt(req *ClientRequest, opt RoundTripOpt) (*ClientResponse, error) {
	switch req.URL.Scheme {
	case "https":
	case "http":
	default:
		return nil, errors.New("http2: unsupported scheme")
	}

	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
	for retry := 0; ; retry++ {
		cc, err := t.connPool.GetClientConn(req, addr)
		if err != nil {
			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
			return nil, err
		}
		// First user of the conn wins the CAS; everyone after is "reused".
		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
		traceGotConn(req, cc, reused)
		res, err := cc.RoundTrip(req)
		if err != nil && retry <= 6 {
			roundTripErr := err
			if req, err = shouldRetryRequest(req, err); err == nil {
				// The first retry is immediate; later retries back off.
				if retry == 0 {
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				}
				backoff := float64(uint(1) << (uint(retry) - 1))
				backoff += backoff * (0.1 * mathrand.Float64())
				d := time.Second * time.Duration(backoff)
				tm := time.NewTimer(d)
				select {
				case <-tm.C:
					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
					continue
				case <-req.Context.Done():
					tm.Stop()
					err = req.Context.Err()
				}
			}
		}
		if err == errClientConnNotEstablished {
			// This ClientConn was created recently, this was the first
			// request to use it, and the connection is closed and not
			// usable.
			//
			// In this state, cc.idleTimer will remove the conn from the
			// pool when it fires. Stop the timer and remove the conn here
			// so future requests won't try to use it. If the timer already
			// fired and we're racing it, the redundant MarkDead call is
			// harmless.
			if cc.idleTimer != nil {
				cc.idleTimer.Stop()
			}
			t.connPool.MarkDead(cc)
		}
		if err != nil {
			t.vlogf("RoundTrip failure: %v", err)
			return nil, err
		}
		return res, nil
	}
}
471
472 func (t *Transport) IdleConnStrsForTesting() []string {
473 var ret []string
474 t.connPool.mu.Lock()
475 defer t.connPool.mu.Unlock()
476 for k, ccs := range t.connPool.conns {
477 for _, cc := range ccs {
478 if cc.idleState().canTakeNewRequest {
479 ret = append(ret, k)
480 }
481 }
482 }
483 slices.Sort(ret)
484 return ret
485 }
486
487
488
489
// CloseIdleConnections closes pooled connections that are currently
// idle. Connections with in-flight requests are left alone.
func (t *Transport) CloseIdleConnections() {
	t.connPool.closeIdleConnections()
}
493
var (
	// errClientConnClosed: the connection has been closed.
	errClientConnClosed = errors.New("http2: client conn is closed")
	// errClientConnUnusable: the conn cannot take new requests; the
	// request may be retried on a different connection (see canRetryError).
	errClientConnUnusable = errors.New("http2: client conn not usable")
	// errClientConnNotEstablished: the conn failed before its first
	// request completed; RoundTripOpt removes it from the pool.
	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
	// errClientConnGotGoAway: streams aborted due to a graceful server
	// GOAWAY; also retryable (see canRetryError).
	errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	// errClientConnForceClosed: abort reason used by ClientConn.Close.
	errClientConnForceClosed = errors.New("http2: client connection force closed via ClientConn.Close")
)
501
502
503
504
505
506
// shouldRetryRequest is called by RoundTrip when a request fails to get
// sent on its connection and it's not clear whether the request itself
// was at fault. It returns a fresh request to retry with (regenerating
// the body when necessary), or the error to surface to the caller.
func shouldRetryRequest(req *ClientRequest, err error) (*ClientRequest, error) {
	if !canRetryError(err) {
		return nil, err
	}
	// If the Body is nil (or NoBody), it's safe to reuse this request
	// and its body.
	if req.Body == nil || req.Body == NoBody {
		return req.Clone(), nil
	}

	// If the request body can be reset back to its original state via
	// GetBody, retry with a fresh body.
	if req.GetBody != nil {
		body, err := req.GetBody()
		if err != nil {
			return nil, err
		}
		newReq := req.Clone()
		newReq.Body = body
		return newReq, nil
	}

	// The request body can't be rewound, but errClientConnUnusable means
	// we never started writing it, so the original body is still intact.
	if err == errClientConnUnusable {
		return req.Clone(), nil
	}

	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
}
536
537 func canRetryError(err error) bool {
538 if err == errClientConnUnusable || err == errClientConnGotGoAway {
539 return true
540 }
541 if se, ok := err.(StreamError); ok {
542 return se.Code == ErrCodeRefusedStream
543 }
544 return false
545 }
546
547 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
548 if t.transportTestHooks != nil {
549 return t.newClientConn(nil, singleUse, nil)
550 }
551 host, _, err := net.SplitHostPort(addr)
552 if err != nil {
553 return nil, err
554 }
555 tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
556 if err != nil {
557 return nil, err
558 }
559 return t.newClientConn(tconn, singleUse, nil)
560 }
561
// newTLSConfig builds the TLS configuration used to dial host: a
// zero-value tls.Config with the h2 ALPN token advertised first and
// ServerName set to host.
// NOTE(review): since cfg starts from a fresh zero value, NextProtos is
// always empty and ServerName always "", so both guards always fire;
// presumably they exist for parity with a variant that clones a
// user-supplied config — confirm before simplifying.
func (t *Transport) newTLSConfig(host string) *tls.Config {
	cfg := new(tls.Config)
	if !slices.Contains(cfg.NextProtos, NextProtoTLS) {
		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
	}
	if cfg.ServerName == "" {
		cfg.ServerName = host
	}
	return cfg
}
572
// dialTLS dials a TLS connection and verifies that the handshake
// negotiated HTTP/2 via ALPN, failing otherwise.
func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
	if err != nil {
		return nil, err
	}
	state := tlsCn.ConnectionState()
	if p := state.NegotiatedProtocol; p != NextProtoTLS {
		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
	}
	// NegotiatedProtocolIsMutual is deprecated in crypto/tls (always
	// true in current Go); the check is retained for older behavior.
	if !state.NegotiatedProtocolIsMutual {
		return nil, errors.New("http2: could not negotiate protocol mutually")
	}
	return tlsCn, nil
}
587
588
589
590 func (t *Transport) disableKeepAlives() bool {
591 return t.t1 != nil && t.t1.DisableKeepAlives()
592 }
593
594 func (t *Transport) expectContinueTimeout() time.Duration {
595 if t.t1 == nil {
596 return 0
597 }
598 return t.t1.ExpectContinueTimeout()
599 }
600
// NewClientConn wraps an already-established connection c in a
// ClientConn for use by net/http. internalStateHook, if non-nil, is
// invoked after connection state transitions.
func (t *Transport) NewClientConn(c net.Conn, internalStateHook func()) (NetHTTPClientConn, error) {
	cc, err := t.newClientConn(c, t.disableKeepAlives(), internalStateHook)
	if err != nil {
		return NetHTTPClientConn{}, err
	}

	// Connections handed to net/http are always strict about the
	// concurrency limit: queue requests rather than exceed it
	// (see idleStateLocked).
	cc.strictMaxConcurrentStreams = true

	return NetHTTPClientConn{cc}, nil
}
613
// newClientConn builds a ClientConn around c, writes the client
// preface, initial SETTINGS and connection-level WINDOW_UPDATE frames,
// and starts the read loop.
func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
	conf := configFromTransport(t)
	cc := &ClientConn{
		t:                           t,
		tconn:                       c,
		readerDone:                  make(chan struct{}),
		nextStreamID:                1,
		maxFrameSize:                16 << 10, // spec default
		initialWindowSize:           65535,    // spec default
		initialStreamRecvWindowSize: int32(conf.MaxReceiveBufferPerStream),
		maxConcurrentStreams:        initialMaxConcurrentStreams, // spec says "infinite"; use a modest value until the server's SETTINGS arrives
		strictMaxConcurrentStreams:  conf.StrictMaxConcurrentRequests,
		peerMaxHeaderListSize:       0xffffffffffffffff, // "infinite", per spec; use 2^64-1
		streams:                     make(map[uint32]*clientStream),
		singleUse:                   singleUse,
		seenSettingsChan:            make(chan struct{}),
		wantSettingsAck:             true,
		readIdleTimeout:             conf.SendPingTimeout,
		pingTimeout:                 conf.PingTimeout,
		pings:                       make(map[[8]byte]chan struct{}),
		reqHeaderMu:                 make(chan struct{}, 1), // 1-slot semaphore
		lastActive:                  time.Now(),
		internalStateHook:           internalStateHook,
	}
	if t.transportTestHooks != nil {
		// The hook may replace cc.tconn (e.g. with an in-memory pipe).
		t.transportTestHooks.newclientconn(cc)
		c = cc.tconn
	}
	if VerboseLogs {
		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
	}

	cc.cond = sync.NewCond(&cc.mu)
	// Start with the spec-default conn-level send window; the server's
	// WINDOW_UPDATEs grow it later.
	cc.flow.add(int32(initialWindowSize))

	// All writes go through the sticky-error writer so the first write
	// failure is latched into cc.werr.
	cc.bw = bufio.NewWriter(stickyErrWriter{
		conn:    c,
		timeout: conf.WriteByteTimeout,
		err:     &cc.werr,
	})
	cc.br = bufio.NewReader(c)
	cc.fr = NewFramer(cc.bw, cc.br)
	cc.fr.SetMaxReadFrameSize(uint32(conf.MaxReadFrameSize))
	if conf.CountError != nil {
		cc.fr.countError = conf.CountError
	}
	maxHeaderTableSize := uint32(conf.MaxDecoderHeaderTableSize)
	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

	cc.henc = hpack.NewEncoder(&cc.hbuf)
	cc.henc.SetMaxDynamicTableSizeLimit(uint32(conf.MaxEncoderHeaderTableSize))
	cc.peerMaxHeaderTableSize = initialHeaderTableSize

	if cs, ok := c.(connectionStater); ok {
		state := cs.ConnectionState()
		cc.tlsState = &state
	}

	initialSettings := []Setting{
		{ID: SettingEnablePush, Val: 0}, // we never accept server push
		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
	}
	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: uint32(conf.MaxReadFrameSize)})
	if max := t.maxHeaderListSize(); max != 0 {
		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
	}
	if maxHeaderTableSize != initialHeaderTableSize {
		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
	}

	// Client preface, our SETTINGS, and a conn-level WINDOW_UPDATE that
	// grows the receive window past the 64KB spec default.
	cc.bw.Write(clientPreface)
	cc.fr.WriteSettings(initialSettings...)
	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxReceiveBufferPerConnection))
	cc.inflow.init(int32(conf.MaxReceiveBufferPerConnection) + initialWindowSize)
	cc.bw.Flush()
	if cc.werr != nil {
		cc.Close()
		return nil, cc.werr
	}

	// Start the idle timer after the connection is fully initialized.
	if d := t.idleConnTimeout(); d != 0 {
		cc.idleTimeout = d
		cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout)
	}

	go cc.readLoop()
	return cc, nil
}
706
707 func (cc *ClientConn) healthCheck() {
708 pingTimeout := cc.pingTimeout
709
710
711 ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
712 defer cancel()
713 cc.vlogf("http2: Transport sending health check")
714 err := cc.Ping(ctx)
715 if err != nil {
716 cc.vlogf("http2: Transport health check failure: %v", err)
717 cc.closeForLostPing()
718 } else {
719 cc.vlogf("http2: Transport health check success")
720 }
721 }
722
723
// SetDoNotReuse marks cc as not reusable for future HTTP requests.
func (cc *ClientConn) SetDoNotReuse() {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	cc.doNotReuse = true
}
729
// setGoAway records a received GOAWAY frame and aborts streams with IDs
// above the frame's LastStreamID (the server will not process them).
// The first GOAWAY's debug payload is retained, and an error code from
// an earlier GOAWAY is preserved over a later ErrCodeNo.
func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
	cc.mu.Lock()
	defer cc.mu.Unlock()

	old := cc.goAway
	cc.goAway = f

	// Merge the previous and current GOAWAY state.
	if cc.goAwayDebug == "" {
		cc.goAwayDebug = string(f.DebugData())
	}
	if old != nil && old.ErrCode != ErrCodeNo {
		cc.goAway.ErrCode = old.ErrCode
	}
	last := f.LastStreamID
	for streamID, cs := range cc.streams {
		if streamID <= last {
			// The server's GOAWAY indicates that it received this stream.
			// It will either finish processing it, or close the connection
			// without doing so. Either way, leave the stream alone for now.
			continue
		}
		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
			// Don't retry the first stream on a connection if we get a
			// non-NO error. If the server is sending an error on a new
			// connection, it's probably not happy with something we sent.
			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
		} else {
			// Aborting the stream with errClientConnGotGoAway indicates
			// that the request should be retried on a new connection.
			cs.abortStreamLocked(errClientConnGotGoAway)
		}
	}
}
764
765
766
767
768
769
// CanTakeNewRequest reports whether the connection can take a new
// request, meaning it has not been closed or received or sent a GOAWAY.
//
// Callers that will immediately issue a request on this connection
// should use ReserveNewRequest instead, which also reserves a slot.
func (cc *ClientConn) CanTakeNewRequest() bool {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.canTakeNewRequestLocked()
}
775
776
777
778
779 func (cc *ClientConn) ReserveNewRequest() bool {
780 cc.mu.Lock()
781 defer cc.mu.Unlock()
782 if st := cc.idleStateLocked(); !st.canTakeNewRequest {
783 return false
784 }
785 cc.streamsReserved++
786 return true
787 }
788
789
// ClientConnState describes the state of a ClientConn.
type ClientConnState struct {
	// Closed is whether the connection is closed.
	Closed bool

	// Closing is whether the connection is in the process of closing:
	// it may be closing due to shutdown, being a single-use connection,
	// being marked as DoNotReuse, or having received a GOAWAY frame.
	Closing bool

	// StreamsActive is how many streams are active (including streams
	// that have been reset but not yet confirmed by the peer).
	StreamsActive int

	// StreamsReserved is how many streams have been reserved via
	// ClientConn.ReserveNewRequest.
	StreamsReserved int

	// StreamsPending is how many requests are blocked waiting for other
	// streams to complete because the peer's concurrency limit has been
	// reached.
	StreamsPending int

	// MaxConcurrentStreams is the peer's advertised concurrency limit.
	// It is zero until the peer's SETTINGS frame has been received.
	MaxConcurrentStreams uint32

	// LastIdle, if non-zero, is when the connection last transitioned
	// to the idle state.
	LastIdle time.Time
}
821
822
// State returns a snapshot of cc's state.
func (cc *ClientConn) State() ClientConnState {
	cc.wmu.Lock() // maxConcurrentStreams is read under wmu here
	maxConcurrent := cc.maxConcurrentStreams
	if !cc.seenSettings {
		// The peer hasn't advertised a limit yet; report 0 (unknown).
		maxConcurrent = 0
	}
	cc.wmu.Unlock()

	cc.mu.Lock()
	defer cc.mu.Unlock()
	return ClientConnState{
		Closed:               cc.closed,
		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
		StreamsActive:        len(cc.streams) + cc.pendingResets,
		StreamsReserved:      cc.streamsReserved,
		StreamsPending:       cc.pendingRequests,
		LastIdle:             cc.lastIdle,
		MaxConcurrentStreams: maxConcurrent,
	}
}
843
844
845
// clientConnIdleState describes the suitability of a client connection
// to initiate a new RoundTrip request.
type clientConnIdleState struct {
	canTakeNewRequest bool
}

// idleState returns the connection's idle state, acquiring cc.mu.
func (cc *ClientConn) idleState() clientConnIdleState {
	cc.mu.Lock()
	defer cc.mu.Unlock()
	return cc.idleStateLocked()
}
855
// idleStateLocked computes the connection's idle state. cc.mu must be
// held.
func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
	// A single-use connection is spent once its first stream is created.
	if cc.singleUse && cc.nextStreamID > 1 {
		return
	}
	var maxConcurrentOkay bool
	if cc.strictMaxConcurrentStreams {
		// We'll tell the caller we can take a new request to prevent it
		// from dialing a new TCP connection, but then we'll block later
		// before writing it (see awaitOpenSlotForStreamLocked).
		maxConcurrentOkay = true
	} else {
		// We can take a new request if the total of
		//   - active streams;
		//   - reservation slots taken; and
		//   - streams for which we have sent a RST_STREAM but received
		//     no subsequent frame
		// is below the peer's concurrency limit.
		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
	}

	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()

	// If this connection was never used for a request and is closed,
	// let it take a request (which will fail) — this avoids a state
	// where an early error makes the conn unusable while the pool still
	// believes it has a connection available.
	//
	// If the conn was closed for idleness, we're racing the idle timer;
	// don't try to use the conn in that case.
	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
		st.canTakeNewRequest = true
	}

	return
}
892
893 func (cc *ClientConn) isUsableLocked() bool {
894 return cc.goAway == nil &&
895 !cc.closed &&
896 !cc.closing &&
897 !cc.doNotReuse &&
898 int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
899 !cc.tooIdleLocked()
900 }
901
902
903
904
905
906
907
908 func (cc *ClientConn) canReserveLocked() bool {
909 if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
910 return false
911 }
912 if !cc.isUsableLocked() {
913 return false
914 }
915 return true
916 }
917
918
919
// currentRequestCountLocked reports the number of concurrency slots in
// use: active streams, reserved slots, and streams we have reset but
// whose resets the peer has not yet acknowledged with a frame.
// cc.mu must be held.
func (cc *ClientConn) currentRequestCountLocked() int {
	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
}

// canTakeNewRequestLocked reports whether the connection can accept a
// new request right now. cc.mu must be held.
func (cc *ClientConn) canTakeNewRequestLocked() bool {
	st := cc.idleStateLocked()
	return st.canTakeNewRequest
}
928
929
930 func (cc *ClientConn) availableLocked() int {
931 if !cc.canTakeNewRequestLocked() {
932 return 0
933 }
934 return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
935 }
936
937
938
// tooIdleLocked reports whether this connection has been idle longer
// than the configured idle timeout. cc.mu must be held.
func (cc *ClientConn) tooIdleLocked() bool {
	// Round(0) strips the monotonic clock reading so the comparison
	// uses wall-clock time (presumably for consistency across
	// suspend/resume — matches the upstream x/net behavior).
	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && time.Since(cc.lastIdle.Round(0)) > cc.idleTimeout
}
946
947
948
949
950
951
952
// onIdleTimeout is the idleTimer callback: close the connection if it
// is still idle when the timer fires.
func (cc *ClientConn) onIdleTimeout() {
	cc.closeIfIdle()
}

// closeConn closes the transport connection, escalating to a forced
// close of the underlying net.Conn if Close hasn't returned within
// 250ms (e.g. a TLS close-notify blocked on a dead peer).
func (cc *ClientConn) closeConn() {
	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
	defer t.Stop()
	cc.tconn.Close()
	cc.maybeCallStateHook()
}
963
964
965
966 func (cc *ClientConn) forceCloseConn() {
967 tc, ok := cc.tconn.(*tls.Conn)
968 if !ok {
969 return
970 }
971 if nc := tc.NetConn(); nc != nil {
972 nc.Close()
973 }
974 }
975
// closeIfIdle closes the connection if it has no active or reserved
// streams, marking it closed-for-idleness so the pool won't hand it
// out again (see idleStateLocked).
func (cc *ClientConn) closeIfIdle() {
	cc.mu.Lock()
	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
		cc.mu.Unlock()
		return
	}
	cc.closed = true
	cc.closedOnIdle = true
	nextID := cc.nextStreamID
	// TODO: do clients send GOAWAY too? maybe? Just Close:
	cc.mu.Unlock()

	if VerboseLogs {
		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
	}
	cc.closeConn()
}
993
994 func (cc *ClientConn) isDoNotReuseAndIdle() bool {
995 cc.mu.Lock()
996 defer cc.mu.Unlock()
997 return cc.doNotReuse && len(cc.streams) == 0
998 }
999
// shutdownEnterWaitStateHook is a test hook invoked just before
// Shutdown blocks waiting for in-flight streams to complete.
var shutdownEnterWaitStateHook = func() {}

// Shutdown gracefully closes the client connection: it sends a GOAWAY
// frame, then waits for running streams to complete or for ctx to be
// done (in which case the connection is left open and ctx.Err()
// returned).
func (cc *ClientConn) Shutdown(ctx context.Context) error {
	if err := cc.sendGoAway(); err != nil {
		return err
	}

	// Wait for all in-flight streams to complete or the conn to close.
	done := make(chan struct{})
	cancelled := false // guarded by cc.mu
	go func() {
		cc.mu.Lock()
		defer cc.mu.Unlock()
		for {
			if len(cc.streams) == 0 || cc.closed {
				cc.closed = true
				close(done)
				break
			}
			if cancelled {
				break
			}
			cc.cond.Wait()
		}
	}()
	shutdownEnterWaitStateHook()
	select {
	case <-done:
		cc.closeConn()
		return nil
	case <-ctx.Done():
		cc.mu.Lock()
		// Free the waiter goroutine above.
		cancelled = true
		cc.cond.Broadcast()
		cc.mu.Unlock()
		return ctx.Err()
	}
}
1039
1040 func (cc *ClientConn) sendGoAway() error {
1041 cc.mu.Lock()
1042 closing := cc.closing
1043 cc.closing = true
1044 maxStreamID := cc.nextStreamID
1045 cc.mu.Unlock()
1046 if closing {
1047
1048 return nil
1049 }
1050
1051 cc.wmu.Lock()
1052 defer cc.wmu.Unlock()
1053
1054 if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
1055 return err
1056 }
1057 if err := cc.bw.Flush(); err != nil {
1058 return err
1059 }
1060
1061 return nil
1062 }
1063
1064
1065
// closeForError closes the connection, aborting every open stream with
// err. Streams are aborted and waiters woken while holding cc.mu; the
// underlying connection is closed after releasing it (closeConn may
// block briefly).
func (cc *ClientConn) closeForError(err error) {
	cc.mu.Lock()
	cc.closed = true
	for _, cs := range cc.streams {
		cs.abortStreamLocked(err)
	}
	cc.cond.Broadcast()
	cc.mu.Unlock()
	cc.closeConn()
}

// Close closes the client connection immediately, interrupting any
// in-flight requests. For graceful shutdown, use Shutdown instead.
func (cc *ClientConn) Close() error {
	cc.closeForError(errClientConnForceClosed)
	return nil
}
1084
1085
1086 func (cc *ClientConn) closeForLostPing() {
1087 err := errors.New("http2: client connection lost")
1088 if f := cc.fr.countError; f != nil {
1089 f("conn_close_lost_ping")
1090 }
1091 cc.closeForError(err)
1092 }
1093
1094
1095
// errRequestCanceled mirrors net/http's unexported errRequestCanceled
// via the internal package, so h1 and h2 report identical errors.
var errRequestCanceled = internal.ErrRequestCanceled

// responseHeaderTimeout returns the configured response-header timeout,
// or 0 (no timeout) when there is no HTTP/1 configuration to consult.
func (cc *ClientConn) responseHeaderTimeout() time.Duration {
	if cc.t.t1 != nil {
		return cc.t.t1.ResponseHeaderTimeout()
	}
	// No way to express this with just an http2 Transport (and probably
	// no need: request contexts/cancellation cover the same ground).
	// It exists only for compatibility with the HTTP/1 transport fields
	// when doing transparent HTTP/2.
	return 0
}
1108
1109
1110
1111
1112 func actualContentLength(req *ClientRequest) int64 {
1113 if req.Body == nil || req.Body == NoBody {
1114 return 0
1115 }
1116 if req.ContentLength != 0 {
1117 return req.ContentLength
1118 }
1119 return -1
1120 }
1121
1122 func (cc *ClientConn) decrStreamReservations() {
1123 cc.mu.Lock()
1124 defer cc.mu.Unlock()
1125 cc.decrStreamReservationsLocked()
1126 }
1127
1128 func (cc *ClientConn) decrStreamReservationsLocked() {
1129 if cc.streamsReserved > 0 {
1130 cc.streamsReserved--
1131 }
1132 }
1133
// RoundTrip sends req on this connection and returns its response.
func (cc *ClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.roundTrip(req, nil)
}
1137
// roundTrip sends req on this connection and blocks until the response
// headers are received, the request is aborted, or the context/cancel
// channel fires. streamf, if non-nil, is called with the stream once it
// has been set up, before headers are written.
func (cc *ClientConn) roundTrip(req *ClientRequest, streamf func(*clientStream)) (*ClientResponse, error) {
	ctx := req.Context
	// The stream is embedded in the request to avoid a separate
	// allocation per round trip.
	req.stream = clientStream{
		cc:                   cc,
		ctx:                  ctx,
		reqCancel:            req.Cancel,
		isHead:               req.Method == "HEAD",
		reqBody:              req.Body,
		reqBodyContentLength: actualContentLength(req),
		trace:                httptrace.ContextClientTrace(ctx),
		peerClosed:           make(chan struct{}),
		abort:                make(chan struct{}),
		respHeaderRecv:       make(chan struct{}),
		donec:                make(chan struct{}),
		resTrailer:           req.ResTrailer,
	}
	cs := &req.stream

	cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())

	// Write the request (headers and body) on a separate goroutine;
	// this goroutine only waits for the response or cancellation.
	go cs.doRequest(req, streamf)

	// waitDone blocks until the stream is fully done (cleanup finished),
	// or the request is cancelled.
	waitDone := func() error {
		select {
		case <-cs.donec:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}

	handleResponseHeaders := func() (*ClientResponse, error) {
		res := cs.res
		if res.StatusCode > 299 {
			// On a 3xx/4xx/5xx response, stop writing our request body:
			// the server has already answered and probably doesn't want
			// the rest of it.
			cs.abortRequestBodyWrite()
		}
		res.TLS = cc.tlsState
		if res.Body == NoBody && actualContentLength(req) == 0 {
			// If there isn't a request or response body still being
			// written, wait for the stream to be closed before RoundTrip
			// returns.
			if err := waitDone(); err != nil {
				return nil, err
			}
		}
		return res, nil
	}

	cancelRequest := func(cs *clientStream, err error) error {
		cs.cc.mu.Lock()
		bodyClosed := cs.reqBodyClosed
		cs.cc.mu.Unlock()
		// Wait for the request body to be closed.
		//
		// If nothing closed the body before now, abortStreamLocked will
		// have started a goroutine to close it.
		//
		// Closing the body before returning avoids a race with net/http
		// checking its readTrackingBody to see whether the body was read
		// from or closed.
		if bodyClosed != nil {
			<-bodyClosed
		}
		return err
	}

	for {
		select {
		case <-cs.respHeaderRecv:
			return handleResponseHeaders()
		case <-cs.abort:
			select {
			case <-cs.respHeaderRecv:
				// If both respHeaderRecv and abort are ready, prefer the
				// response: the server probably wrote it and immediately
				// reset the stream. See golang.org/issue/49645.
				return handleResponseHeaders()
			default:
				waitDone()
				return nil, cs.abortErr
			}
		case <-ctx.Done():
			err := ctx.Err()
			cs.abortStream(err)
			return nil, cancelRequest(cs, err)
		case <-cs.reqCancel:
			cs.abortStream(errRequestCanceled)
			return nil, cancelRequest(cs, errRequestCanceled)
		}
	}
}
1246
1247
1248
1249
// doRequest runs for the duration of the request lifetime: it writes
// the request, then performs post-request cleanup (closing the request
// body, resetting the stream if needed, releasing slots).
func (cs *clientStream) doRequest(req *ClientRequest, streamf func(*clientStream)) {
	err := cs.writeRequest(req, streamf)
	cs.cleanupWriteRequest(err)
}
1254
// errExtendedConnectNotSupported is returned for extended-CONNECT
// requests when the peer's SETTINGS did not allow the feature.
var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")

// writeRequest sends the request.
//
// It returns nil after the request is written, the response is read,
// and the request stream is half-closed by the peer.
//
// It returns non-nil if the request ends otherwise. If the returned
// error is a StreamError, its Code may be used when resetting the
// stream.
func (cs *clientStream) writeRequest(req *ClientRequest, streamf func(*clientStream)) (err error) {
	cc := cs.cc
	ctx := cs.ctx

	// Extended CONNECT (RFC 8441) uses the ":protocol" pseudo-header and
	// must wait for the server's first SETTINGS frame to learn whether
	// the peer supports it.
	var isExtendedConnect bool
	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
		isExtendedConnect = true
	}

	if cc.reqHeaderMu == nil {
		panic("RoundTrip on uninitialized ClientConn")
	}
	if isExtendedConnect {
		select {
		case <-cs.reqCancel:
			return errRequestCanceled
		case <-ctx.Done():
			return ctx.Err()
		case <-cc.seenSettingsChan:
			if !cc.extendedConnectAllowed {
				return errExtendedConnectNotSupported
			}
		}
	}
	// Acquire the new-request lock by writing to reqHeaderMu. This
	// serializes header writes, which share the HPACK encoder state.
	select {
	case cc.reqHeaderMu <- struct{}{}:
	case <-cs.reqCancel:
		return errRequestCanceled
	case <-ctx.Done():
		return ctx.Err()
	}

	cc.mu.Lock()
	if cc.idleTimer != nil {
		cc.idleTimer.Stop()
	}
	cc.decrStreamReservationsLocked()
	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
		cc.mu.Unlock()
		<-cc.reqHeaderMu // release the header lock before failing
		return err
	}
	cc.addStreamLocked(cs) // registers cs on the connection
	if isConnectionCloseRequest(req) {
		cc.doNotReuse = true
	}
	cc.mu.Unlock()

	if streamf != nil {
		streamf(cs)
	}

	continueTimeout := cc.t.expectContinueTimeout()
	if continueTimeout != 0 {
		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
			continueTimeout = 0
		} else {
			cs.on100 = make(chan struct{}, 1)
		}
	}

	// Past this point (where we send request headers) it is possible for
	// RoundTrip to return successfully. Since the RoundTrip contract
	// permits the caller to "mutate or reuse" the Request after closing
	// the Response's Body, we must take care when referencing the
	// Request from here on.
	err = cs.encodeAndWriteHeaders(req)
	<-cc.reqHeaderMu
	if err != nil {
		return err
	}

	hasBody := cs.reqBodyContentLength != 0
	if !hasBody {
		cs.sentEndStream = true
	} else {
		// With Expect: 100-continue, wait up to continueTimeout for the
		// interim response (or an abort) before sending the body.
		if continueTimeout != 0 {
			traceWait100Continue(cs.trace)
			timer := time.NewTimer(continueTimeout)
			select {
			case <-timer.C:
				err = nil
			case <-cs.on100:
				err = nil
			case <-cs.abort:
				err = cs.abortErr
			case <-ctx.Done():
				err = ctx.Err()
			case <-cs.reqCancel:
				err = errRequestCanceled
			}
			timer.Stop()
			if err != nil {
				traceWroteRequest(cs.trace, err)
				return err
			}
		}

		if err = cs.writeRequestBody(req); err != nil {
			if err != errStopReqBodyWrite {
				traceWroteRequest(cs.trace, err)
				return err
			}
		} else {
			cs.sentEndStream = true
		}
	}

	traceWroteRequest(cs.trace, err)

	var respHeaderTimer <-chan time.Time
	var respHeaderRecv chan struct{}
	if d := cc.responseHeaderTimeout(); d != 0 {
		timer := time.NewTimer(d)
		defer timer.Stop()
		respHeaderTimer = timer.C
		respHeaderRecv = cs.respHeaderRecv
	}

	// Wait until the peer half-closes its end of the stream, or until
	// the request is aborted (via context, cancel channel, or error),
	// whichever comes first.
	for {
		select {
		case <-cs.peerClosed:
			return nil
		case <-respHeaderTimer:
			return errTimeout
		case <-respHeaderRecv:
			// Once headers arrive, disable the response-header timer but
			// keep waiting for the peer to close the stream.
			respHeaderRecv = nil
			respHeaderTimer = nil
		case <-cs.abort:
			return cs.abortErr
		case <-ctx.Done():
			return ctx.Err()
		case <-cs.reqCancel:
			return errRequestCanceled
		}
	}
}
1407
// encodeAndWriteHeaders HPACK-encodes the request headers and writes
// the HEADERS (and any CONTINUATION) frames, holding the connection
// write mutex for the duration.
func (cs *clientStream) encodeAndWriteHeaders(req *ClientRequest) error {
	cc := cs.cc
	ctx := cs.ctx

	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	// If the request was canceled while waiting for the lock, quit now.
	select {
	case <-cs.abort:
		return cs.abortErr
	case <-ctx.Done():
		return ctx.Err()
	case <-cs.reqCancel:
		return errRequestCanceled
	default:
	}

	// Encode headers into cc.hbuf via the shared HPACK encoder.
	// We send: HEADERS{1}, CONTINUATION{0,}; DATA (and trailers) are
	// written later by the request-body path.
	cc.hbuf.Reset()
	res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
		cc.writeHeader(name, value)
	})
	if err != nil {
		return fmt.Errorf("http2: %w", err)
	}
	hdrs := cc.hbuf.Bytes()

	// Write the request: END_STREAM is set only if there is neither a
	// body nor trailers to follow.
	endStream := !res.HasBody && !res.HasTrailers
	cs.sentHeaders = true
	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
	traceWroteHeaders(cs.trace)
	return err
}
1447
// encodeRequestHeaders converts the request into HPACK header fields by
// invoking headerf for each (name, value) pair, delegating validation,
// pseudo-header ordering, and size enforcement to
// httpcommon.EncodeHeaders.
func encodeRequestHeaders(req *ClientRequest, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
	return httpcommon.EncodeHeaders(req.Context, httpcommon.EncodeHeadersParam{
		Request: httpcommon.Request{
			Header:              req.Header,
			Trailer:             req.Trailer,
			URL:                 req.URL,
			Host:                req.Host,
			Method:              req.Method,
			ActualContentLength: actualContentLength(req),
		},
		AddGzipHeader:         addGzipHeader,
		PeerMaxHeaderListSize: peerMaxHeaderListSize,
		DefaultUserAgent:      defaultUserAgent,
	}, headerf)
}
1463
1464
1465
1466
1467
// cleanupWriteRequest performs post-request tasks: it closes the
// request body, sends a RST_STREAM to the peer when the stream did not
// end cleanly, releases the stream's concurrency slot, and signals
// completion via cs.donec.
//
// err is the result of writeRequest; a non-nil err on an unclosed
// stream triggers a reset.
func (cs *clientStream) cleanupWriteRequest(err error) {
	cc := cs.cc

	if cs.ID == 0 {
		// We were canceled before creating the stream, so return our
		// reservation.
		cc.decrStreamReservations()
	}

	// TODO: write h12Compare test showing whether Request.Body is
	// closed by the Transport, and then if the request body is closed,
	// never open it again (to match HTTP/1 behavior).
	cc.mu.Lock()
	mustCloseBody := false
	if cs.reqBody != nil && cs.reqBodyClosed == nil {
		mustCloseBody = true
		cs.reqBodyClosed = make(chan struct{})
	}
	bodyClosed := cs.reqBodyClosed
	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	// Have frames for later streams been read since this one was
	// created? (Used below to decide whether the server is responsive.)
	readSinceStream := cc.readBeforeStreamID > cs.ID
	cc.mu.Unlock()
	if mustCloseBody {
		cs.reqBody.Close()
		close(bodyClosed)
	}
	if bodyClosed != nil {
		<-bodyClosed
	}

	if err != nil && cs.sentEndStream {
		// If the connection is closed immediately after the response is
		// read, we may be aborted before finishing up here. If the
		// stream was closed cleanly on both sides, there is no error.
		select {
		case <-cs.peerClosed:
			err = nil
		default:
		}
	}
	if err != nil {
		cs.abortStream(err) // possibly redundant, but harmless
		if cs.sentHeaders {
			if se, ok := err.(StreamError); ok {
				if se.Cause != errFromPeer {
					cc.writeStreamReset(cs.ID, se.Code, false, err)
				}
			} else {
				// We're cancelling an in-flight request, possibly because
				// the server became unresponsive. To avoid piling more
				// requests onto a dead connection, the reset stream keeps
				// consuming a concurrency slot (pendingResets) until we
				// confirm the server is alive, which we do by attaching a
				// PING to the RST_STREAM — unless a reset PING is already
				// in flight, the server has sent us frames since this
				// stream started, or the conn is about to be closed
				// anyway. pendingResets is cleared when a PING ACK
				// arrives (handled by the read loop).
				ping := false
				if !closeOnIdle && !readSinceStream {
					cc.mu.Lock()
					// rstStreamPingsBlocked suppresses these PINGs
					// entirely when set.
					if !cc.rstStreamPingsBlocked {
						if cc.pendingResets == 0 {
							ping = true
						}
						cc.pendingResets++
					}
					cc.mu.Unlock()
				}
				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
			}
		}
		cs.bufPipe.CloseWithError(err) // no-op if already closed
	} else {
		// Clean shutdown of our side; if we never sent END_STREAM, tell
		// the peer we're done with a no-error reset.
		if cs.sentHeaders && !cs.sentEndStream {
			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
		}
		cs.bufPipe.CloseWithError(errRequestCanceled)
	}
	if cs.ID != 0 {
		cc.forgetStreamID(cs.ID)
	}

	cc.wmu.Lock()
	werr := cc.werr
	cc.wmu.Unlock()
	if werr != nil {
		// A write error poisons the whole connection; close it.
		cc.Close()
	}

	close(cs.donec)
	cc.maybeCallStateHook()
}
1571
1572
1573
// awaitOpenSlotForStreamLocked blocks until cs can be started on cc without
// exceeding the peer's SETTINGS_MAX_CONCURRENT_STREAMS limit, or returns an
// error if the connection becomes unusable. Requires cc.mu be held.
func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
	for {
		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
			// The connection closed before it was ever used
			// (no stream was assigned an ID).
			return errClientConnNotEstablished
		}
		cc.lastActive = time.Now()
		if cc.closed || !cc.canTakeNewRequestLocked() {
			return errClientConnUnusable
		}
		cc.lastIdle = time.Time{}
		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
			return nil
		}
		// At the concurrency limit: wait for a slot to free up.
		// cc.cond is broadcast whenever a stream completes.
		cc.pendingRequests++
		cc.cond.Wait()
		cc.pendingRequests--
		select {
		case <-cs.abort:
			return cs.abortErr
		default:
		}
	}
}
1599
1600
1601 func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
1602 first := true
1603 for len(hdrs) > 0 && cc.werr == nil {
1604 chunk := hdrs
1605 if len(chunk) > maxFrameSize {
1606 chunk = chunk[:maxFrameSize]
1607 }
1608 hdrs = hdrs[len(chunk):]
1609 endHeaders := len(hdrs) == 0
1610 if first {
1611 cc.fr.WriteHeaders(HeadersFrameParam{
1612 StreamID: streamID,
1613 BlockFragment: chunk,
1614 EndStream: endStream,
1615 EndHeaders: endHeaders,
1616 })
1617 first = false
1618 } else {
1619 cc.fr.WriteContinuation(streamID, endHeaders, chunk)
1620 }
1621 }
1622 cc.bw.Flush()
1623 return cc.werr
1624 }
1625
1626
var (
	// errStopReqBodyWrite tells the request-body writer to stop because
	// the body was closed (e.g. the stream was reset or abandoned).
	errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel tells the body writer to stop and
	// cancel the request.
	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")

	// errReqBodyTooLong reports a request body that produced more bytes
	// than its declared Content-Length.
	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
)
1636
1637
1638
1639
1640
1641
1642 func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
1643 const max = 512 << 10
1644 n := min(int64(maxFrameSize), max)
1645 if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
1646
1647
1648
1649
1650 n = cl + 1
1651 }
1652 if n < 1 {
1653 return 1
1654 }
1655 return int(n)
1656 }
1657
1658
1659
1660
1661
1662
1663
1664
1665
// bufPools pools request-body scratch buffers by size class. Class 0 holds
// buffers up to 16 KB; each subsequent class doubles the bound, with the
// final class catching everything larger.
var bufPools [7]sync.Pool

// bufPoolIndex returns the bufPools class for a buffer of the given size:
// the smallest power-of-two class that can hold size, clamped to the last
// class for oversized requests.
func bufPoolIndex(size int) int {
	if size <= 16384 {
		return 0
	}
	// ceil(log2(size)) computed as Len(size-1); subtracting 14 maps the
	// 16 KB class to index 0. (Renamed the local so it no longer shadows
	// the imported math/bits package.)
	n := bits.Len(uint(size - 1))
	index := n - 14
	if index >= len(bufPools) {
		return len(bufPools) - 1
	}
	return index
}
1679
// writeRequestBody streams the request body to the peer in DATA frames,
// respecting HTTP/2 flow control, then sends END_STREAM (possibly via a
// trailers HEADERS frame). It returns when the body is fully sent or an
// error occurs.
func (cs *clientStream) writeRequestBody(req *ClientRequest) (err error) {
	cc := cs.cc
	body := cs.reqBody
	sentEnd := false // whether we've already sent the final DATA frame w/ END_STREAM

	hasTrailers := req.Trailer != nil
	remainLen := cs.reqBodyContentLength
	hasContentLen := remainLen != -1

	cc.mu.Lock()
	maxFrameSize := int(cc.maxFrameSize)
	cc.mu.Unlock()

	// Get a scratch buffer from the appropriate size-class pool, or
	// allocate one that will be returned to that pool on exit.
	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
	var buf []byte
	index := bufPoolIndex(scratchLen)
	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
		defer bufPools[index].Put(bp)
		buf = *bp
	} else {
		buf = make([]byte, scratchLen)
		defer bufPools[index].Put(&buf)
	}

	var sawEOF bool
	for !sawEOF {
		n, err := body.Read(buf)
		if hasContentLen {
			remainLen -= int64(n)
			if remainLen == 0 && err == nil {
				// The request body's Content-Length was predeclared and
				// we just finished reading it all, but the underlying io.Reader
				// returned the final chunk with a nil error (which is one of
				// the two valid things a Reader can do at EOF). Because we'd prefer
				// to send the END_STREAM bit early, double-check that we're actually
				// at EOF. Subsequent reads should return (0, EOF) at this point.
				// If either value is different, we return an error in one of two ways below.
				var scratch [1]byte
				var n1 int
				n1, err = body.Read(scratch[:])
				remainLen -= int64(n1)
			}
			if remainLen < 0 {
				err = errReqBodyTooLong
				return err
			}
		}
		if err != nil {
			cc.mu.Lock()
			bodyClosed := cs.reqBodyClosed != nil
			cc.mu.Unlock()
			switch {
			case bodyClosed:
				return errStopReqBodyWrite
			case err == io.EOF:
				sawEOF = true
				err = nil
			default:
				return err
			}
		}

		remain := buf[:n]
		for len(remain) > 0 && err == nil {
			// Wait for flow-control tokens before writing each chunk.
			var allowed int32
			allowed, err = cs.awaitFlowControl(len(remain))
			if err != nil {
				return err
			}
			cc.wmu.Lock()
			data := remain[:allowed]
			remain = remain[allowed:]
			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
			err = cc.fr.WriteData(cs.ID, sentEnd, data)
			if err == nil {
				// TODO(bradfitz): this flush is for latency, not bandwidth.
				// Most requests won't need this. Make this opt-in or
				// opt-out?  Use some heuristic on the body type? Nagel-like
				// timers?  Based on 'n'? Only last chunk of this for loop,
				// unless flow control tokens are low? For now, always.
				err = cc.bw.Flush()
			}
			cc.wmu.Unlock()
		}
		if err != nil {
			return err
		}
	}

	if sentEnd {
		// Already sent END_STREAM with the final chunk of data, so
		// there's no need to send an empty DATA frame or trailers.
		return nil
	}

	// Since the RoundTrip contract permits the caller to "mutate or reuse"
	// a request after the Response's Body is closed, verify that this hasn't
	// happened before accessing the trailers.
	cc.mu.Lock()
	trailer := req.Trailer
	err = cs.abortErr
	cc.mu.Unlock()
	if err != nil {
		return err
	}

	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	var trls []byte
	if len(trailer) > 0 {
		trls, err = cc.encodeTrailers(trailer)
		if err != nil {
			return err
		}
	}

	// Two ways to send END_STREAM: either with trailers, or
	// with an empty DATA frame.
	if len(trls) > 0 {
		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
	} else {
		err = cc.fr.WriteData(cs.ID, true, nil)
	}
	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
		err = ferr
	}
	return err
}
1811
1812
1813
1814
1815
// awaitFlowControl waits for [1, min(maxBytes, cc.maxFrameSize)] flow-control
// tokens to become available, takes them from the stream's window, and
// returns the amount taken. It returns an error if the stream or connection
// is dead, or the request is canceled.
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
	cc := cs.cc
	ctx := cs.ctx
	cc.mu.Lock()
	defer cc.mu.Unlock()
	for {
		if cc.closed {
			return 0, errClientConnClosed
		}
		if cs.reqBodyClosed != nil {
			return 0, errStopReqBodyWrite
		}
		// Non-blocking check for abort/cancel before sleeping on the cond.
		select {
		case <-cs.abort:
			return 0, cs.abortErr
		case <-ctx.Done():
			return 0, ctx.Err()
		case <-cs.reqCancel:
			return 0, errRequestCanceled
		default:
		}
		if a := cs.flow.available(); a > 0 {
			take := a
			if int(take) > maxBytes {
				// Don't take more than the caller asked for.
				take = int32(maxBytes)
			}
			if take > int32(cc.maxFrameSize) {
				take = int32(cc.maxFrameSize)
			}
			cs.flow.take(take)
			return take, nil
		}
		// No tokens; cc.cond is broadcast on WINDOW_UPDATE and SETTINGS.
		cc.cond.Wait()
	}
}
1852
1853
1854 func (cc *ClientConn) encodeTrailers(trailer Header) ([]byte, error) {
1855 cc.hbuf.Reset()
1856
1857 hlSize := uint64(0)
1858 for k, vv := range trailer {
1859 for _, v := range vv {
1860 hf := hpack.HeaderField{Name: k, Value: v}
1861 hlSize += uint64(hf.Size())
1862 }
1863 }
1864 if hlSize > cc.peerMaxHeaderListSize {
1865 return nil, errRequestHeaderListSize
1866 }
1867
1868 for k, vv := range trailer {
1869 lowKey, ascii := httpcommon.LowerHeader(k)
1870 if !ascii {
1871
1872
1873 continue
1874 }
1875
1876
1877 for _, v := range vv {
1878 cc.writeHeader(lowKey, v)
1879 }
1880 }
1881 return cc.hbuf.Bytes(), nil
1882 }
1883
1884 func (cc *ClientConn) writeHeader(name, value string) {
1885 if VerboseLogs {
1886 log.Printf("http2: Transport encoding header %q = %q", name, value)
1887 }
1888 cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
1889 }
1890
// resAndError pairs a response with the error (if any) that produced it.
type resAndError struct {
	_   incomparable
	res *ClientResponse
	err error
}
1896
1897
// addStreamLocked registers cs on cc: initializes its send/receive flow
// control windows and assigns it the next odd stream ID.
// Requires cc.mu be held.
func (cc *ClientConn) addStreamLocked(cs *clientStream) {
	cs.flow.add(int32(cc.initialWindowSize))
	cs.flow.setConnFlow(&cc.flow)
	cs.inflow.init(cc.initialStreamRecvWindowSize)
	cs.ID = cc.nextStreamID
	cc.nextStreamID += 2
	cc.streams[cs.ID] = cs
	if cs.ID == 0 {
		// Stream ID 0 is the connection itself; assigning it would be a bug.
		panic("assigned stream ID 0")
	}
}
1909
// forgetStreamID removes a completed stream from cc's stream map, restarts
// the idle timer if the connection became idle, wakes waiters, and closes
// the connection if it is now idle and marked not for reuse.
func (cc *ClientConn) forgetStreamID(id uint32) {
	cc.mu.Lock()
	slen := len(cc.streams)
	delete(cc.streams, id)
	if len(cc.streams) != slen-1 {
		panic("forgetting unknown stream id")
	}
	cc.lastActive = time.Now()
	if len(cc.streams) == 0 && cc.idleTimer != nil {
		cc.idleTimer.Reset(cc.idleTimeout)
		cc.lastIdle = time.Now()
	}
	// Wake up writeRequestBody via clientStream.awaitFlowControl and
	// wake up RoundTrip if there is a pending request.
	cc.cond.Broadcast()

	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
		if VerboseLogs {
			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
		}
		cc.closed = true
		// Close after releasing cc.mu to avoid holding the lock
		// across the network close.
		defer cc.closeConn()
	}

	cc.mu.Unlock()
}
1937
1938
// clientConnReadLoop is the state owned by the ClientConn's frame-reading goroutine.
type clientConnReadLoop struct {
	_  incomparable
	cc *ClientConn
}
1943
1944
// readLoop runs in its own goroutine and reads and dispatches frames.
func (cc *ClientConn) readLoop() {
	rl := &clientConnReadLoop{cc: cc}
	defer rl.cleanup()
	cc.readerErr = rl.run()
	if ce, ok := cc.readerErr.(ConnectionError); ok {
		// Tell the peer why we're tearing down the connection.
		cc.wmu.Lock()
		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
		cc.wmu.Unlock()
	}
}
1955
1956
1957
// GoAwayError is returned by the Transport when the server closes the
// TCP connection after sending a GOAWAY frame.
type GoAwayError struct {
	LastStreamID uint32
	ErrCode      ErrCode
	DebugData    string
}

// Error implements the error interface.
func (e GoAwayError) Error() string {
	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
		e.LastStreamID, e.ErrCode, e.DebugData)
}
1968
1969 func isEOFOrNetReadError(err error) bool {
1970 if err == io.EOF {
1971 return true
1972 }
1973 ne, ok := err.(*net.OpError)
1974 return ok && ne.Op == "read"
1975 }
1976
1977 func (rl *clientConnReadLoop) cleanup() {
1978 cc := rl.cc
1979 defer cc.closeConn()
1980 defer close(cc.readerDone)
1981
1982 if cc.idleTimer != nil {
1983 cc.idleTimer.Stop()
1984 }
1985
1986
1987
1988
1989 err := cc.readerErr
1990 cc.mu.Lock()
1991 if cc.goAway != nil && isEOFOrNetReadError(err) {
1992 err = GoAwayError{
1993 LastStreamID: cc.goAway.LastStreamID,
1994 ErrCode: cc.goAway.ErrCode,
1995 DebugData: cc.goAwayDebug,
1996 }
1997 } else if err == io.EOF {
1998 err = io.ErrUnexpectedEOF
1999 }
2000 cc.closed = true
2001
2002
2003
2004
2005
2006
2007
2008 unusedWaitTime := 5 * time.Second
2009 if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
2010 unusedWaitTime = cc.idleTimeout
2011 }
2012 idleTime := time.Now().Sub(cc.lastActive)
2013 if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
2014 cc.idleTimer = time.AfterFunc(unusedWaitTime-idleTime, func() {
2015 cc.t.connPool.MarkDead(cc)
2016 })
2017 } else {
2018 cc.mu.Unlock()
2019 cc.t.connPool.MarkDead(cc)
2020 cc.mu.Lock()
2021 }
2022
2023 for _, cs := range cc.streams {
2024 select {
2025 case <-cs.peerClosed:
2026
2027
2028 default:
2029 cs.abortStreamLocked(err)
2030 }
2031 }
2032 cc.cond.Broadcast()
2033 cc.mu.Unlock()
2034
2035 if !cc.seenSettings {
2036
2037
2038 cc.extendedConnectAllowed = true
2039 close(cc.seenSettingsChan)
2040 }
2041 }
2042
2043
2044
2045 func (cc *ClientConn) countReadFrameError(err error) {
2046 f := cc.fr.countError
2047 if f == nil || err == nil {
2048 return
2049 }
2050 if ce, ok := err.(ConnectionError); ok {
2051 errCode := ErrCode(ce)
2052 f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
2053 return
2054 }
2055 if errors.Is(err, io.EOF) {
2056 f("read_frame_eof")
2057 return
2058 }
2059 if errors.Is(err, io.ErrUnexpectedEOF) {
2060 f("read_frame_unexpected_eof")
2061 return
2062 }
2063 if errors.Is(err, ErrFrameTooLarge) {
2064 f("read_frame_too_large")
2065 return
2066 }
2067 f("read_frame_other")
2068 }
2069
// run is the body of the read loop: it reads frames until an error occurs,
// dispatching each frame to its process* handler. Stream-level errors abort
// only the affected stream; any other error terminates the loop (and so the
// connection).
func (rl *clientConnReadLoop) run() error {
	cc := rl.cc
	gotSettings := false
	readIdleTimeout := cc.readIdleTimeout
	var t *time.Timer
	if readIdleTimeout != 0 {
		// Fire a health-check PING if no frame arrives for readIdleTimeout.
		t = time.AfterFunc(readIdleTimeout, cc.healthCheck)
	}
	for {
		f, err := cc.fr.ReadFrame()
		if t != nil {
			// Any received frame counts as liveness; push the timer out.
			t.Reset(readIdleTimeout)
		}
		if err != nil {
			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
		}
		if se, ok := err.(StreamError); ok {
			// Stream-scoped error: abort just that stream and keep reading.
			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
				if se.Cause == nil {
					se.Cause = cc.fr.errDetail
				}
				rl.endStreamError(cs, se)
			}
			continue
		} else if err != nil {
			cc.countReadFrameError(err)
			return err
		}
		if VerboseLogs {
			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
		}
		if !gotSettings {
			// RFC 9113: the server preface must begin with SETTINGS.
			if _, ok := f.(*SettingsFrame); !ok {
				cc.logf("protocol error: received %T before a SETTINGS frame", f)
				return ConnectionError(ErrCodeProtocol)
			}
			gotSettings = true
		}

		switch f := f.(type) {
		case *MetaHeadersFrame:
			err = rl.processHeaders(f)
		case *DataFrame:
			err = rl.processData(f)
		case *GoAwayFrame:
			err = rl.processGoAway(f)
		case *RSTStreamFrame:
			err = rl.processResetStream(f)
		case *SettingsFrame:
			err = rl.processSettings(f)
		case *PushPromiseFrame:
			err = rl.processPushPromise(f)
		case *WindowUpdateFrame:
			err = rl.processWindowUpdate(f)
		case *PingFrame:
			err = rl.processPing(f)
		default:
			cc.logf("Transport: unhandled response frame type %T", f)
		}
		if err != nil {
			if VerboseLogs {
				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
			}
			return err
		}
	}
}
2137
// processHeaders handles a HEADERS (meta) frame: the response headers, a 1xx
// informational response, or trailers, depending on stream state.
func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	if cs == nil {
		// We'd get here if we canceled a request while the
		// server had its response still in flight. So if this
		// was just something we canceled, ignore it.
		return nil
	}
	if cs.readClosed {
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    errors.New("protocol error: headers after END_STREAM"),
		})
		return nil
	}
	if !cs.firstByte {
		if cs.trace != nil {
			// TODO(bradfitz): move first response byte earlier,
			// when we first read the 9 byte header, not waiting
			// until all the HEADERS+CONTINUATION frames have been
			// merged. This works for now.
			traceFirstResponseByte(cs.trace)
		}
		cs.firstByte = true
	}
	if !cs.pastHeaders {
		cs.pastHeaders = true
	} else {
		// Any HEADERS after the first block are trailers.
		return rl.processTrailers(cs, f)
	}

	res, err := rl.handleResponse(cs, f)
	if err != nil {
		if _, ok := err.(ConnectionError); ok {
			return err
		}
		// Malformed response: reset just this stream.
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
			Cause:    err,
		})
		return nil
	}
	if res == nil {
		// (nil, nil) means it was a 1xx response that was consumed.
		return nil
	}
	cs.res = res
	close(cs.respHeaderRecv)
	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
2194
2195
2196
2197
2198
2199
2200
// handleResponse builds a *ClientResponse from the response HEADERS frame.
// It returns (nil, nil) for 1xx informational responses (which the caller
// skips), and an error for malformed responses. A returned ConnectionError
// tears down the whole connection; any other error resets only this stream.
func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*ClientResponse, error) {
	if f.Truncated {
		return nil, errResponseHeaderListSize
	}

	status := f.PseudoValue("status")
	if status == "" {
		return nil, errors.New("malformed response from server: missing status pseudo header")
	}
	statusCode, err := strconv.Atoi(status)
	if err != nil {
		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
	}

	// Allocate one []string backing slab for all single-value headers to
	// reduce per-header allocations; see the comment on the slicing below.
	regularFields := f.RegularFields()
	strs := make([]string, len(regularFields))
	header := make(Header, len(regularFields))
	res := &cs.staticResp
	cs.staticResp = ClientResponse{
		Header:     header,
		StatusCode: statusCode,
		Status:     status,
	}
	for _, hf := range regularFields {
		key := httpcommon.CanonicalHeader(hf.Name)
		if key == "Trailer" {
			t := res.Trailer
			if t == nil {
				t = make(Header)
				res.Trailer = t
			}
			foreachHeaderElement(hf.Value, func(v string) {
				t[httpcommon.CanonicalHeader(v)] = nil
			})
		} else {
			vv := header[key]
			if vv == nil && len(strs) > 0 {
				// More than likely this will be a single-element key.
				// Most headers aren't multi-valued.
				// Set the capacity on strs[0] to 1, so any future append
				// won't extend the slice into the other strings.
				vv, strs = strs[:1:1], strs[1:]
				vv[0] = hf.Value
				header[key] = vv
			} else {
				header[key] = append(vv, hf.Value)
			}
		}
	}

	if statusCode >= 100 && statusCode <= 199 {
		if f.StreamEnded() {
			return nil, errors.New("1xx informational response with END_STREAM flag")
		}
		if fn := cs.get1xxTraceFunc(); fn != nil {
			// If the 1xx response is being delivered to the user,
			// then they're responsible for limiting the number
			// of responses.
			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
				return nil, err
			}
		} else {
			// If the user didn't examine the 1xx response, then we
			// limit the size of all 1xx headers.
			//
			// This differs a bit from the HTTP/1 implementation, which
			// limits the size of all 1xx headers plus the final response.
			// Use the larger limit of MaxHeaderListSize and
			// MaxResponseHeaderBytes.
			limit := int64(cs.cc.t.maxHeaderListSize())
			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes() > limit {
				limit = t1.MaxResponseHeaderBytes()
			}
			for _, h := range f.Fields {
				cs.totalHeaderSize += int64(h.Size())
			}
			if cs.totalHeaderSize > limit {
				if VerboseLogs {
					log.Printf("http2: 1xx informational responses too large")
				}
				return nil, errors.New("header list too large")
			}
		}
		if statusCode == 100 {
			traceGot100Continue(cs.trace)
			// Non-blocking send: the body writer may not be waiting on on100.
			select {
			case cs.on100 <- struct{}{}:
			default:
			}
		}
		// Reset so the next HEADERS frame is treated as the real response.
		cs.pastHeaders = false
		return nil, nil
	}

	res.ContentLength = -1
	if clens := res.Header["Content-Length"]; len(clens) == 1 {
		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
			res.ContentLength = int64(cl)
		} else {
			// TODO: care? unlike http/1, it won't mess up our framing, so it's
			// more safe smuggling-wise to ignore.
		}
	} else if len(clens) > 1 {
		// TODO: care? unlike http/1, it won't mess up our framing, so it's
		// more safe smuggling-wise to ignore.
	} else if f.StreamEnded() && !cs.isHead {
		res.ContentLength = 0
	}

	if cs.isHead {
		res.Body = NoBody
		return res, nil
	}

	if f.StreamEnded() {
		if res.ContentLength > 0 {
			// Declared a body but ended the stream: reads report the mismatch.
			res.Body = missingBody{}
		} else {
			res.Body = NoBody
		}
		return res, nil
	}

	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
	cs.bytesRemain = res.ContentLength
	res.Body = transportResponseBody{cs}

	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
		// We requested gzip transparently, so decode transparently too and
		// hide the encoding headers from the caller.
		res.Header.Del("Content-Encoding")
		res.Header.Del("Content-Length")
		res.ContentLength = -1
		res.Body = &gzipReader{body: res.Body}
		res.Uncompressed = true
	}
	return res, nil
}
2337
2338 func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
2339 if cs.pastTrailers {
2340
2341 return ConnectionError(ErrCodeProtocol)
2342 }
2343 cs.pastTrailers = true
2344 if !f.StreamEnded() {
2345
2346
2347 return ConnectionError(ErrCodeProtocol)
2348 }
2349 if len(f.PseudoFields()) > 0 {
2350
2351
2352 return ConnectionError(ErrCodeProtocol)
2353 }
2354
2355 trailer := make(Header)
2356 for _, hf := range f.RegularFields() {
2357 key := httpcommon.CanonicalHeader(hf.Name)
2358 trailer[key] = append(trailer[key], hf.Value)
2359 }
2360 cs.trailer = trailer
2361
2362 rl.endStream(cs)
2363 return nil
2364 }
2365
2366
2367
// transportResponseBody is the concrete type of Response.Body on the client.
// It reads from the stream's buffered pipe and returns flow-control tokens
// to the peer as data is consumed.
type transportResponseBody struct {
	cs *clientStream
}
2371
// Read reads response body bytes from the stream's pipe, enforces the
// declared Content-Length, and sends WINDOW_UPDATE frames to return the
// consumed flow-control credit to the peer.
func (b transportResponseBody) Read(p []byte) (n int, err error) {
	cs := b.cs
	cc := cs.cc

	if cs.readErr != nil {
		// A previous Read latched an error; keep returning it.
		return 0, cs.readErr
	}
	n, err = b.cs.bufPipe.Read(p)
	if cs.bytesRemain != -1 {
		if int64(n) > cs.bytesRemain {
			// Server sent more than its declared Content-Length: truncate
			// and abort the stream.
			n = int(cs.bytesRemain)
			if err == nil {
				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
				cs.abortStream(err)
			}
			cs.readErr = err
			return int(cs.bytesRemain), err
		}
		cs.bytesRemain -= int64(n)
		if err == io.EOF && cs.bytesRemain > 0 {
			// Stream ended before the declared Content-Length was reached.
			err = io.ErrUnexpectedEOF
			cs.readErr = err
			return n, err
		}
	}
	if n == 0 {
		// No flow control tokens to return.
		return
	}

	// Return the consumed credit; inflow.add decides whether the refund is
	// large enough to be worth a WINDOW_UPDATE (it may return 0).
	cc.mu.Lock()
	connAdd := cc.inflow.add(n)
	var streamAdd int32
	if err == nil {
		streamAdd = cs.inflow.add(n)
	}
	cc.mu.Unlock()

	if connAdd != 0 || streamAdd != 0 {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if connAdd != 0 {
			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
		}
		if streamAdd != 0 {
			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
		}
		cc.bw.Flush()
	}
	return
}
2423
// errClosedResponseBody is returned by reads on a response body after Close.
var errClosedResponseBody = errors.New("http2: response body closed")
2425
// Close aborts the stream, returns any unread flow-control credit to the
// connection window, and waits for the stream's cleanup to finish.
func (b transportResponseBody) Close() error {
	cs := b.cs
	cc := cs.cc

	cs.bufPipe.BreakWithError(errClosedResponseBody)
	cs.abortStream(errClosedResponseBody)

	unread := cs.bufPipe.Len()
	if unread > 0 {
		cc.mu.Lock()
		// Return connection-level flow control for the unread bytes.
		connAdd := cc.inflow.add(unread)
		cc.mu.Unlock()

		// TODO(dneil): Acquiring this mutex can block indefinitely.
		// Move flow control return to a goroutine?
		cc.wmu.Lock()
		// Return connection-level flow control.
		if connAdd > 0 {
			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
		}
		cc.bw.Flush()
		cc.wmu.Unlock()
	}

	select {
	case <-cs.donec:
	case <-cs.ctx.Done():
		// See golang/go#49366: The net/http package can cancel the
		// request context after the response body is fully read.
		// Don't treat this as an error.
		return nil
	case <-cs.reqCancel:
		return errRequestCanceled
	}
	return nil
}
2463
// processData handles a DATA frame: it validates stream state, accounts for
// connection- and stream-level flow control (including padding refunds),
// writes the payload into the stream's pipe, and ends the stream on
// END_STREAM.
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
	data := f.Data()
	if cs == nil {
		cc.mu.Lock()
		neverSent := cc.nextStreamID
		cc.mu.Unlock()
		if f.StreamID >= neverSent {
			// DATA on a stream we never opened: fatal protocol error.
			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
			return ConnectionError(ErrCodeProtocol)
		}
		// We probably did ask for this, but canceled. Just ignore it.
		// But return the connection-level flow control for the frame so the
		// server keeps an accurate view of our window.
		if f.Length > 0 {
			cc.mu.Lock()
			ok := cc.inflow.take(f.Length)
			connAdd := cc.inflow.add(int(f.Length))
			cc.mu.Unlock()
			if !ok {
				return ConnectionError(ErrCodeFlowControl)
			}
			if connAdd > 0 {
				cc.wmu.Lock()
				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
				cc.bw.Flush()
				cc.wmu.Unlock()
			}
		}
		return nil
	}
	if cs.readClosed {
		cc.logf("protocol error: received DATA after END_STREAM")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if !cs.pastHeaders {
		cc.logf("protocol error: received DATA before a HEADERS frame")
		rl.endStreamError(cs, StreamError{
			StreamID: f.StreamID,
			Code:     ErrCodeProtocol,
		})
		return nil
	}
	if f.Length > 0 {
		if cs.isHead && len(data) > 0 {
			cc.logf("protocol error: received DATA on a HEAD request")
			rl.endStreamError(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeProtocol,
			})
			return nil
		}
		// Check connection-level flow control.
		cc.mu.Lock()
		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
			cc.mu.Unlock()
			return ConnectionError(ErrCodeFlowControl)
		}
		// Padding consumed flow-control credit but carries no payload;
		// refund it immediately.
		var refund int
		if pad := int(f.Length) - len(data); pad > 0 {
			refund += pad
		}

		didReset := false
		var err error
		if len(data) > 0 {
			if _, err = cs.bufPipe.Write(data); err != nil {
				// Return len(data) now if the stream is already gone.
				// (Was the server speaking badly?)
				didReset = true
				refund += len(data)
			}
		}

		sendConn := cc.inflow.add(refund)
		var sendStream int32
		if !didReset {
			sendStream = cs.inflow.add(refund)
		}
		cc.mu.Unlock()

		if sendConn > 0 || sendStream > 0 {
			cc.wmu.Lock()
			if sendConn > 0 {
				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
			}
			if sendStream > 0 {
				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
			}
			cc.bw.Flush()
			cc.wmu.Unlock()
		}

		if err != nil {
			rl.endStreamError(cs, err)
			return nil
		}
	}

	if f.StreamEnded() {
		rl.endStream(cs)
	}
	return nil
}
2579
// endStream marks the read side of cs cleanly closed: it delivers EOF (with
// trailers) to body readers and signals peerClosed. Idempotent via
// cs.readClosed.
func (rl *clientConnReadLoop) endStream(cs *clientStream) {
	// Now that the stream is fully closed (both EndStream bits),
	// we don't need to write the RST_STREAM on cancelation.
	if !cs.readClosed {
		cs.readClosed = true
		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
		// race condition: The caller can read io.EOF from Response.Body
		// and close the body before we close cs.peerClosed, causing
		// cleanupWriteRequest to send a RST_STREAM.
		rl.cc.mu.Lock()
		defer rl.cc.mu.Unlock()
		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
		close(cs.peerClosed)
	}
}
2595
// endStreamError aborts cs with err, marking its read side as aborted so the
// read loop stops delivering frames to it.
func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStream(err)
}
2600
// endStreamErrorLocked is endStreamError for callers already holding cc.mu.
func (rl *clientConnReadLoop) endStreamErrorLocked(cs *clientStream, err error) {
	cs.readAborted = true
	cs.abortStreamLocked(err)
}
2605
2606
// Values for the headerOrData parameter of streamByID, which distinguishes
// HEADERS/DATA frames (real server progress) from other frame types.
const (
	headerOrDataFrame    = true
	notHeaderOrDataFrame = false
)
2611
2612
2613
// streamByID returns the stream with the given id, or nil if no stream has
// that id or its read side is aborted. It also records read-loop progress
// used by the RST_STREAM ping heuristics.
func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
	rl.cc.mu.Lock()
	defer rl.cc.mu.Unlock()
	if headerOrData {
		// Work around an unfortunate gRPC behavior.
		// See comment on ClientConn.rstStreamPingsBlocked for details.
		rl.cc.rstStreamPingsBlocked = false
	}
	// Record the newest stream ID that existed when this frame was read.
	rl.cc.readBeforeStreamID = rl.cc.nextStreamID
	cs := rl.cc.streams[id]
	if cs != nil && !cs.readAborted {
		return cs
	}
	return nil
}
2629
2630 func (cs *clientStream) copyTrailers() {
2631 for k, vv := range cs.trailer {
2632 t := cs.resTrailer
2633 if *t == nil {
2634 *t = make(Header)
2635 }
2636 (*t)[k] = vv
2637 }
2638 }
2639
// processGoAway handles a GOAWAY frame: it removes the connection from the
// pool so no new requests use it and records the GOAWAY state on cc.
func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
	cc := rl.cc
	cc.t.connPool.MarkDead(cc)
	if f.ErrCode != 0 {
		// TODO: deal with GOAWAY more. particularly the error code
		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
		if fn := cc.fr.countError; fn != nil {
			fn("recv_goaway_" + f.ErrCode.stringToken())
		}
	}
	cc.setGoAway(f)
	return nil
}
2653
// processSettings applies a SETTINGS frame and, if it isn't itself an ACK,
// writes our SETTINGS ACK in response.
func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
	cc := rl.cc
	// Locking both mu and wmu here allows us to send a SETTINGS ack
	// atomically with the settings change (wmu is taken here; mu inside
	// processSettingsNoWrite). Avoiding this race avoids a panic in the framer.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()

	if err := rl.processSettingsNoWrite(f); err != nil {
		return err
	}
	if !f.IsAck() {
		cc.fr.WriteSettingsAck()
		cc.bw.Flush()
	}
	return nil
}
2670
// processSettingsNoWrite applies the settings in f to cc's state without
// writing any frames. A SETTINGS ACK clears wantSettingsAck (an unexpected
// ACK is a protocol error); otherwise each setting is validated and applied.
func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
	cc := rl.cc
	defer cc.maybeCallStateHook()
	cc.mu.Lock()
	defer cc.mu.Unlock()

	if f.IsAck() {
		if cc.wantSettingsAck {
			cc.wantSettingsAck = false
			return nil
		}
		// ACK we never asked for.
		return ConnectionError(ErrCodeProtocol)
	}

	var seenMaxConcurrentStreams bool
	err := f.ForeachSetting(func(s Setting) error {
		if err := s.Valid(); err != nil {
			return err
		}
		switch s.ID {
		case SettingMaxFrameSize:
			cc.maxFrameSize = s.Val
		case SettingMaxConcurrentStreams:
			cc.maxConcurrentStreams = s.Val
			seenMaxConcurrentStreams = true
		case SettingMaxHeaderListSize:
			cc.peerMaxHeaderListSize = uint64(s.Val)
		case SettingInitialWindowSize:
			// Values above the maximum flow-control
			// window size of 2^31-1 MUST be treated as a
			// connection error (Section 5.4.1) of type
			// FLOW_CONTROL_ERROR.
			if s.Val > math.MaxInt32 {
				return ConnectionError(ErrCodeFlowControl)
			}

			// Adjust flow control of currently-open
			// frames by the difference of the old initial
			// window size and this one.
			delta := int32(s.Val) - int32(cc.initialWindowSize)
			for _, cs := range cc.streams {
				cs.flow.add(delta)
			}
			cc.cond.Broadcast()

			cc.initialWindowSize = s.Val
		case SettingHeaderTableSize:
			cc.henc.SetMaxDynamicTableSize(s.Val)
			cc.peerMaxHeaderTableSize = s.Val
		case SettingEnableConnectProtocol:
			// Only the initial SETTINGS frame may change this;
			// later changes are ignored.
			//
			// RFC 8441 permits the server to enable extended
			// CONNECT at any time, but requires it never be
			// disabled once enabled; honoring only the first
			// SETTINGS frame keeps our view stable.
			// TODO(review): confirm intent against upstream history.
			if !cc.seenSettings {
				cc.extendedConnectAllowed = s.Val == 1
			}
		default:
			cc.vlogf("Unhandled Setting: %v", s)
		}
		return nil
	})
	if err != nil {
		return err
	}

	if !cc.seenSettings {
		if !seenMaxConcurrentStreams {
			// This was the servers initial SETTINGS frame and it
			// didn't contain a MAX_CONCURRENT_STREAMS field so
			// increase the number of concurrent streams we can
			// handle up to our default.
			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
		}
		close(cc.seenSettingsChan)
		cc.seenSettings = true
	}

	return nil
}
2755
// processWindowUpdate grows the connection (stream ID 0) or per-stream send
// window by f.Increment and wakes any writers blocked on flow control. A
// window overflow is a flow-control error at the corresponding scope.
func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
	cc := rl.cc
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if f.StreamID != 0 && cs == nil {
		// Update for a stream we no longer track; ignore.
		return nil
	}

	cc.mu.Lock()
	defer cc.mu.Unlock()

	fl := &cc.flow
	if cs != nil {
		fl = &cs.flow
	}
	if !fl.add(int32(f.Increment)) {
		// For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
		if cs != nil {
			rl.endStreamErrorLocked(cs, StreamError{
				StreamID: f.StreamID,
				Code:     ErrCodeFlowControl,
			})
			return nil
		}

		return ConnectionError(ErrCodeFlowControl)
	}
	cc.cond.Broadcast()
	return nil
}
2785
// processResetStream handles RST_STREAM from the peer by aborting the local
// stream with a StreamError whose Cause marks it as peer-initiated (so we
// don't echo a RST_STREAM back).
func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
	if cs == nil {
		// TODO: return error if server tries to RST_STREAM an idle stream
		return nil
	}
	serr := streamError(cs.ID, f.ErrCode)
	serr.Cause = errFromPeer
	if f.ErrCode == ErrCodeProtocol {
		// Peer thinks we misbehaved; don't reuse this connection.
		rl.cc.SetDoNotReuse()
	}
	if fn := cs.cc.fr.countError; fn != nil {
		fn("recv_rststream_" + f.ErrCode.stringToken())
	}
	cs.abortStream(serr)

	cs.bufPipe.CloseWithError(serr)
	return nil
}
2805
2806
// Ping sends a PING frame to the server and waits for the ack.
// It returns when the ack arrives, ctx is done, or the connection fails.
func (cc *ClientConn) Ping(ctx context.Context) error {
	c := make(chan struct{})
	// Generate a random payload that isn't already in flight.
	var p [8]byte
	for {
		if _, err := rand.Read(p[:]); err != nil {
			return err
		}
		cc.mu.Lock()
		// check for dup before insert
		if _, found := cc.pings[p]; !found {
			cc.pings[p] = c
			cc.mu.Unlock()
			break
		}
		cc.mu.Unlock()
	}
	// Write the PING in a goroutine so a stalled writer doesn't block
	// honoring ctx cancellation.
	var pingError error
	errc := make(chan struct{})
	go func() {
		cc.wmu.Lock()
		defer cc.wmu.Unlock()
		if pingError = cc.fr.WritePing(false, p); pingError != nil {
			close(errc)
			return
		}
		if pingError = cc.bw.Flush(); pingError != nil {
			close(errc)
			return
		}
	}()
	select {
	case <-c:
		return nil
	case <-errc:
		return pingError
	case <-ctx.Done():
		return ctx.Err()
	case <-cc.readerDone:
		// connection closed
		return cc.readerErr
	}
}
2850
// processPing handles PING frames: an ack wakes the matching Ping caller
// (and clears any pending-reset accounting); a non-ack is answered with a
// PING ack.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook()
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}
2878
// processPushPromise rejects any PUSH_PROMISE frame.
// We told the peer we don't want them with SETTINGS_ENABLE_PUSH = 0
// (presumably set during connection setup — confirm against the conn
// initialization code), so receiving one is a connection-level
// protocol error per RFC 9113 §8.4.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	return ConnectionError(ErrCodeProtocol)
}
2889
2890
2891
// writeStreamReset sends a RST_STREAM frame for streamID with the given
// error code, optionally followed by a PING frame used to probe server
// liveness (see cleanupWriteRequest). The err parameter is currently unused.
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
	// TODO: map err to more interesting error codes, once the
	// HTTP community comes up with some. But currently for
	// RST_STREAM there's no equivalent to GOAWAY frame's debug
	// data, and the error codes are all pretty vague ("cancel").
	cc.wmu.Lock()
	cc.fr.WriteRSTStream(streamID, code)
	if ping {
		var payload [8]byte
		rand.Read(payload[:])
		cc.fr.WritePing(false, payload)
	}
	cc.bw.Flush()
	cc.wmu.Unlock()
}
2907
var (
	// errResponseHeaderListSize reports response headers exceeding the
	// limit we advertised in SETTINGS_MAX_HEADER_LIST_SIZE.
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	errRequestHeaderListSize  = httpcommon.ErrRequestHeaderListSize
)
2912
// logf logs via the connection's Transport.
func (cc *ClientConn) logf(format string, args ...any) {
	cc.t.logf(format, args...)
}
2916
// vlogf logs via the connection's Transport, but only when VerboseLogs is set.
func (cc *ClientConn) vlogf(format string, args ...any) {
	cc.t.vlogf(format, args...)
}
2920
// vlogf logs only when VerboseLogs is enabled.
func (t *Transport) vlogf(format string, args ...any) {
	if VerboseLogs {
		t.logf(format, args...)
	}
}
2926
// logf logs unconditionally to the standard logger.
func (t *Transport) logf(format string, args ...any) {
	log.Printf(format, args...)
}
2930
// missingBody is the response body used when the server declared a positive
// Content-Length but ended the stream without sending any data; reads fail
// with io.ErrUnexpectedEOF.
type missingBody struct{}

func (missingBody) Close() error             { return nil }
func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
2935
// erringRoundTripper is a RoundTripper that always fails with a fixed error.
type erringRoundTripper struct{ err error }

func (rt erringRoundTripper) RoundTripErr() error                             { return rt.err }
func (rt erringRoundTripper) RoundTrip(*ClientRequest) (*ClientResponse, error) { return nil, rt.err }
2940
// errConcurrentReadOnResBody is stored in gzipReader.zerr while its inner
// reader is checked out; a second simultaneous Read observes it and fails
// fast instead of sharing the reader.
var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")
2942
2943
2944
2945
2946
// gzipReader wraps a response body so the gzip.Reader is created lazily on
// first Read (via the pool helpers below) and recycled on Close.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying HTTP/2 response body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // lazily initialized; nil while checked out by a Read
	zerr error         // sticky error, or errConcurrentReadOnResBody while checked out
}
2954
// eofReader is an always-empty source used to reset pooled gzip.Readers.
// It also implements ReadByte so it satisfies flate.Reader.
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
2959
// gzipPool recycles gzip.Readers across response bodies.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
2961
2962
2963 func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
2964 zr := gzipPool.Get().(*gzip.Reader)
2965 if err := zr.Reset(r); err != nil {
2966 gzipPoolPut(zr)
2967 return nil, err
2968 }
2969 return zr, nil
2970 }
2971
2972
2973 func gzipPoolPut(zr *gzip.Reader) {
2974
2975
2976 var r flate.Reader = eofReader{}
2977 zr.Reset(r)
2978 gzipPool.Put(zr)
2979 }
2980
2981
2982
2983 func (gz *gzipReader) acquire() (*gzip.Reader, error) {
2984 gz.mu.Lock()
2985 defer gz.mu.Unlock()
2986 if gz.zerr != nil {
2987 return nil, gz.zerr
2988 }
2989 if gz.zr == nil {
2990 gz.zr, gz.zerr = gzipPoolGet(gz.body)
2991 if gz.zerr != nil {
2992 return nil, gz.zerr
2993 }
2994 }
2995 ret := gz.zr
2996 gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
2997 return ret, nil
2998 }
2999
3000
3001 func (gz *gzipReader) release(zr *gzip.Reader) {
3002 gz.mu.Lock()
3003 defer gz.mu.Unlock()
3004 if gz.zerr == errConcurrentReadOnResBody {
3005 gz.zr, gz.zerr = zr, nil
3006 } else {
3007 gzipPoolPut(zr)
3008 }
3009 }
3010
3011
3012
3013 func (gz *gzipReader) close() {
3014 gz.mu.Lock()
3015 defer gz.mu.Unlock()
3016 if gz.zerr == nil && gz.zr != nil {
3017 gzipPoolPut(gz.zr)
3018 gz.zr = nil
3019 }
3020 gz.zerr = fs.ErrClosed
3021 }
3022
3023 func (gz *gzipReader) Read(p []byte) (n int, err error) {
3024 zr, err := gz.acquire()
3025 if err != nil {
3026 return 0, err
3027 }
3028 defer gz.release(zr)
3029
3030 return zr.Read(p)
3031 }
3032
// Close recycles the gzip reader (if idle) and closes the underlying
// response body, returning the body's close error.
func (gz *gzipReader) Close() error {
	gz.close()
	return gz.body.Close()
}
3038
3039
3040
3041 func isConnectionCloseRequest(req *ClientRequest) bool {
3042 return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
3043 }
3044
3045
3046
// NetHTTPClientConn adapts a *ClientConn to the small surface net/http
// needs from a pooled HTTP/2 connection: round-tripping, health checks,
// and stream reservation.
type NetHTTPClientConn struct {
	cc *ClientConn
}
3050
// RoundTrip forwards the request to the underlying ClientConn.
func (cc NetHTTPClientConn) RoundTrip(req *ClientRequest) (*ClientResponse, error) {
	return cc.cc.RoundTrip(req)
}
3054
// Close closes the underlying ClientConn.
func (cc NetHTTPClientConn) Close() error {
	return cc.cc.Close()
}
3058
3059 func (cc NetHTTPClientConn) Err() error {
3060 cc.cc.mu.Lock()
3061 defer cc.cc.mu.Unlock()
3062 if cc.cc.closed {
3063 return errors.New("connection closed")
3064 }
3065 return nil
3066 }
3067
3068 func (cc NetHTTPClientConn) Reserve() error {
3069 defer cc.cc.maybeCallStateHook()
3070 cc.cc.mu.Lock()
3071 defer cc.cc.mu.Unlock()
3072 if !cc.cc.canReserveLocked() {
3073 return errors.New("connection is unavailable")
3074 }
3075 cc.cc.streamsReserved++
3076 return nil
3077 }
3078
3079 func (cc NetHTTPClientConn) Release() {
3080 defer cc.cc.maybeCallStateHook()
3081 cc.cc.mu.Lock()
3082 defer cc.cc.mu.Unlock()
3083
3084
3085
3086
3087 if cc.cc.streamsReserved > 0 {
3088 cc.cc.streamsReserved--
3089 }
3090 }
3091
// Available reports how many additional streams the connection can take on.
func (cc NetHTTPClientConn) Available() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.availableLocked()
}
3097
// InFlight reports the number of requests currently using the connection.
func (cc NetHTTPClientConn) InFlight() int {
	cc.cc.mu.Lock()
	defer cc.cc.mu.Unlock()
	return cc.cc.currentRequestCountLocked()
}
3103
// Ping sends an HTTP/2 PING on the underlying connection and waits for the
// acknowledgment or ctx cancellation.
func (cc NetHTTPClientConn) Ping(ctx context.Context) error {
	return cc.cc.Ping(ctx)
}
3107
// maybeCallStateHook invokes the connection's state hook, if one is set.
// Callers defer this after operations that may change connection state.
func (cc *ClientConn) maybeCallStateHook() {
	if cc.internalStateHook != nil {
		cc.internalStateHook()
	}
}
3113
3114 func (t *Transport) idleConnTimeout() time.Duration {
3115 if t.t1 != nil {
3116 return t.t1.IdleConnTimeout()
3117 }
3118
3119 return 0
3120 }
3121
3122 func traceGetConn(req *ClientRequest, hostPort string) {
3123 trace := httptrace.ContextClientTrace(req.Context)
3124 if trace == nil || trace.GetConn == nil {
3125 return
3126 }
3127 trace.GetConn(hostPort)
3128 }
3129
3130 func traceGotConn(req *ClientRequest, cc *ClientConn, reused bool) {
3131 trace := httptrace.ContextClientTrace(req.Context)
3132 if trace == nil || trace.GotConn == nil {
3133 return
3134 }
3135 ci := httptrace.GotConnInfo{Conn: cc.tconn}
3136 ci.Reused = reused
3137 cc.mu.Lock()
3138 ci.WasIdle = len(cc.streams) == 0 && reused
3139 if ci.WasIdle && !cc.lastActive.IsZero() {
3140 ci.IdleTime = time.Since(cc.lastActive)
3141 }
3142 cc.mu.Unlock()
3143
3144 trace.GotConn(ci)
3145 }
3146
3147 func traceWroteHeaders(trace *httptrace.ClientTrace) {
3148 if trace != nil && trace.WroteHeaders != nil {
3149 trace.WroteHeaders()
3150 }
3151 }
3152
3153 func traceGot100Continue(trace *httptrace.ClientTrace) {
3154 if trace != nil && trace.Got100Continue != nil {
3155 trace.Got100Continue()
3156 }
3157 }
3158
3159 func traceWait100Continue(trace *httptrace.ClientTrace) {
3160 if trace != nil && trace.Wait100Continue != nil {
3161 trace.Wait100Continue()
3162 }
3163 }
3164
3165 func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
3166 if trace != nil && trace.WroteRequest != nil {
3167 trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
3168 }
3169 }
3170
3171 func traceFirstResponseByte(trace *httptrace.ClientTrace) {
3172 if trace != nil && trace.GotFirstResponseByte != nil {
3173 trace.GotFirstResponseByte()
3174 }
3175 }
3176
3177 func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
3178 if trace != nil {
3179 return trace.Got1xxResponse
3180 }
3181 return nil
3182 }
3183
3184
3185
3186 func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
3187 dialer := &tls.Dialer{
3188 Config: cfg,
3189 }
3190 cn, err := dialer.DialContext(ctx, network, addr)
3191 if err != nil {
3192 return nil, err
3193 }
3194 tlsCn := cn.(*tls.Conn)
3195 return tlsCn, nil
3196 }
3197