36
36
__all__ = ["HTTPSConnectionPool" ]
37
37
38
38
39
+ class HTTPSConnection (urllib3 .connection .HTTPSConnection ):
40
+ def __init__ (self , * args : Any , ** kwargs : Any ) -> None :
41
+ self ._elastic_assert_fingerprint : Optional [str ] = None
42
+ super ().__init__ (* args , ** kwargs )
43
+
44
+ def connect (self ) -> None :
45
+ super ().connect ()
46
+ # Hack to prevent a warning within HTTPSConnectionPool._validate_conn()
47
+ if self ._elastic_assert_fingerprint :
48
+ self .is_verified = True
49
+
50
+
39
51
class HTTPSConnectionPool (urllib3 .HTTPSConnectionPool ):
52
+ ConnectionCls = HTTPSConnection
53
+
40
54
"""HTTPSConnectionPool implementation which supports ``assert_fingerprint``
41
55
on certificates within the chain instead of only the leaf cert using private
42
56
APIs in CPython 3.10+
@@ -60,18 +74,26 @@ def __init__(
60
74
f", should be one of '{ valid_lengths } '"
61
75
)
62
76
63
- if assert_fingerprint :
64
- # Falsey but not None. This is a hack to skip fingerprinting by urllib3
65
- # but still set 'is_verified=True' within HTTPSConnectionPool._validate_conn()
66
- kwargs ["assert_fingerprint" ] = ""
77
+ if self ._elastic_assert_fingerprint :
78
+ # Skip fingerprinting by urllib3 as we'll do it ourselves
79
+ kwargs ["assert_fingerprint" ] = None
67
80
68
81
super ().__init__ (* args , ** kwargs )
69
82
70
- def _validate_conn (self , conn : urllib3 .connection .HTTPSConnection ) -> None :
83
+ def _new_conn (self ) -> HTTPSConnection :
84
+ """
85
+ Return a fresh :class:`urllib3.connection.HTTPSConnection`.
86
+ """
87
+ conn : HTTPSConnection = super ()._new_conn () # type: ignore[assignment]
88
+ # Tell our custom connection if we'll assert fingerprint ourselves
89
+ conn ._elastic_assert_fingerprint = self ._elastic_assert_fingerprint
90
+ return conn
91
+
92
+ def _validate_conn (self , conn : HTTPSConnection ) -> None : # type: ignore[override]
71
93
"""
72
94
Called right before a request is made, after the socket is created.
73
95
"""
74
- super (HTTPSConnectionPool , self )._validate_conn (conn ) # type: ignore[misc]
96
+ super (HTTPSConnectionPool , self )._validate_conn (conn )
75
97
76
98
if self ._elastic_assert_fingerprint :
77
99
hash_func = _HASHES_BY_LENGTH [len (self ._elastic_assert_fingerprint )]
@@ -89,7 +111,7 @@ def _validate_conn(self, conn: urllib3.connection.HTTPSConnection) -> None:
89
111
# See: https://github.com/python/cpython/pull/25467
90
112
fingerprints = [
91
113
hash_func (cert .public_bytes (_ENCODING_DER )).digest ()
92
- for cert in conn .sock ._sslobj .get_verified_chain ()
114
+ for cert in conn .sock ._sslobj .get_verified_chain () # type: ignore[union-attr]
93
115
]
94
116
except RERAISE_EXCEPTIONS : # pragma: nocover
95
117
raise
@@ -100,7 +122,7 @@ def _validate_conn(self, conn: urllib3.connection.HTTPSConnection) -> None:
100
122
101
123
# Only add the peercert in front of the chain if it's not there for some reason.
102
124
# This is to make sure old behavior of 'ssl_assert_fingerprint' still works.
103
- peercert_fingerprint = hash_func (conn .sock .getpeercert (True )).digest ()
125
+ peercert_fingerprint = hash_func (conn .sock .getpeercert (True )).digest () # type: ignore[union-attr]
104
126
if peercert_fingerprint not in fingerprints : # pragma: nocover
105
127
fingerprints .insert (0 , peercert_fingerprint )
106
128
0 commit comments