Edit on GitHub

mitmproxy.connection

  1import uuid
  2import warnings
  3from abc import ABCMeta
  4from collections.abc import Sequence
  5from enum import Flag
  6from typing import Literal, Optional
  7
  8from mitmproxy import certs
  9from mitmproxy.coretypes import serializable
 10from mitmproxy.net import server_spec
 11from mitmproxy.utils import human
 12
 13
 14class ConnectionState(Flag):
 15    """The current state of the underlying socket."""
 16
 17    CLOSED = 0
 18    CAN_READ = 1
 19    CAN_WRITE = 2
 20    OPEN = CAN_READ | CAN_WRITE
 21
 22
# The transport protocols a connection can be declared with.
TransportProtocol = Literal["tcp", "udp"]


# Practically speaking, we may have IPv6 addresses with flowinfo and scope_id,
# but type checking isn't good enough to properly handle tuple unions.
# This version at least provides useful type-checking messages.
Address = tuple[str, int]
 30
 31
 32class Connection(serializable.Serializable, metaclass=ABCMeta):
 33    """
 34    Base class for client and server connections.
 35
 36    The connection object only exposes metadata about the connection, but not the underlying socket object.
 37    This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
 38    """
 39
 40    # all connections have a unique id. While
 41    # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
 42    # we also want these semantics for recorded flows.
 43    id: str
 44    """A unique UUID to identify the connection."""
 45    state: ConnectionState
 46    """The current connection state."""
 47    transport_protocol: TransportProtocol
 48    """The connection protocol in use."""
 49    peername: Optional[Address]
 50    """The remote's `(ip, port)` tuple for this connection."""
 51    sockname: Optional[Address]
 52    """Our local `(ip, port)` tuple for this connection."""
 53    error: Optional[str] = None
 54    """
 55    A string describing a general error with connections to this address.
 56
 57    The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
 58    for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
 59    property. This property is only reused per client connection.
 60    """
 61
 62    tls: bool = False
 63    """
 64    `True` if TLS should be established, `False` otherwise.
 65    Note that this property only describes if a connection should eventually be protected using TLS.
 66    To check if TLS has already been established, use `Connection.tls_established`.
 67    """
 68    certificate_list: Sequence[certs.Cert] = ()
 69    """
 70    The TLS certificate list as sent by the peer.
 71    The first certificate is the end-entity certificate.
 72
 73    > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
 74    > certificate to certify the one immediately preceding it; however,
 75    > some implementations allowed some flexibility.  Servers sometimes
 76    > send both a current and deprecated intermediate for transitional
 77    > purposes, and others are simply configured incorrectly, but these
 78    > cases can nonetheless be validated properly.  For maximum
 79    > compatibility, all implementations SHOULD be prepared to handle
 80    > potentially extraneous certificates and arbitrary orderings from any
 81    > TLS version, with the exception of the end-entity certificate which
 82    > MUST be first.
 83    """
 84    alpn: Optional[bytes] = None
 85    """The application-layer protocol as negotiated using
 86    [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
 87    alpn_offers: Sequence[bytes] = ()
 88    """The ALPN offers as sent in the ClientHello."""
 89    # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
 90    cipher: Optional[str] = None
 91    """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
 92    cipher_list: Sequence[str] = ()
 93    """Ciphers accepted by the proxy server on this connection."""
 94    tls_version: Optional[str] = None
 95    """The active TLS version."""
 96    sni: Optional[str] = None
 97    """
 98    The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
 99    """
100
101    timestamp_start: Optional[float]
102    timestamp_end: Optional[float] = None
103    """*Timestamp:* Connection has been closed."""
104    timestamp_tls_setup: Optional[float] = None
105    """*Timestamp:* TLS handshake has been completed successfully."""
106
107    @property
108    def connected(self) -> bool:
109        """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise."""
110        return self.state is ConnectionState.OPEN
111
112    @property
113    def tls_established(self) -> bool:
114        """*Read-only:* `True` if TLS has been established, `False` otherwise."""
115        return self.timestamp_tls_setup is not None
116
117    def __eq__(self, other):
118        if isinstance(other, Connection):
119            return self.id == other.id
120        return False
121
122    def __hash__(self):
123        return hash(self.id)
124
125    def __repr__(self):
126        attrs = repr(
127            {
128                k: {
129                    "cipher_list": lambda: f"<{len(v)} ciphers>",
130                    "id": lambda: f"…{v[-6:]}",
131                }.get(k, lambda: v)()
132                for k, v in self.__dict__.items()
133            }
134        )
135        return f"{type(self).__name__}({attrs})"
136
137    @property
138    def alpn_proto_negotiated(self) -> Optional[bytes]:  # pragma: no cover
139        """*Deprecated:* An outdated alias for Connection.alpn."""
140        warnings.warn(
141            "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.",
142            DeprecationWarning,
143            stacklevel=2,
144        )
145        return self.alpn
146
147
class Client(Connection):
    """A connection between a client and mitmproxy."""

    peername: Address
    """The client's address."""
    sockname: Address
    """The local address we received this connection on."""

    mitmcert: Optional[certs.Cert] = None
    """
    The certificate used by mitmproxy to establish TLS with the client.
    """

    timestamp_start: float
    """*Timestamp:* TCP SYN received"""

    def __init__(
        self,
        peername: Address,
        sockname: Address,
        timestamp_start: float,
        *,
        transport_protocol: TransportProtocol = "tcp",
    ):
        """
        Create a client connection with a fresh random id.

        The connection starts out in `ConnectionState.OPEN`.
        """
        self.id = str(uuid.uuid4())
        self.peername = peername
        self.sockname = sockname
        self.timestamp_start = timestamp_start
        self.state = ConnectionState.OPEN
        self.transport_protocol = transport_protocol

    def __str__(self):
        """Return a short description: peer address, state, and ALPN/TLS status if any."""
        # A negotiated ALPN protocol takes precedence over the plain "tls" marker.
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        else:
            tls_state = ""
        return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})"

    def get_state(self):
        """
        Return the connection's attributes as a plain dict.

        The "tls_established" and "tls_extensions" keys are emitted but never read back by `set_state`.
        """
        # Important: Retain full compatibility with old proxy core for now!
        # This means we need to add all new fields to the old implementation.
        return {
            "address": self.peername,
            "alpn": self.alpn,
            "cipher_name": self.cipher,
            "id": self.id,
            "mitmcert": self.mitmcert.get_state()
            if self.mitmcert is not None
            else None,
            "sni": self.sni,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_extensions": [],
            "tls_version": self.tls_version,
            # only used in sans-io
            "state": self.state.value,
            "sockname": self.sockname,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [x.get_state() for x in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_list": self.cipher_list,
        }

    @classmethod
    def from_state(cls, state) -> "Client":
        """
        Build a connection from a dict as produced by `get_state`.

        The instance is created through `cls`, so calling this on a subclass
        yields an instance of that subclass rather than a plain `Client`.
        """
        # The sockname given here is only a placeholder: set_state() below
        # replaces it (and every other attribute) with the recorded values.
        client = cls(state["address"], ("mitmproxy", 8080), state["timestamp_start"])
        client.set_state(state)
        return client

    def set_state(self, state):
        """
        Overwrite this connection's attributes with the values from `state`.

        Addresses are converted to tuples. A falsy "address" or "sockname" entry
        is stored as `None`, even though both attributes are annotated as `Address`.
        """
        self.peername = tuple(state["address"]) if state["address"] else None
        self.alpn = state["alpn"]
        self.cipher = state["cipher_name"]
        self.id = state["id"]
        self.sni = state["sni"]
        self.timestamp_end = state["timestamp_end"]
        self.timestamp_start = state["timestamp_start"]
        self.timestamp_tls_setup = state["timestamp_tls_setup"]
        self.tls_version = state["tls_version"]
        # only used in sans-io
        self.state = ConnectionState(state["state"])
        self.sockname = tuple(state["sockname"]) if state["sockname"] else None
        self.error = state["error"]
        self.tls = state["tls"]
        self.certificate_list = [
            certs.Cert.from_state(x) for x in state["certificate_list"]
        ]
        self.mitmcert = (
            certs.Cert.from_state(state["mitmcert"])
            if state["mitmcert"] is not None
            else None
        )
        self.alpn_offers = state["alpn_offers"]
        self.cipher_list = state["cipher_list"]

    @property
    def address(self):  # pragma: no cover
        """*Deprecated:* An outdated alias for Client.peername."""
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @address.setter
    def address(self, x):  # pragma: no cover
        warnings.warn(
            "Client.address is deprecated, use Client.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.peername = x

    @property
    def cipher_name(self) -> Optional[str]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.cipher."""
        warnings.warn(
            "Client.cipher_name is deprecated, use Client.cipher instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.cipher

    @property
    def clientcert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for Connection.certificate_list[0]."""
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # An empty certificate list maps to None instead of raising IndexError.
        if self.certificate_list:
            return self.certificate_list[0]
        else:
            return None

    @clientcert.setter
    def clientcert(self, val):  # pragma: no cover
        warnings.warn(
            "Client.clientcert is deprecated, use Client.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # A falsy value clears the list; anything else becomes its only entry.
        if val:
            self.certificate_list = [val]
        else:
            self.certificate_list = []
301
302
class Server(Connection):
    """A connection between mitmproxy and an upstream server."""

    peername: Optional[Address] = None
    """The server's resolved `(ip, port)` tuple. Will be set during connection establishment."""
    sockname: Optional[Address] = None
    address: Optional[Address]
    """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address."""

    timestamp_start: Optional[float] = None
    """*Timestamp:* TCP SYN sent."""
    timestamp_tcp_setup: Optional[float] = None
    """*Timestamp:* TCP ACK received."""

    via: Optional[server_spec.ServerSpec] = None
    """An optional proxy server specification via which the connection should be established."""

    def __init__(
        self,
        address: Optional[Address],
        *,
        transport_protocol: TransportProtocol = "tcp",
    ):
        """
        Create a server connection with a fresh random id.

        The connection starts out in `ConnectionState.CLOSED`.
        """
        self.id = str(uuid.uuid4())
        self.address = address
        self.state = ConnectionState.CLOSED
        self.transport_protocol = transport_protocol

    def __str__(self):
        """Return a short description: address, state, ALPN/TLS status and local port if known."""
        # A negotiated ALPN protocol takes precedence over the plain "tls" marker.
        if self.alpn:
            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_state = ", tls"
        else:
            tls_state = ""
        if self.sockname:
            local_port = f", src_port={self.sockname[1]}"
        else:
            local_port = ""
        return f"Server({human.format_address(self.address)}, state={self.state.name.lower()}{tls_state}{local_port})"

    def __setattr__(self, name, value):
        """
        Set an attribute, guarding `address` and `via` while the connection is open.

        Raises:
            RuntimeError: if `address` or `via` would change to a different value
                while `state` is `ConnectionState.OPEN`.
        """
        if name in ("address", "via"):
            # Look the state up in __dict__ with a default: __init__ assigns
            # `address` before `state` exists on the instance.
            connection_open = (
                self.__dict__.get("state", ConnectionState.CLOSED)
                is ConnectionState.OPEN
            )
            # assigning the current value is okay, that may be an artifact of calling .set_state().
            attr_changed = self.__dict__.get(name) != value
            if connection_open and attr_changed:
                raise RuntimeError(f"Cannot change server.{name} on open connection.")
        return super().__setattr__(name, value)

    def get_state(self):
        """
        Return the connection's attributes as a plain dict.

        The "via" key is always `None`; the actual `via` value is stored under "via2",
        which is the key `set_state` reads back.
        """
        return {
            "address": self.address,
            "alpn": self.alpn,
            "id": self.id,
            "ip_address": self.peername,
            "sni": self.sni,
            "source_address": self.sockname,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tcp_setup": self.timestamp_tcp_setup,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_version": self.tls_version,
            # NOTE(review): presumably kept as a fixed None for compatibility with an older state format - confirm.
            "via": None,
            # only used in sans-io
            "state": self.state.value,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": [x.get_state() for x in self.certificate_list],
            "alpn_offers": self.alpn_offers,
            "cipher_name": self.cipher,
            "cipher_list": self.cipher_list,
            "via2": self.via,
        }

    @classmethod
    def from_state(cls, state) -> "Server":
        """
        Build a connection from a dict as produced by `get_state`.

        The instance is created through `cls`, so calling this on a subclass
        yields an instance of that subclass rather than a plain `Server`.
        """
        # Start without an address; set_state() fills in every attribute.
        server = cls(None)
        server.set_state(state)
        return server

    def set_state(self, state):
        """
        Overwrite this connection's attributes with the values from `state`.

        Addresses are converted to tuples; falsy address entries are stored as `None`.
        """
        self.address = tuple(state["address"]) if state["address"] else None
        self.alpn = state["alpn"]
        self.id = state["id"]
        self.peername = tuple(state["ip_address"]) if state["ip_address"] else None
        self.sni = state["sni"]
        self.sockname = (
            tuple(state["source_address"]) if state["source_address"] else None
        )
        self.timestamp_end = state["timestamp_end"]
        self.timestamp_start = state["timestamp_start"]
        self.timestamp_tcp_setup = state["timestamp_tcp_setup"]
        self.timestamp_tls_setup = state["timestamp_tls_setup"]
        self.tls_version = state["tls_version"]
        self.state = ConnectionState(state["state"])
        self.error = state["error"]
        self.tls = state["tls"]
        self.certificate_list = [
            certs.Cert.from_state(x) for x in state["certificate_list"]
        ]
        self.alpn_offers = state["alpn_offers"]
        self.cipher = state["cipher_name"]
        self.cipher_list = state["cipher_list"]
        self.via = state["via2"]

    @property
    def ip_address(self) -> Optional[Address]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Server.peername`."""
        warnings.warn(
            "Server.ip_address is deprecated, use Server.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @property
    def cert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`."""
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # An empty certificate list maps to None instead of raising IndexError.
        if self.certificate_list:
            return self.certificate_list[0]
        else:
            return None

    @cert.setter
    def cert(self, val):  # pragma: no cover
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # A falsy value clears the list; anything else becomes its only entry.
        if val:
            self.certificate_list = [val]
        else:
            self.certificate_list = []
447
448
# Names exported by `from mitmproxy.connection import *`.
__all__ = ["Connection", "Client", "Server", "ConnectionState"]
class Connection(mitmproxy.coretypes.serializable.Serializable):
 33class Connection(serializable.Serializable, metaclass=ABCMeta):
 34    """
 35    Base class for client and server connections.
 36
 37    The connection object only exposes metadata about the connection, but not the underlying socket object.
 38    This is intentional, all I/O should be handled by `mitmproxy.proxy.server` exclusively.
 39    """
 40
 41    # all connections have a unique id. While
 42    # f.client_conn == f2.client_conn already holds true for live flows (where we have object identity),
 43    # we also want these semantics for recorded flows.
 44    id: str
 45    """A unique UUID to identify the connection."""
 46    state: ConnectionState
 47    """The current connection state."""
 48    transport_protocol: TransportProtocol
 49    """The connection protocol in use."""
 50    peername: Optional[Address]
 51    """The remote's `(ip, port)` tuple for this connection."""
 52    sockname: Optional[Address]
 53    """Our local `(ip, port)` tuple for this connection."""
 54    error: Optional[str] = None
 55    """
 56    A string describing a general error with connections to this address.
 57
 58    The purpose of this property is to signal that new connections to the particular endpoint should not be attempted,
 59    for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error
 60    property. This property is only reused per client connection.
 61    """
 62
 63    tls: bool = False
 64    """
 65    `True` if TLS should be established, `False` otherwise.
 66    Note that this property only describes if a connection should eventually be protected using TLS.
 67    To check if TLS has already been established, use `Connection.tls_established`.
 68    """
 69    certificate_list: Sequence[certs.Cert] = ()
 70    """
 71    The TLS certificate list as sent by the peer.
 72    The first certificate is the end-entity certificate.
 73
 74    > [RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each
 75    > certificate to certify the one immediately preceding it; however,
 76    > some implementations allowed some flexibility.  Servers sometimes
 77    > send both a current and deprecated intermediate for transitional
 78    > purposes, and others are simply configured incorrectly, but these
 79    > cases can nonetheless be validated properly.  For maximum
 80    > compatibility, all implementations SHOULD be prepared to handle
 81    > potentially extraneous certificates and arbitrary orderings from any
 82    > TLS version, with the exception of the end-entity certificate which
 83    > MUST be first.
 84    """
 85    alpn: Optional[bytes] = None
 86    """The application-layer protocol as negotiated using
 87    [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation)."""
 88    alpn_offers: Sequence[bytes] = ()
 89    """The ALPN offers as sent in the ClientHello."""
 90    # we may want to add SSL_CIPHER_description here, but that's currently not exposed by cryptography
 91    cipher: Optional[str] = None
 92    """The active cipher name as returned by OpenSSL's `SSL_CIPHER_get_name`."""
 93    cipher_list: Sequence[str] = ()
 94    """Ciphers accepted by the proxy server on this connection."""
 95    tls_version: Optional[str] = None
 96    """The active TLS version."""
 97    sni: Optional[str] = None
 98    """
 99    The [Server Name Indication (SNI)](https://en.wikipedia.org/wiki/Server_Name_Indication) sent in the ClientHello.
100    """
101
102    timestamp_start: Optional[float]
103    timestamp_end: Optional[float] = None
104    """*Timestamp:* Connection has been closed."""
105    timestamp_tls_setup: Optional[float] = None
106    """*Timestamp:* TLS handshake has been completed successfully."""
107
108    @property
109    def connected(self) -> bool:
110        """*Read-only:* `True` if Connection.state is ConnectionState.OPEN, `False` otherwise."""
111        return self.state is ConnectionState.OPEN
112
113    @property
114    def tls_established(self) -> bool:
115        """*Read-only:* `True` if TLS has been established, `False` otherwise."""
116        return self.timestamp_tls_setup is not None
117
118    def __eq__(self, other):
119        if isinstance(other, Connection):
120            return self.id == other.id
121        return False
122
123    def __hash__(self):
124        return hash(self.id)
125
126    def __repr__(self):
127        attrs = repr(
128            {
129                k: {
130                    "cipher_list": lambda: f"<{len(v)} ciphers>",
131                    "id": lambda: f"…{v[-6:]}",
132                }.get(k, lambda: v)()
133                for k, v in self.__dict__.items()
134            }
135        )
136        return f"{type(self).__name__}({attrs})"
137
138    @property
139    def alpn_proto_negotiated(self) -> Optional[bytes]:  # pragma: no cover
140        """*Deprecated:* An outdated alias for Connection.alpn."""
141        warnings.warn(
142            "Connection.alpn_proto_negotiated is deprecated, use Connection.alpn instead.",
143            DeprecationWarning,
144            stacklevel=2,
145        )
146        return self.alpn

Base class for client and server connections.

The connection object only exposes metadata about the connection, not the underlying socket object. This is intentional: all I/O should be handled exclusively by mitmproxy.proxy.server.

id: str

A unique UUID to identify the connection.

state: ConnectionState

The current connection state.

transport_protocol: Literal['tcp', 'udp']

The connection protocol in use.

peername: Optional[tuple[str, int]]

The remote's (ip, port) tuple for this connection.

sockname: Optional[tuple[str, int]]

Our local (ip, port) tuple for this connection.

error: Optional[str] = None

A string describing a general error with connections to this address.

The purpose of this property is to signal that new connections to the particular endpoint should not be attempted, for example because it uses an untrusted TLS certificate. Regular (unexpected) disconnects do not set the error property. This property is only reused per client connection.

tls: bool = False

True if TLS should be established, False otherwise. Note that this property only describes if a connection should eventually be protected using TLS. To check if TLS has already been established, use Connection.tls_established.

certificate_list: collections.abc.Sequence[mitmproxy.certs.Cert] = ()

The TLS certificate list as sent by the peer. The first certificate is the end-entity certificate.

[RFC 8446] Prior to TLS 1.3, "certificate_list" ordering required each certificate to certify the one immediately preceding it; however, some implementations allowed some flexibility. Servers sometimes send both a current and deprecated intermediate for transitional purposes, and others are simply configured incorrectly, but these cases can nonetheless be validated properly. For maximum compatibility, all implementations SHOULD be prepared to handle potentially extraneous certificates and arbitrary orderings from any TLS version, with the exception of the end-entity certificate which MUST be first.

alpn: Optional[bytes] = None

The application-layer protocol as negotiated using ALPN.

alpn_offers: collections.abc.Sequence[bytes] = ()

The ALPN offers as sent in the ClientHello.

cipher: Optional[str] = None

The active cipher name as returned by OpenSSL's SSL_CIPHER_get_name.

cipher_list: collections.abc.Sequence[str] = ()

Ciphers accepted by the proxy server on this connection.

tls_version: Optional[str] = None

The active TLS version.

sni: Optional[str] = None

The Server Name Indication (SNI) sent in the ClientHello.

timestamp_end: Optional[float] = None

Timestamp: Connection has been closed.

timestamp_tls_setup: Optional[float] = None

Timestamp: TLS handshake has been completed successfully.

connected: bool

Read-only: True if Connection.state is ConnectionState.OPEN, False otherwise.

tls_established: bool

Read-only: True if TLS has been established, False otherwise.

alpn_proto_negotiated: Optional[bytes]

Deprecated: An outdated alias for Connection.alpn.

Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy
class Client(Connection):
149class Client(Connection):
150    """A connection between a client and mitmproxy."""
151
152    peername: Address
153    """The client's address."""
154    sockname: Address
155    """The local address we received this connection on."""
156
157    mitmcert: Optional[certs.Cert] = None
158    """
159    The certificate used by mitmproxy to establish TLS with the client.
160    """
161
162    timestamp_start: float
163    """*Timestamp:* TCP SYN received"""
164
165    def __init__(
166        self,
167        peername: Address,
168        sockname: Address,
169        timestamp_start: float,
170        *,
171        transport_protocol: TransportProtocol = "tcp",
172    ):
173        self.id = str(uuid.uuid4())
174        self.peername = peername
175        self.sockname = sockname
176        self.timestamp_start = timestamp_start
177        self.state = ConnectionState.OPEN
178        self.transport_protocol = transport_protocol
179
180    def __str__(self):
181        if self.alpn:
182            tls_state = f", alpn={self.alpn.decode(errors='replace')}"
183        elif self.tls_established:
184            tls_state = ", tls"
185        else:
186            tls_state = ""
187        return f"Client({human.format_address(self.peername)}, state={self.state.name.lower()}{tls_state})"
188
189    def get_state(self):
190        # Important: Retain full compatibility with old proxy core for now!
191        # This means we need to add all new fields to the old implementation.
192        return {
193            "address": self.peername,
194            "alpn": self.alpn,
195            "cipher_name": self.cipher,
196            "id": self.id,
197            "mitmcert": self.mitmcert.get_state()
198            if self.mitmcert is not None
199            else None,
200            "sni": self.sni,
201            "timestamp_end": self.timestamp_end,
202            "timestamp_start": self.timestamp_start,
203            "timestamp_tls_setup": self.timestamp_tls_setup,
204            "tls_established": self.tls_established,
205            "tls_extensions": [],
206            "tls_version": self.tls_version,
207            # only used in sans-io
208            "state": self.state.value,
209            "sockname": self.sockname,
210            "error": self.error,
211            "tls": self.tls,
212            "certificate_list": [x.get_state() for x in self.certificate_list],
213            "alpn_offers": self.alpn_offers,
214            "cipher_list": self.cipher_list,
215        }
216
217    @classmethod
218    def from_state(cls, state) -> "Client":
219        client = Client(state["address"], ("mitmproxy", 8080), state["timestamp_start"])
220        client.set_state(state)
221        return client
222
223    def set_state(self, state):
224        self.peername = tuple(state["address"]) if state["address"] else None
225        self.alpn = state["alpn"]
226        self.cipher = state["cipher_name"]
227        self.id = state["id"]
228        self.sni = state["sni"]
229        self.timestamp_end = state["timestamp_end"]
230        self.timestamp_start = state["timestamp_start"]
231        self.timestamp_tls_setup = state["timestamp_tls_setup"]
232        self.tls_version = state["tls_version"]
233        # only used in sans-io
234        self.state = ConnectionState(state["state"])
235        self.sockname = tuple(state["sockname"]) if state["sockname"] else None
236        self.error = state["error"]
237        self.tls = state["tls"]
238        self.certificate_list = [
239            certs.Cert.from_state(x) for x in state["certificate_list"]
240        ]
241        self.mitmcert = (
242            certs.Cert.from_state(state["mitmcert"])
243            if state["mitmcert"] is not None
244            else None
245        )
246        self.alpn_offers = state["alpn_offers"]
247        self.cipher_list = state["cipher_list"]
248
249    @property
250    def address(self):  # pragma: no cover
251        """*Deprecated:* An outdated alias for Client.peername."""
252        warnings.warn(
253            "Client.address is deprecated, use Client.peername instead.",
254            DeprecationWarning,
255            stacklevel=2,
256        )
257        return self.peername
258
259    @address.setter
260    def address(self, x):  # pragma: no cover
261        warnings.warn(
262            "Client.address is deprecated, use Client.peername instead.",
263            DeprecationWarning,
264            stacklevel=2,
265        )
266        self.peername = x
267
268    @property
269    def cipher_name(self) -> Optional[str]:  # pragma: no cover
270        """*Deprecated:* An outdated alias for Connection.cipher."""
271        warnings.warn(
272            "Client.cipher_name is deprecated, use Client.cipher instead.",
273            DeprecationWarning,
274            stacklevel=2,
275        )
276        return self.cipher
277
278    @property
279    def clientcert(self) -> Optional[certs.Cert]:  # pragma: no cover
280        """*Deprecated:* An outdated alias for Connection.certificate_list[0]."""
281        warnings.warn(
282            "Client.clientcert is deprecated, use Client.certificate_list instead.",
283            DeprecationWarning,
284            stacklevel=2,
285        )
286        if self.certificate_list:
287            return self.certificate_list[0]
288        else:
289            return None
290
291    @clientcert.setter
292    def clientcert(self, val):  # pragma: no cover
293        warnings.warn(
294            "Client.clientcert is deprecated, use Client.certificate_list instead.",
295            DeprecationWarning,
296            stacklevel=2,
297        )
298        if val:
299            self.certificate_list = [val]
300        else:
301            self.certificate_list = []

A connection between a client and mitmproxy.

Client(peername: tuple[str, int], sockname: tuple[str, int], timestamp_start: float, *, transport_protocol: Literal['tcp', 'udp'] = 'tcp')
165    def __init__(
166        self,
167        peername: Address,
168        sockname: Address,
169        timestamp_start: float,
170        *,
171        transport_protocol: TransportProtocol = "tcp",
172    ):
173        self.id = str(uuid.uuid4())
174        self.peername = peername
175        self.sockname = sockname
176        self.timestamp_start = timestamp_start
177        self.state = ConnectionState.OPEN
178        self.transport_protocol = transport_protocol
peername: tuple[str, int]

The client's address.

sockname: tuple[str, int]

The local address we received this connection on.

mitmcert: Optional[mitmproxy.certs.Cert] = None

The certificate used by mitmproxy to establish TLS with the client.

timestamp_start: float

Timestamp: TCP SYN received.

address

Deprecated: An outdated alias for Client.peername.

cipher_name: Optional[str]

Deprecated: An outdated alias for Connection.cipher.

clientcert: Optional[mitmproxy.certs.Cert]

Deprecated: An outdated alias for Connection.certificate_list[0].

class Server(Connection):
class Server(Connection):
    """
    A connection between mitmproxy and an upstream server.

    A new instance starts out in the `CLOSED` state. While the connection is `OPEN`,
    `address` and `via` cannot be changed to a different value.
    """

    peername: Optional[Address] = None
    """The server's resolved `(ip, port)` tuple. Will be set during connection establishment."""
    sockname: Optional[Address] = None
    address: Optional[Address]
    """The server's `(host, port)` address tuple. The host can either be a domain or a plain IP address."""

    timestamp_start: Optional[float] = None
    """*Timestamp:* TCP SYN sent."""
    timestamp_tcp_setup: Optional[float] = None
    """*Timestamp:* TCP ACK received."""

    via: Optional[server_spec.ServerSpec] = None
    """An optional proxy server specification via which the connection should be established."""

    def __init__(
        self,
        address: Optional[Address],
        *,
        transport_protocol: TransportProtocol = "tcp",
    ):
        """Create a closed server connection for `address` with a freshly generated UUID."""
        self.id = str(uuid.uuid4())
        self.address = address
        self.state = ConnectionState.CLOSED
        self.transport_protocol = transport_protocol

    def __str__(self):
        """Return a one-line summary: address, state, and (if known) TLS/ALPN info and local port."""
        tls_suffix = ""
        if self.alpn:
            tls_suffix = f", alpn={self.alpn.decode(errors='replace')}"
        elif self.tls_established:
            tls_suffix = ", tls"

        port_suffix = f", src_port={self.sockname[1]}" if self.sockname else ""

        target = human.format_address(self.address)
        status = self.state.name.lower()
        return f"Server({target}, state={status}{tls_suffix}{port_suffix})"

    def __setattr__(self, name, value):
        """
        Set an attribute, refusing to change `address` or `via` on an open connection.

        Raises:
            RuntimeError: if the connection state is `OPEN` and the new value differs
                from the stored one.
        """
        if name in ("address", "via"):
            # Look at __dict__ directly: during __init__ "state" may not be set yet.
            current_state = self.__dict__.get("state", ConnectionState.CLOSED)
            if current_state is ConnectionState.OPEN:
                # Re-assigning the value that is already stored is harmless and can
                # happen as a side effect of .set_state(), so only real changes fail.
                if self.__dict__.get(name) != value:
                    raise RuntimeError(
                        f"Cannot change server.{name} on open connection."
                    )
        return super().__setattr__(name, value)

    def get_state(self):
        """Return the connection's attributes as a dict, in the layout `set_state` reads."""
        serialized_certs = [crt.get_state() for crt in self.certificate_list]
        return {
            "address": self.address,
            "alpn": self.alpn,
            "id": self.id,
            "ip_address": self.peername,
            "sni": self.sni,
            "source_address": self.sockname,
            "timestamp_end": self.timestamp_end,
            "timestamp_start": self.timestamp_start,
            "timestamp_tcp_setup": self.timestamp_tcp_setup,
            "timestamp_tls_setup": self.timestamp_tls_setup,
            "tls_established": self.tls_established,
            "tls_version": self.tls_version,
            # "via" is always written as None; the actual value is stored under "via2".
            "via": None,
            # only used in sans-io
            "state": self.state.value,
            "error": self.error,
            "tls": self.tls,
            "certificate_list": serialized_certs,
            "alpn_offers": self.alpn_offers,
            "cipher_name": self.cipher,
            "cipher_list": self.cipher_list,
            "via2": self.via,
        }

    @classmethod
    def from_state(cls, state) -> "Server":
        """Build a `Server` without an address and populate it from `state`."""
        restored = Server(None)
        restored.set_state(state)
        return restored

    def set_state(self, state):
        """Overwrite this connection's attributes with the values found in `state`."""

        def as_address(raw):
            # Falsy entries (e.g. None) stay None; everything else becomes a tuple.
            return tuple(raw) if raw else None

        # NOTE: the assignment order is kept deliberately, __setattr__ consults the
        # current state when "address" and "via" are assigned.
        self.address = as_address(state["address"])
        self.alpn = state["alpn"]
        self.id = state["id"]
        self.peername = as_address(state["ip_address"])
        self.sni = state["sni"]
        self.sockname = as_address(state["source_address"])
        self.timestamp_end = state["timestamp_end"]
        self.timestamp_start = state["timestamp_start"]
        self.timestamp_tcp_setup = state["timestamp_tcp_setup"]
        self.timestamp_tls_setup = state["timestamp_tls_setup"]
        self.tls_version = state["tls_version"]
        self.state = ConnectionState(state["state"])
        self.error = state["error"]
        self.tls = state["tls"]
        self.certificate_list = [
            certs.Cert.from_state(raw_cert) for raw_cert in state["certificate_list"]
        ]
        self.alpn_offers = state["alpn_offers"]
        self.cipher = state["cipher_name"]
        self.cipher_list = state["cipher_list"]
        self.via = state["via2"]

    @property
    def ip_address(self) -> Optional[Address]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Server.peername`."""
        warnings.warn(
            "Server.ip_address is deprecated, use Server.peername instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.peername

    @property
    def cert(self) -> Optional[certs.Cert]:  # pragma: no cover
        """*Deprecated:* An outdated alias for `Connection.certificate_list[0]`."""
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # An empty certificate list maps to None.
        return self.certificate_list[0] if self.certificate_list else None

    @cert.setter
    def cert(self, val):  # pragma: no cover
        warnings.warn(
            "Server.cert is deprecated, use Server.certificate_list instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # A truthy value becomes the only list entry; a falsy one clears the list.
        self.certificate_list = [val] if val else []

A connection between mitmproxy and an upstream server.

Server(address: Optional[tuple[str, int]], *, transport_protocol: Literal['tcp', 'udp'] = 'tcp')
321    def __init__(
322        self,
323        address: Optional[Address],
324        *,
325        transport_protocol: TransportProtocol = "tcp",
326    ):
327        self.id = str(uuid.uuid4())
328        self.address = address
329        self.state = ConnectionState.CLOSED
330        self.transport_protocol = transport_protocol
peername: Optional[tuple[str, int]] = None

The server's resolved (ip, port) tuple. Will be set during connection establishment.

sockname: Optional[tuple[str, int]] = None

Our local (ip, port) tuple for this connection.

address: Optional[tuple[str, int]]

The server's (host, port) address tuple. The host can either be a domain or a plain IP address.

timestamp_start: Optional[float] = None

Timestamp: TCP SYN sent.

timestamp_tcp_setup: Optional[float] = None

Timestamp: TCP ACK received.

via: Optional[mitmproxy.net.server_spec.ServerSpec] = None

An optional proxy server specification via which the connection should be established.

ip_address: Optional[tuple[str, int]]

Deprecated: An outdated alias for Server.peername.

cert: Optional[mitmproxy.certs.Cert]

Deprecated: An outdated alias for Connection.certificate_list[0].

class ConnectionState(enum.Flag):
class ConnectionState(Flag):
    """The current state of the underlying socket."""

    # No direction is usable.
    CLOSED = 0
    # One bit per direction.
    CAN_READ = 1 << 0
    CAN_WRITE = 1 << 1
    # Both directions are usable.
    OPEN = CAN_READ | CAN_WRITE

The current state of the underlying socket.

CLOSED = <ConnectionState.CLOSED: 0>
CAN_READ = <ConnectionState.CAN_READ: 1>
CAN_WRITE = <ConnectionState.CAN_WRITE: 2>
OPEN = <ConnectionState.OPEN: 3>
Inherited Members
enum.Enum
name
value