mitmproxy.connection
1import uuid 2import warnings 3from abc import ABCMeta 4from collections.abc import Sequence 5from enum import Flag 6from typing import Literal, Optional 7 8from mitmproxy import certs 9from mitmproxy.coretypes import serializable 10from mitmproxy.net import server_spec 11from mitmproxy.utils import human 12 13 14class ConnectionState(Flag): 15 """The current state of the underlying socket.""" 16 17 CLOSED = 0 18 CAN_READ = 1 19 CAN_WRITE = 2 20 OPEN = CAN_READ | CAN_WRITE 21 22 23TransportProtocol = Literal["tcp", "udp"] 24 25 26# practically speaking we may have IPv6 addresses with flowinfo and scope_id, 27# but type checking isn't good enough to properly handle tuple unions. 28# this version at least provides useful type checking messages. 29Address = tuple[str, int] 30 31 32class Connection(serializable.Serializable, metaclass=ABCMeta): 33 """ 34 Base class for client and server connections. 35 36 The connection object only exposes metadata about the connection, but not the underlying socket object. 37 This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively. 38 """ 39 40 # all connections have a unique id. While 41 # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity), 42 # we also want these semantics for recorded flows. 43 id: str 44 """A unique UUID to identify the connection.""" 45 state: ConnectionState 46 """The current connection state.""" 47 transport_protocol: TransportProtocol 48 """The connection protocol in use.""" 49 peername: Optional[Address] 50 """The remote's `(ip, port)` tuple for this connection.""" 51 sockname: Optional[Address] 52 """Our local `(ip, port)` tuple for this connection.""" 53 error: Optional[str] = None 54 """ 55 A string describing a general error with connections to this address. 56 57 The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, 58 for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error 59 property. This property is only reused per client connection. 60 """ 61 62 tls: bool = False 63 """ 64 `True` if TLS should be established, `False` otherwise. 65 Note that this property only describes if a connection should eventually be protected using TLS. 66 To check if TLS has already been established, use `Connection.tls_established`. 67 """ 68 certificate_list: Sequence[certs.Cert] = () 69 """ 70 The TLS certificate list as sent by the peer. 71 The first certificate is the end-entity certificate. 72 73 > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each 74 > certificate to certify the one immediately preceding it; however, 75 > some implementations allowed some flexibility. Servers sometimes 76 > send both a current and deprecated intermediate for transitional 77 > purposes, and others are simply configured incorrectly, but these 78 > cases can nonetheless be validated properly. For maximum 79 > compatibility, all implementations SHOULD be prepared to handle 80 > potentially extraneous certificates and arbitrary orderings from any 81 > TLS version, with the exception of the end-entity certificate which 82 > MUST be first. 83 """ 84 alpn: Optional[bytes] = None 85 """The application-layer protocol as negotiated using 86 [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation).""" 87 alpn_offers: Sequence[bytes] = () 88 """The ALPN offers as sent in the ClientHello.""" 89 # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography 90 cipher: Optional[str] = None 91 """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`.""" 92 cipher_list: Sequence[str] = () 93 """Ciphers accepted by the proxy server on this connection.""" 94 tls_version: Optional[str] = None 95 """The active TLS version.""" 96 sni: Optional[str] = None 97 """ 98 The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello. 99 """ 100 101 timestamp_start: Optional[float] 102 timestamp_end: Optional[float] = None 103 """*Timestamp:* Connection has been closed.""" 104 timestamp_tls_setup: Optional[float] = None 105 """*Timestamp:* TLS handshake has been completed successfully.""" 106 107 @property 108 def connected(self) -> bool: 109 """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise.""" 110 return self.state is ConnectionState.OPEN 111 112 @property 113 def tls_established(self) -> bool: 114 """*Read-only:* `True` if TLS has been established, `False` otherwise.""" 115 return self.timestamp_tls_setup is not None 116 117 def __eq__(self, other): 118 if isinstance(other, Connection): 119 return self.id == other.id 120 return False 121 122 def __hash__(self): 123 return hash(self.id) 124 125 def __repr__(self): 126 attrs = repr( 127 { 128 k: { 129 "cipher_list": lambda: f"<{len(v)} ciphers>", 130 "id": lambda: f"…{v[-6:]}", 131 }.get(k, lambda: v)() 132 for k, v in self.__dict__.items() 133 } 134 ) 135 return f"{type(self).__name__}({attrs})" 136 137 @property 138 def alpn_proto_negotiated(self) -> Optional[bytes]: # pragma: no cover 139 """*Deprecated:* An outdated alias for Connection.alpn.""" 140 warnings.warn( 141 "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.", 142 DeprecationWarning, 143 stacklevel=2, 144 ) 145 return self.alpn 146 147 148class Client(Connection): 149 """A connection between a client and mitmproxy.""" 150 151 peername: Address 152 """The client's address.""" 153 sockname: Address 154 """The local address we received this connection on.""" 155 156 mitmcert: Optional[certs.Cert] = None 157 """ 158 The certificate used by mitmproxy to establish TLS with the client. 159 """ 160 161 timestamp_start: float 162 """*Timestamp:* TCP SYN received""" 163 164 def __init__( 165 self, 166 peername: Address, 167 sockname: Address, 168 timestamp_start: float, 169 *, 170 transport_protocol: TransportProtocol = "tcp", 171 ): 172 self.id = str(uuid.uuid4()) 173 self.peername = peername 174 self.sockname = sockname 175 self.timestamp_start = timestamp_start 176 self.state = ConnectionState.OPEN 177 self.transport_protocol = transport_protocol 178 179 def __str__(self): 180 if self.alpn: 181 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 182 elif self.tls_established: 183 tls_state = ", tls" 184 else: 185 tls_state = "" 186 return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})" 187 188 def get_state(self): 189 # Important: Retain full compatibility with old proxy core for now! 190 # This means we need to add all new fields to the old implementation. 191 return { 192 "address": self.peername, 193 "alpn": self.alpn, 194 "cipher_name": self.cipher, 195 "id": self.id, 196 "mitmcert": self.mitmcert.get_state() 197 if self.mitmcert is not None 198 else None, 199 "sni": self.sni, 200 "timestamp_end": self.timestamp_end, 201 "timestamp_start": self.timestamp_start, 202 "timestamp_tls_setup": self.timestamp_tls_setup, 203 "tls_established": self.tls_established, 204 "tls_extensions": [], 205 "tls_version": self.tls_version, 206 # only used in sans-io 207 "state": self.state.value, 208 "sockname": self.sockname, 209 "error": self.error, 210 "tls": self.tls, 211 "certificate_list": [x.get_state() for x in self.certificate_list], 212 "alpn_offers": self.alpn_offers, 213 "cipher_list": self.cipher_list, 214 } 215 216 @classmethod 217 def from_state(cls, state) -> "Client": 218 client = Client(state["address"], ("mitmproxy", 8080), state["timestamp_start"]) 219 client.set_state(state) 220 return client 221 222 def set_state(self, state): 223 self.peername = tuple(state["address"]) if state["address"] else None 224 self.alpn = state["alpn"] 225 self.cipher = state["cipher_name"] 226 self.id = state["id"] 227 self.sni = state["sni"] 228 self.timestamp_end = state["timestamp_end"] 229 self.timestamp_start = state["timestamp_start"] 230 self.timestamp_tls_setup = state["timestamp_tls_setup"] 231 self.tls_version = state["tls_version"] 232 # only used in sans-io 233 self.state = ConnectionState(state["state"]) 234 self.sockname = tuple(state["sockname"]) if state["sockname"] else None 235 self.error = state["error"] 236 self.tls = state["tls"] 237 self.certificate_list = [ 238 certs.Cert.from_state(x) for x in state["certificate_list"] 239 ] 240 self.mitmcert = ( 241 certs.Cert.from_state(state["mitmcert"]) 242 if state["mitmcert"] is not None 243 else None 244 ) 245 self.alpn_offers = state["alpn_offers"] 246 self.cipher_list = state["cipher_list"] 247 248 @property 249 def address(self): # pragma: no cover 250 """*Deprecated:* An outdated alias for Client.peername.""" 251 warnings.warn( 252 "Client.address is deprecated, use Client.peername instead.", 253 DeprecationWarning, 254 stacklevel=2, 255 ) 256 return self.peername 257 258 @address.setter 259 def address(self, x): # pragma: no cover 260 warnings.warn( 261 "Client.address is deprecated, use Client.peername instead.", 262 DeprecationWarning, 263 stacklevel=2, 264 ) 265 self.peername = x 266 267 @property 268 def cipher_name(self) -> Optional[str]: # pragma: no cover 269 """*Deprecated:* An outdated alias for Connection.cipher.""" 270 warnings.warn( 271 "Client.cipher_name is deprecated, use Client.cipher instead.", 272 DeprecationWarning, 273 stacklevel=2, 274 ) 275 return self.cipher 276 277 @property 278 def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover 279 """*Deprecated:* An outdated alias for Connection.certificate_list[0].""" 280 warnings.warn( 281 "Client.clientcert is deprecated, use Client.certificate_list instead.", 282 DeprecationWarning, 283 stacklevel=2, 284 ) 285 if self.certificate_list: 286 return self.certificate_list[0] 287 else: 288 return None 289 290 @clientcert.setter 291 def clientcert(self, val): # pragma: no cover 292 warnings.warn( 293 "Client.clientcert is deprecated, use Client.certificate_list instead.", 294 DeprecationWarning, 295 stacklevel=2, 296 ) 297 if val: 298 self.certificate_list = [val] 299 else: 300 self.certificate_list = [] 301 302 303class Server(Connection): 304 """A connection between mitmproxy and an upstream server.""" 305 306 peername: Optional[Address] = None 307 """The server's resolved `(ip, port)` tuple. Will be set during connection establishment.""" 308 sockname: Optional[Address] = None 309 address: Optional[Address] 310 """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address.""" 311 312 timestamp_start: Optional[float] = None 313 """*Timestamp:* TCP SYN sent.""" 314 timestamp_tcp_setup: Optional[float] = None 315 """*Timestamp:* TCP ACK received.""" 316 317 via: Optional[server_spec.ServerSpec] = None 318 """An optional proxy server specification via which the connection should be established.""" 319 320 def __init__( 321 self, 322 address: Optional[Address], 323 *, 324 transport_protocol: TransportProtocol = "tcp", 325 ): 326 self.id = str(uuid.uuid4()) 327 self.address = address 328 self.state = ConnectionState.CLOSED 329 self.transport_protocol = transport_protocol 330 331 def __str__(self): 332 if self.alpn: 333 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 334 elif self.tls_established: 335 tls_state = ", tls" 336 else: 337 tls_state = "" 338 if self.sockname: 339 local_port = f", src_port={self.sockname[1]}" 340 else: 341 local_port = "" 342 return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})" 343 344 def __setattr__(self, name, value): 345 if name in ("address", "via"): 346 connection_open = ( 347 self.__dict__.get("state", ConnectionState.CLOSED) 348 is ConnectionState.OPEN 349 ) 350 # assigning the current value is okay, that may be an artifact of calling .set_state(). 351 attr_changed = self.__dict__.get(name) != value 352 if connection_open and attr_changed: 353 raise RuntimeError(f"Cannot change server.{name} on open connection.") 354 return super().__setattr__(name, value) 355 356 def get_state(self): 357 return { 358 "address": self.address, 359 "alpn": self.alpn, 360 "id": self.id, 361 "ip_address": self.peername, 362 "sni": self.sni, 363 "source_address": self.sockname, 364 "timestamp_end": self.timestamp_end, 365 "timestamp_start": self.timestamp_start, 366 "timestamp_tcp_setup": self.timestamp_tcp_setup, 367 "timestamp_tls_setup": self.timestamp_tls_setup, 368 "tls_established": self.tls_established, 369 "tls_version": self.tls_version, 370 "via": None, 371 # only used in sans-io 372 "state": self.state.value, 373 "error": self.error, 374 "tls": self.tls, 375 "certificate_list": [x.get_state() for x in self.certificate_list], 376 "alpn_offers": self.alpn_offers, 377 "cipher_name": self.cipher, 378 "cipher_list": self.cipher_list, 379 "via2": self.via, 380 } 381 382 @classmethod 383 def from_state(cls, state) -> "Server": 384 server = Server(None) 385 server.set_state(state) 386 return server 387 388 def set_state(self, state): 389 self.address = tuple(state["address"]) if state["address"] else None 390 self.alpn = state["alpn"] 391 self.id = state["id"] 392 self.peername = tuple(state["ip_address"]) if state["ip_address"] else None 393 self.sni = state["sni"] 394 self.sockname = ( 395 tuple(state["source_address"]) if state["source_address"] else None 396 ) 397 self.timestamp_end = state["timestamp_end"] 398 self.timestamp_start = state["timestamp_start"] 399 self.timestamp_tcp_setup = state["timestamp_tcp_setup"] 400 self.timestamp_tls_setup = state["timestamp_tls_setup"] 401 self.tls_version = state["tls_version"] 402 self.state = ConnectionState(state["state"]) 403 self.error = state["error"] 404 self.tls = state["tls"] 405 self.certificate_list = [ 406 certs.Cert.from_state(x) for x in state["certificate_list"] 407 ] 408 self.alpn_offers = state["alpn_offers"] 409 self.cipher = state["cipher_name"] 410 self.cipher_list = state["cipher_list"] 411 self.via = state["via2"] 412 413 @property 414 def ip_address(self) -> Optional[Address]: # pragma: no cover 415 """*Deprecated:* An outdated alias for `Server.peername`.""" 416 warnings.warn( 417 "Server.ip_address is deprecated, use Server.peername instead.", 418 DeprecationWarning, 419 stacklevel=2, 420 ) 421 return self.peername 422 423 @property 424 def cert(self) -> Optional[certs.Cert]: # pragma: no cover 425 """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`.""" 426 warnings.warn( 427 "Server.cert is deprecated, use Server.certificate_list instead.", 428 DeprecationWarning, 429 stacklevel=2, 430 ) 431 if self.certificate_list: 432 return self.certificate_list[0] 433 else: 434 return None 435 436 @cert.setter 437 def cert(self, val): # pragma: no cover 438 warnings.warn( 439 "Server.cert is deprecated, use Server.certificate_list instead.", 440 DeprecationWarning, 441 stacklevel=2, 442 ) 443 if val: 444 self.certificate_list = [val] 445 else: 446 self.certificate_list = [] 447 448 449__all__ = ["Connection", "Client", "Server", "ConnectionState"]
33class Connection(serializable.Serializable, metaclass=ABCMeta): 34 """ 35 Base class for client and server connections. 36 37 The connection object only exposes metadata about the connection, but not the underlying socket object. 38 This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively. 39 """ 40 41 # all connections have a unique id. While 42 # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity), 43 # we also want these semantics for recorded flows. 44 id: str 45 """A unique UUID to identify the connection.""" 46 state: ConnectionState 47 """The current connection state.""" 48 transport_protocol: TransportProtocol 49 """The connection protocol in use.""" 50 peername: Optional[Address] 51 """The remote's `(ip, port)` tuple for this connection.""" 52 sockname: Optional[Address] 53 """Our local `(ip, port)` tuple for this connection.""" 54 error: Optional[str] = None 55 """ 56 A string describing a general error with connections to this address. 57 58 The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, 59 for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error 60 property. This property is only reused per client connection. 61 """ 62 63 tls: bool = False 64 """ 65 `True` if TLS should be established, `False` otherwise. 66 Note that this property only describes if a connection should eventually be protected using TLS. 67 To check if TLS has already been established, use `Connection.tls_established`. 68 """ 69 certificate_list: Sequence[certs.Cert] = () 70 """ 71 The TLS certificate list as sent by the peer. 72 The first certificate is the end-entity certificate. 73 74 > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each 75 > certificate to certify the one immediately preceding it; however, 76 > some implementations allowed some flexibility. Servers sometimes 77 > send both a current and deprecated intermediate for transitional 78 > purposes, and others are simply configured incorrectly, but these 79 > cases can nonetheless be validated properly. For maximum 80 > compatibility, all implementations SHOULD be prepared to handle 81 > potentially extraneous certificates and arbitrary orderings from any 82 > TLS version, with the exception of the end-entity certificate which 83 > MUST be first. 84 """ 85 alpn: Optional[bytes] = None 86 """The application-layer protocol as negotiated using 87 [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation).""" 88 alpn_offers: Sequence[bytes] = () 89 """The ALPN offers as sent in the ClientHello.""" 90 # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography 91 cipher: Optional[str] = None 92 """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`.""" 93 cipher_list: Sequence[str] = () 94 """Ciphers accepted by the proxy server on this connection.""" 95 tls_version: Optional[str] = None 96 """The active TLS version.""" 97 sni: Optional[str] = None 98 """ 99 The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello. 100 """ 101 102 timestamp_start: Optional[float] 103 timestamp_end: Optional[float] = None 104 """*Timestamp:* Connection has been closed.""" 105 timestamp_tls_setup: Optional[float] = None 106 """*Timestamp:* TLS handshake has been completed successfully.""" 107 108 @property 109 def connected(self) -> bool: 110 """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise.""" 111 return self.state is ConnectionState.OPEN 112 113 @property 114 def tls_established(self) -> bool: 115 """*Read-only:* `True` if TLS has been established, `False` otherwise.""" 116 return self.timestamp_tls_setup is not None 117 118 def __eq__(self, other): 119 if isinstance(other, Connection): 120 return self.id == other.id 121 return False 122 123 def __hash__(self): 124 return hash(self.id) 125 126 def __repr__(self): 127 attrs = repr( 128 { 129 k: { 130 "cipher_list": lambda: f"<{len(v)} ciphers>", 131 "id": lambda: f"…{v[-6:]}", 132 }.get(k, lambda: v)() 133 for k, v in self.__dict__.items() 134 } 135 ) 136 return f"{type(self).__name__}({attrs})" 137 138 @property 139 def alpn_proto_negotiated(self) -> Optional[bytes]: # pragma: no cover 140 """*Deprecated:* An outdated alias for Connection.alpn.""" 141 warnings.warn( 142 "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.", 143 DeprecationWarning, 144 stacklevel=2, 145 ) 146 return self.alpn
Base class for client and server connections.
The connection object only exposes metadata about the connection, but not the underlying socket object.
This is intentional, all I/O should be handled by mitmproxy.proxy.server
exclusively.
A string describing a general error with connections to this address.
The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error property. This property is only reused per client connection.
True
if TLS should be established, False
otherwise.
Note that this property only describes if a connection should eventually be protected using TLS.
To check if TLS has already been established, use Connection.tls_established
.
The TLS certificate list as sent by the peer. The first certificate is the end-entity certificate.
[RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each certificate to certify the one immediately preceding it; however, some implementations allowed some flexibility. Servers sometimes send both a current and deprecated intermediate for transitional purposes, and others are simply configured incorrectly, but these cases can nonetheless be validated properly. For maximum compatibility, all implementations SHOULD be prepared to handle potentially extraneous certificates and arbitrary orderings from any TLS version, with the exception of the end-entity certificate which MUST be first.
Ciphers accepted by the proxy server on this connection.
Timestamp: TLS handshake has been completed successfully.
Inherited Members
- mitmproxy.coretypes.serializable.Serializable
- copy
149class Client(Connection): 150 """A connection between a client and mitmproxy.""" 151 152 peername: Address 153 """The client's address.""" 154 sockname: Address 155 """The local address we received this connection on.""" 156 157 mitmcert: Optional[certs.Cert] = None 158 """ 159 The certificate used by mitmproxy to establish TLS with the client. 160 """ 161 162 timestamp_start: float 163 """*Timestamp:* TCP SYN received""" 164 165 def __init__( 166 self, 167 peername: Address, 168 sockname: Address, 169 timestamp_start: float, 170 *, 171 transport_protocol: TransportProtocol = "tcp", 172 ): 173 self.id = str(uuid.uuid4()) 174 self.peername = peername 175 self.sockname = sockname 176 self.timestamp_start = timestamp_start 177 self.state = ConnectionState.OPEN 178 self.transport_protocol = transport_protocol 179 180 def __str__(self): 181 if self.alpn: 182 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 183 elif self.tls_established: 184 tls_state = ", tls" 185 else: 186 tls_state = "" 187 return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})" 188 189 def get_state(self): 190 # Important: Retain full compatibility with old proxy core for now! 191 # This means we need to add all new fields to the old implementation. 192 return { 193 "address": self.peername, 194 "alpn": self.alpn, 195 "cipher_name": self.cipher, 196 "id": self.id, 197 "mitmcert": self.mitmcert.get_state() 198 if self.mitmcert is not None 199 else None, 200 "sni": self.sni, 201 "timestamp_end": self.timestamp_end, 202 "timestamp_start": self.timestamp_start, 203 "timestamp_tls_setup": self.timestamp_tls_setup, 204 "tls_established": self.tls_established, 205 "tls_extensions": [], 206 "tls_version": self.tls_version, 207 # only used in sans-io 208 "state": self.state.value, 209 "sockname": self.sockname, 210 "error": self.error, 211 "tls": self.tls, 212 "certificate_list": [x.get_state() for x in self.certificate_list], 213 "alpn_offers": self.alpn_offers, 214 "cipher_list": self.cipher_list, 215 } 216 217 @classmethod 218 def from_state(cls, state) -> "Client": 219 client = Client(state["address"], ("mitmproxy", 8080), state["timestamp_start"]) 220 client.set_state(state) 221 return client 222 223 def set_state(self, state): 224 self.peername = tuple(state["address"]) if state["address"] else None 225 self.alpn = state["alpn"] 226 self.cipher = state["cipher_name"] 227 self.id = state["id"] 228 self.sni = state["sni"] 229 self.timestamp_end = state["timestamp_end"] 230 self.timestamp_start = state["timestamp_start"] 231 self.timestamp_tls_setup = state["timestamp_tls_setup"] 232 self.tls_version = state["tls_version"] 233 # only used in sans-io 234 self.state = ConnectionState(state["state"]) 235 self.sockname = tuple(state["sockname"]) if state["sockname"] else None 236 self.error = state["error"] 237 self.tls = state["tls"] 238 self.certificate_list = [ 239 certs.Cert.from_state(x) for x in state["certificate_list"] 240 ] 241 self.mitmcert = ( 242 certs.Cert.from_state(state["mitmcert"]) 243 if state["mitmcert"] is not None 244 else None 245 ) 246 self.alpn_offers = state["alpn_offers"] 247 self.cipher_list = state["cipher_list"] 248 249 @property 250 def address(self): # pragma: no cover 251 """*Deprecated:* An outdated alias for Client.peername.""" 252 warnings.warn( 253 "Client.address is deprecated, use Client.peername instead.", 254 DeprecationWarning, 255 stacklevel=2, 256 ) 257 return self.peername 258 259 @address.setter 260 def address(self, x): # pragma: no cover 261 warnings.warn( 262 "Client.address is deprecated, use Client.peername instead.", 263 DeprecationWarning, 264 stacklevel=2, 265 ) 266 self.peername = x 267 268 @property 269 def cipher_name(self) -> Optional[str]: # pragma: no cover 270 """*Deprecated:* An outdated alias for Connection.cipher.""" 271 warnings.warn( 272 "Client.cipher_name is deprecated, use Client.cipher instead.", 273 DeprecationWarning, 274 stacklevel=2, 275 ) 276 return self.cipher 277 278 @property 279 def clientcert(self) -> Optional[certs.Cert]: # pragma: no cover 280 """*Deprecated:* An outdated alias for Connection.certificate_list[0].""" 281 warnings.warn( 282 "Client.clientcert is deprecated, use Client.certificate_list instead.", 283 DeprecationWarning, 284 stacklevel=2, 285 ) 286 if self.certificate_list: 287 return self.certificate_list[0] 288 else: 289 return None 290 291 @clientcert.setter 292 def clientcert(self, val): # pragma: no cover 293 warnings.warn( 294 "Client.clientcert is deprecated, use Client.certificate_list instead.", 295 DeprecationWarning, 296 stacklevel=2, 297 ) 298 if val: 299 self.certificate_list = [val] 300 else: 301 self.certificate_list = []
A connection between a client and mitmproxy.
165 def __init__( 166 self, 167 peername: Address, 168 sockname: Address, 169 timestamp_start: float, 170 *, 171 transport_protocol: TransportProtocol = "tcp", 172 ): 173 self.id = str(uuid.uuid4()) 174 self.peername = peername 175 self.sockname = sockname 176 self.timestamp_start = timestamp_start 177 self.state = ConnectionState.OPEN 178 self.transport_protocol = transport_protocol
Deprecated: An outdated alias for Connection.certificate_list[0].
Inherited Members
- Connection
- id
- state
- transport_protocol
- error
- tls
- certificate_list
- alpn
- alpn_offers
- cipher
- cipher_list
- tls_version
- sni
- timestamp_end
- timestamp_tls_setup
- connected
- tls_established
- alpn_proto_negotiated
- mitmproxy.coretypes.serializable.Serializable
- copy
304class Server(Connection): 305 """A connection between mitmproxy and an upstream server.""" 306 307 peername: Optional[Address] = None 308 """The server's resolved `(ip, port)` tuple. Will be set during connection establishment.""" 309 sockname: Optional[Address] = None 310 address: Optional[Address] 311 """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address.""" 312 313 timestamp_start: Optional[float] = None 314 """*Timestamp:* TCP SYN sent.""" 315 timestamp_tcp_setup: Optional[float] = None 316 """*Timestamp:* TCP ACK received.""" 317 318 via: Optional[server_spec.ServerSpec] = None 319 """An optional proxy server specification via which the connection should be established.""" 320 321 def __init__( 322 self, 323 address: Optional[Address], 324 *, 325 transport_protocol: TransportProtocol = "tcp", 326 ): 327 self.id = str(uuid.uuid4()) 328 self.address = address 329 self.state = ConnectionState.CLOSED 330 self.transport_protocol = transport_protocol 331 332 def __str__(self): 333 if self.alpn: 334 tls_state = f", alpn={self.alpn.decode(errors='replace')}" 335 elif self.tls_established: 336 tls_state = ", tls" 337 else: 338 tls_state = "" 339 if self.sockname: 340 local_port = f", src_port={self.sockname[1]}" 341 else: 342 local_port = "" 343 return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})" 344 345 def __setattr__(self, name, value): 346 if name in ("address", "via"): 347 connection_open = ( 348 self.__dict__.get("state", ConnectionState.CLOSED) 349 is ConnectionState.OPEN 350 ) 351 # assigning the current value is okay, that may be an artifact of calling .set_state(). 352 attr_changed = self.__dict__.get(name) != value 353 if connection_open and attr_changed: 354 raise RuntimeError(f"Cannot change server.{name} on open connection.") 355 return super().__setattr__(name, value) 356 357 def get_state(self): 358 return { 359 "address": self.address, 360 "alpn": self.alpn, 361 "id": self.id, 362 "ip_address": self.peername, 363 "sni": self.sni, 364 "source_address": self.sockname, 365 "timestamp_end": self.timestamp_end, 366 "timestamp_start": self.timestamp_start, 367 "timestamp_tcp_setup": self.timestamp_tcp_setup, 368 "timestamp_tls_setup": self.timestamp_tls_setup, 369 "tls_established": self.tls_established, 370 "tls_version": self.tls_version, 371 "via": None, 372 # only used in sans-io 373 "state": self.state.value, 374 "error": self.error, 375 "tls": self.tls, 376 "certificate_list": [x.get_state() for x in self.certificate_list], 377 "alpn_offers": self.alpn_offers, 378 "cipher_name": self.cipher, 379 "cipher_list": self.cipher_list, 380 "via2": self.via, 381 } 382 383 @classmethod 384 def from_state(cls, state) -> "Server": 385 server = Server(None) 386 server.set_state(state) 387 return server 388 389 def set_state(self, state): 390 self.address = tuple(state["address"]) if state["address"] else None 391 self.alpn = state["alpn"] 392 self.id = state["id"] 393 self.peername = tuple(state["ip_address"]) if state["ip_address"] else None 394 self.sni = state["sni"] 395 self.sockname = ( 396 tuple(state["source_address"]) if state["source_address"] else None 397 ) 398 self.timestamp_end = state["timestamp_end"] 399 self.timestamp_start = state["timestamp_start"] 400 self.timestamp_tcp_setup = state["timestamp_tcp_setup"] 401 self.timestamp_tls_setup = state["timestamp_tls_setup"] 402 self.tls_version = state["tls_version"] 403 self.state = ConnectionState(state["state"]) 404 self.error = state["error"] 405 self.tls = state["tls"] 406 self.certificate_list = [ 407 certs.Cert.from_state(x) for x in state["certificate_list"] 408 ] 409 self.alpn_offers = state["alpn_offers"] 410 self.cipher = state["cipher_name"] 411 self.cipher_list = state["cipher_list"] 412 self.via = state["via2"] 413 414 @property 415 def ip_address(self) -> Optional[Address]: # pragma: no cover 416 """*Deprecated:* An outdated alias for `Server.peername`.""" 417 warnings.warn( 418 "Server.ip_address is deprecated, use Server.peername instead.", 419 DeprecationWarning, 420 stacklevel=2, 421 ) 422 return self.peername 423 424 @property 425 def cert(self) -> Optional[certs.Cert]: # pragma: no cover 426 """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`.""" 427 warnings.warn( 428 "Server.cert is deprecated, use Server.certificate_list instead.", 429 DeprecationWarning, 430 stacklevel=2, 431 ) 432 if self.certificate_list: 433 return self.certificate_list[0] 434 else: 435 return None 436 437 @cert.setter 438 def cert(self, val): # pragma: no cover 439 warnings.warn( 440 "Server.cert is deprecated, use Server.certificate_list instead.", 441 DeprecationWarning, 442 stacklevel=2, 443 ) 444 if val: 445 self.certificate_list = [val] 446 else: 447 self.certificate_list = []
A connection between mitmproxy and an upstream server.
The server's resolved (ip, port)
tuple. Will be set during connection establishment.
The server's (host, port)
address tuple. The host can either be a domain or a plain IP address.
Deprecated: An outdated alias for Connection.certificate_list[0]
.
Inherited Members
- Connection
- id
- state
- transport_protocol
- error
- tls
- certificate_list
- alpn
- alpn_offers
- cipher
- cipher_list
- tls_version
- sni
- timestamp_end
- timestamp_tls_setup
- connected
- tls_established
- alpn_proto_negotiated
- mitmproxy.coretypes.serializable.Serializable
- copy
15class ConnectionState(Flag): 16 """The current state of the underlying socket.""" 17 18 CLOSED = 0 19 CAN_READ = 1 20 CAN_WRITE = 2 21 OPEN = CAN_READ | CAN_WRITE
The current state of the underlying socket.
Inherited Members
- enum.Enum
- name
- value