Edit on GitHub

mitmproxy.tls

View Source
import io
from dataclasses import dataclass
from typing import List, Optional, Tuple

from kaitaistruct import KaitaiStream

from OpenSSL import SSL
from mitmproxy import connection
from mitmproxy.contrib.kaitaistruct import tls_client_hello
from mitmproxy.net import check
from mitmproxy.proxy import context


class ClientHello:
    """
    Parsed view of a TLS ClientHello, the message a client sends first to start a TLS handshake.
    """

    _raw_bytes: bytes

    def __init__(self, raw_client_hello: bytes):
        """Parse `raw_client_hello` and keep the unmodified bytes next to the parsed structure."""
        self._raw_bytes = raw_client_hello
        stream = KaitaiStream(io.BytesIO(raw_client_hello))
        self._client_hello = tls_client_hello.TlsClientHello(stream)

    def raw_bytes(self, wrap_in_record: bool = True) -> bytes:
        """
        Return the ClientHello bytes as they were received.

        With `wrap_in_record` set (the default), the message is prefixed with a synthetic TLS record header
        and a handshake header (`0x160303 + len(chm) + 0x01 + len(ch)`), a layout some tools expect.
        The synthetic record always carries version `0x0303`, which may not match what was actually sent
        over the wire. JA3 hashes are not affected, because they only use the TLS version stored inside the
        ClientHello data structure.

        A later implementation may return the exact record(s) seen on the wire instead of a synthetic one.
        """
        if not wrap_in_record:
            return self._raw_bytes

        payload = self._raw_bytes
        # Record layer: content type 0x16 (handshake), version 0x0303, 2-byte length.
        # The record body is the 4-byte handshake header plus the ClientHello itself.
        record_header = b"\x16\x03\x03" + (len(payload) + 4).to_bytes(2, byteorder="big")
        # Handshake header: message type 0x01 (ClientHello), 3-byte length.
        handshake_header = b"\x01" + len(payload).to_bytes(3, byteorder="big")
        # The payload is the ClientHello as defined in
        # https://datatracker.ietf.org/doc/html/rfc8446#section-4.1.2.
        return record_header + handshake_header + payload

    @property
    def cipher_suites(self) -> List[int]:
        """The list of cipher suites the client offered, as raw integers."""
        offered = self._client_hello.cipher_suites
        return offered.cipher_suites

    @property
    def sni(self) -> Optional[str]:
        """
        The hostname the client asked for via
        [Server Name Indication](https://en.wikipedia.org/wiki/Server_Name_Indication),
        or `None` if no extension carries exactly one valid host name entry.
        """
        parsed = self._client_hello.extensions
        if not parsed:
            return None
        for ext in parsed.extensions:
            # 0x00 is the server_name extension.
            if ext.type != 0x00:
                continue
            names = ext.body.server_names
            if len(names) != 1:
                continue
            entry = names[0]
            # Only name type 0 entries that pass host validation are accepted.
            if entry.name_type == 0 and check.is_valid_host(entry.host_name):
                return entry.host_name.decode("ascii")
        return None

    @property
    def alpn_protocols(self) -> List[bytes]:
        """
        The protocols the client offered through the
        [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation) TLS extension,
        or an empty list if the extension is absent.
        """
        parsed = self._client_hello.extensions
        if not parsed:
            return []
        for ext in parsed.extensions:
            # 0x10 is the ALPN extension; only the first occurrence is used.
            if ext.type == 0x10:
                return [protocol.name for protocol in ext.body.alpn_protocols]
        return []

    @property
    def extensions(self) -> List[Tuple[int, bytes]]:
        """All extensions in order, each as an `(extension_type, raw_bytes)` tuple."""
        parsed = self._client_hello.extensions
        if not parsed:
            return []
        # Prefer the `_raw_body` attribute when the parser provides one, otherwise use `body` as-is.
        return [
            (ext.type, getattr(ext, "_raw_body", ext.body))
            for ext in parsed.extensions
        ]

    def __repr__(self):
        return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"


@dataclass
class ClientHelloData:
    """
    Payload passed to `tls_clienthello` event hooks.
    """
    context: context.Context
    """The context object of the connection this ClientHello belongs to."""
    client_hello: ClientHello
    """The complete ClientHello in parsed form."""
    ignore_connection: bool = False
    """
    Set to `True` to skip interception for this connection and forward the encrypted contents unmodified.
    """
    establish_server_tls_first: bool = False
    """
    Set to `True` to pause this handshake and establish TLS with the upstream server first.
    Doing so makes it possible to process the server certificate when generating an interception certificate.
    """


@dataclass
class TlsData:
    """
    Payload passed to the `tls_start_client`, `tls_start_server`, and `tls_handshake` event hooks.
    """
    conn: connection.Connection
    """The connection this event is about."""
    context: context.Context
    """The context object of that connection."""
    ssl_conn: Optional[SSL.Connection] = None
    """
    The pyOpenSSL `SSL.Connection` object associated with the connection.
    An addon is expected to set this in the `tls_start_*` event hooks.
    """
#   class ClientHello:
View Source
class ClientHello:
    """
    Parsed view of a TLS ClientHello, the message a client sends first to start a TLS handshake.
    """

    _raw_bytes: bytes

    def __init__(self, raw_client_hello: bytes):
        """Parse `raw_client_hello` and keep the unmodified bytes next to the parsed structure."""
        self._raw_bytes = raw_client_hello
        stream = KaitaiStream(io.BytesIO(raw_client_hello))
        self._client_hello = tls_client_hello.TlsClientHello(stream)

    def raw_bytes(self, wrap_in_record: bool = True) -> bytes:
        """
        Return the ClientHello bytes as they were received.

        With `wrap_in_record` set (the default), the message is prefixed with a synthetic TLS record header
        and a handshake header (`0x160303 + len(chm) + 0x01 + len(ch)`), a layout some tools expect.
        The synthetic record always carries version `0x0303`, which may not match what was actually sent
        over the wire. JA3 hashes are not affected, because they only use the TLS version stored inside the
        ClientHello data structure.

        A later implementation may return the exact record(s) seen on the wire instead of a synthetic one.
        """
        if not wrap_in_record:
            return self._raw_bytes

        payload = self._raw_bytes
        # Record layer: content type 0x16 (handshake), version 0x0303, 2-byte length.
        # The record body is the 4-byte handshake header plus the ClientHello itself.
        record_header = b"\x16\x03\x03" + (len(payload) + 4).to_bytes(2, byteorder="big")
        # Handshake header: message type 0x01 (ClientHello), 3-byte length.
        handshake_header = b"\x01" + len(payload).to_bytes(3, byteorder="big")
        # The payload is the ClientHello as defined in
        # https://datatracker.ietf.org/doc/html/rfc8446#section-4.1.2.
        return record_header + handshake_header + payload

    @property
    def cipher_suites(self) -> List[int]:
        """The list of cipher suites the client offered, as raw integers."""
        offered = self._client_hello.cipher_suites
        return offered.cipher_suites

    @property
    def sni(self) -> Optional[str]:
        """
        The hostname the client asked for via
        [Server Name Indication](https://en.wikipedia.org/wiki/Server_Name_Indication),
        or `None` if no extension carries exactly one valid host name entry.
        """
        parsed = self._client_hello.extensions
        if not parsed:
            return None
        for ext in parsed.extensions:
            # 0x00 is the server_name extension.
            if ext.type != 0x00:
                continue
            names = ext.body.server_names
            if len(names) != 1:
                continue
            entry = names[0]
            # Only name type 0 entries that pass host validation are accepted.
            if entry.name_type == 0 and check.is_valid_host(entry.host_name):
                return entry.host_name.decode("ascii")
        return None

    @property
    def alpn_protocols(self) -> List[bytes]:
        """
        The protocols the client offered through the
        [ALPN](https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation) TLS extension,
        or an empty list if the extension is absent.
        """
        parsed = self._client_hello.extensions
        if not parsed:
            return []
        for ext in parsed.extensions:
            # 0x10 is the ALPN extension; only the first occurrence is used.
            if ext.type == 0x10:
                return [protocol.name for protocol in ext.body.alpn_protocols]
        return []

    @property
    def extensions(self) -> List[Tuple[int, bytes]]:
        """All extensions in order, each as an `(extension_type, raw_bytes)` tuple."""
        parsed = self._client_hello.extensions
        if not parsed:
            return []
        # Prefer the `_raw_body` attribute when the parser provides one, otherwise use `body` as-is.
        return [
            (ext.type, getattr(ext, "_raw_body", ext.body))
            for ext in parsed.extensions
        ]

    def __repr__(self):
        return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"

A TLS ClientHello is the first message sent by the client when initiating TLS.

#   ClientHello(raw_client_hello: bytes)
View Source
    def __init__(self, raw_client_hello: bytes):
        """Parse `raw_client_hello` and keep the unmodified bytes next to the parsed structure."""
        self._raw_bytes = raw_client_hello
        # Wrap the bytes in a stream so the Kaitai-generated parser can read them.
        stream = KaitaiStream(io.BytesIO(raw_client_hello))
        self._client_hello = tls_client_hello.TlsClientHello(stream)

Create a TLS ClientHello object from raw bytes.

#   def raw_bytes(self, wrap_in_record: bool = True) -> bytes:
View Source
    def raw_bytes(self, wrap_in_record: bool = True) -> bytes:
        """
        The raw ClientHello bytes as seen on the wire.

        If `wrap_in_record` is True, the ClientHello will be wrapped in a synthetic TLS record
        (`0x160303 + len(chm) + 0x01 + len(ch)`), which is the format expected by some tools.
        The synthetic record assumes TLS version (`0x0303`), which may be different from what has been sent over the
        wire. JA3 hashes are unaffected by this as they only use the TLS version from the ClientHello data structure.

        A future implementation may return not just the exact ClientHello, but also the exact record(s) as seen on the
        wire.
        """
        if wrap_in_record:
            return (
                # record layer
                b"\x16\x03\x03" + (len(self._raw_bytes) + 4).to_bytes(2, byteorder="big") +
                # handshake header
                b"\x01" + len(self._raw_bytes).to_bytes(3, byteorder="big") +
                # ClientHello as defined in https://datatracker.ietf.org/doc/html/rfc8446#section-4.1.2.
                self._raw_bytes
            )
        else:
            return self._raw_bytes

The raw ClientHello bytes as seen on the wire.

If wrap_in_record is True, the ClientHello will be wrapped in a synthetic TLS record (0x160303 + len(chm) + 0x01 + len(ch)), which is the format expected by some tools. The synthetic record always uses TLS version 1.2 (0x0303), which may differ from the version actually sent over the wire. JA3 hashes are unaffected by this, as they only use the TLS version from the ClientHello data structure.

A future implementation may return not just the exact ClientHello, but also the exact record(s) as seen on the wire.

#   cipher_suites: List[int]

The cipher suites offered by the client (as raw ints).

#   sni: Optional[str]

The Server Name Indication, which indicates which hostname the client wants to connect to.

#   alpn_protocols: List[bytes]

The application layer protocols offered by the client as part of the ALPN TLS extension.

#   extensions: List[Tuple[int, bytes]]

The raw list of extensions in the form of (extension_type, raw_bytes) tuples.

#  
@dataclass
class ClientHelloData:
View Source
@dataclass
class ClientHelloData:
    """
    Payload passed to `tls_clienthello` event hooks.
    """
    context: context.Context
    """The context object of the connection this ClientHello belongs to."""
    client_hello: ClientHello
    """The complete ClientHello in parsed form."""
    ignore_connection: bool = False
    """
    Set to `True` to skip interception for this connection and forward the encrypted contents unmodified.
    """
    establish_server_tls_first: bool = False
    """
    Set to `True` to pause this handshake and establish TLS with the upstream server first.
    Doing so makes it possible to process the server certificate when generating an interception certificate.
    """

Event data for tls_clienthello event hooks.

The context object for this connection.

The entire parsed TLS ClientHello.

#   ignore_connection: bool = False

If set to True, do not intercept this connection and forward encrypted contents unmodified.

#   establish_server_tls_first: bool = False

If set to True, pause this handshake and establish TLS with an upstream server first. This makes it possible to process the server certificate when generating an interception certificate.

#  
@dataclass
class TlsData:
View Source
@dataclass
class TlsData:
    """
    Payload passed to the `tls_start_client`, `tls_start_server`, and `tls_handshake` event hooks.
    """
    conn: connection.Connection
    """The connection this event is about."""
    context: context.Context
    """The context object of that connection."""
    ssl_conn: Optional[SSL.Connection] = None
    """
    The pyOpenSSL `SSL.Connection` object associated with the connection.
    An addon is expected to set this in the `tls_start_*` event hooks.
    """

Event data for tls_start_client, tls_start_server, and tls_handshake event hooks.

The affected connection.

The context object for this connection.

#   ssl_conn: Optional[OpenSSL.SSL.Connection] = None

The associated pyOpenSSL SSL.Connection object. This will be set by an addon in the tls_start_* event hooks.