Edit on GitHub

mitmproxy.certs

  1import contextlib
  2import datetime
  3import ipaddress
  4import logging
  5import os
  6import sys
  7import warnings
  8from collections.abc import Iterable
  9from dataclasses import dataclass
 10from pathlib import Path
 11from typing import cast
 12from typing import NewType
 13from typing import Optional
 14from typing import Union
 15
 16import OpenSSL
 17from cryptography import x509
 18from cryptography.hazmat.primitives import hashes
 19from cryptography.hazmat.primitives import serialization
 20from cryptography.hazmat.primitives.asymmetric import dsa
 21from cryptography.hazmat.primitives.asymmetric import ec
 22from cryptography.hazmat.primitives.asymmetric import rsa
 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes
 24from cryptography.hazmat.primitives.serialization import pkcs12
 25from cryptography.x509 import ExtendedKeyUsageOID
 26from cryptography.x509 import NameOID
 27
 28from mitmproxy.coretypes import serializable
 29
 30if sys.version_info < (3, 13):  # pragma: no cover
 31    from typing_extensions import deprecated
 32else:
 33    from warnings import deprecated
 34
 35logger = logging.getLogger(__name__)
 36
# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
# Note that expiry will be offset by `CERT_VALIDITY_OFFSET`, i.e. the cert will be
# backdated a bit to account for clients with incorrect clocks.
CA_EXPIRY = datetime.timedelta(days=10 * 365)  # validity span applied in create_ca()
CERT_EXPIRY = datetime.timedelta(days=199)  # validity span applied in dummy_cert()
CRL_EXPIRY = datetime.timedelta(days=7)  # last_update -> next_update span in dummy_crl()

# Negative offset added to "now", so every validity window starts two days in the past.
CERT_VALIDITY_OFFSET = datetime.timedelta(days=-2)
 45
 46# Generated with "openssl dhparam". It's too slow to generate this on startup.
 47DEFAULT_DHPARAM = b"""
 48-----BEGIN DH PARAMETERS-----
 49MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
 50O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
 51j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
 52Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
 53chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
 54ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
 55o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
 56IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
 57A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
 586mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
 59rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
 60-----END DH PARAMETERS-----
 61"""
 62
 63
 64class Cert(serializable.Serializable):
 65    """Representation of a (TLS) certificate."""
 66
 67    _cert: x509.Certificate
 68
 69    def __init__(self, cert: x509.Certificate):
 70        assert isinstance(cert, x509.Certificate)
 71        self._cert = cert
 72
 73    def __eq__(self, other):
 74        return self.fingerprint() == other.fingerprint()
 75
 76    def __repr__(self):
 77        altnames = [str(x.value) for x in self.altnames]
 78        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 79
 80    def __hash__(self):
 81        return self._cert.__hash__()
 82
 83    @classmethod
 84    def from_state(cls, state):
 85        return cls.from_pem(state)
 86
 87    def get_state(self):
 88        return self.to_pem()
 89
 90    def set_state(self, state):
 91        self._cert = x509.load_pem_x509_certificate(state)
 92
 93    @classmethod
 94    def from_pem(cls, data: bytes) -> "Cert":
 95        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 96        return cls(cert)
 97
 98    def to_pem(self) -> bytes:
 99        return self._cert.public_bytes(serialization.Encoding.PEM)
100
101    @classmethod
102    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
103        return Cert(x509.to_cryptography())
104
105    @deprecated("Use `to_cryptography` instead.")
106    def to_pyopenssl(self) -> OpenSSL.crypto.X509:  # pragma: no cover
107        return OpenSSL.crypto.X509.from_cryptography(self._cert)
108
109    def to_cryptography(self) -> x509.Certificate:
110        return self._cert
111
112    def public_key(self) -> CertificatePublicKeyTypes:
113        return self._cert.public_key()
114
115    def fingerprint(self) -> bytes:
116        return self._cert.fingerprint(hashes.SHA256())
117
118    @property
119    def issuer(self) -> list[tuple[str, str]]:
120        return _name_to_keyval(self._cert.issuer)
121
122    @property
123    def notbefore(self) -> datetime.datetime:
124        try:
125            # type definitions haven't caught up with new API yet.
126            return self._cert.not_valid_before_utc  # type: ignore
127        except AttributeError:  # pragma: no cover
128            # cryptography < 42.0
129            return self._cert.not_valid_before.replace(tzinfo=datetime.UTC)
130
131    @property
132    def notafter(self) -> datetime.datetime:
133        try:
134            return self._cert.not_valid_after_utc  # type: ignore
135        except AttributeError:  # pragma: no cover
136            return self._cert.not_valid_after.replace(tzinfo=datetime.UTC)
137
138    def has_expired(self) -> bool:
139        if sys.version_info < (3, 11):  # pragma: no cover
140            return datetime.datetime.now(datetime.UTC) > self.notafter
141        return datetime.datetime.now(datetime.UTC) > self.notafter
142
143    @property
144    def subject(self) -> list[tuple[str, str]]:
145        return _name_to_keyval(self._cert.subject)
146
147    @property
148    def serial(self) -> int:
149        return self._cert.serial_number
150
151    @property
152    def is_ca(self) -> bool:
153        constraints: x509.BasicConstraints
154        try:
155            constraints = self._cert.extensions.get_extension_for_class(
156                x509.BasicConstraints
157            ).value
158            return constraints.ca
159        except x509.ExtensionNotFound:
160            return False
161
162    @property
163    def keyinfo(self) -> tuple[str, int]:
164        public_key = self._cert.public_key()
165        if isinstance(public_key, rsa.RSAPublicKey):
166            return "RSA", public_key.key_size
167        if isinstance(public_key, dsa.DSAPublicKey):
168            return "DSA", public_key.key_size
169        if isinstance(public_key, ec.EllipticCurvePublicKey):
170            return f"EC ({public_key.curve.name})", public_key.key_size
171        return (
172            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
173            getattr(public_key, "key_size", -1),
174        )  # pragma: no cover
175
176    @property
177    def cn(self) -> str | None:
178        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
179        if attrs:
180            return cast(str, attrs[0].value)
181        return None
182
183    @property
184    def organization(self) -> str | None:
185        attrs = self._cert.subject.get_attributes_for_oid(
186            x509.NameOID.ORGANIZATION_NAME
187        )
188        if attrs:
189            return cast(str, attrs[0].value)
190        return None
191
192    @property
193    def altnames(self) -> x509.GeneralNames:
194        """
195        Get all SubjectAlternativeName DNS altnames.
196        """
197        try:
198            sans = self._cert.extensions.get_extension_for_class(
199                x509.SubjectAlternativeName
200            ).value
201        except x509.ExtensionNotFound:
202            return x509.GeneralNames([])
203        else:
204            return x509.GeneralNames(sans)
205
206    @property
207    def crl_distribution_points(self) -> list[str]:
208        try:
209            ext = self._cert.extensions.get_extension_for_class(
210                x509.CRLDistributionPoints
211            ).value
212        except x509.ExtensionNotFound:
213            return []
214        else:
215            return [
216                dist_point.full_name[0].value
217                for dist_point in ext
218                if dist_point.full_name
219                and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier)
220            ]
221
222
def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
    """
    Flatten an X.509 name into (key, value) pairs, one per attribute.

    The key is the part of the attribute's RFC 4514 string before the first "=".
    """
    return [
        (attribute.rfc4514_string().partition("=")[0], cast(str, attribute.value))
        for attribute in name
    ]
230
231
def create_ca(
    organization: str,
    cn: str,
    key_size: int,
) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
    """
    Generate a new RSA key and a self-signed CA certificate for it.

    organization: Organization name for subject and issuer.
    cn: Common name for subject and issuer.
    key_size: RSA key size in bits.

    Returns a (private key, certificate) tuple. The certificate is valid for
    `CA_EXPIRY`, starting at the current time shifted by `CERT_VALIDITY_OFFSET`.
    """
    # Use an aware UTC timestamp: cryptography interprets naive datetimes as UTC,
    # so naive local time would skew the validity window by the local UTC offset.
    now = datetime.datetime.now(datetime.UTC)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
    )  # type: ignore
    name = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
        ]
    )
    builder = x509.CertificateBuilder()
    builder = builder.serial_number(x509.random_serial_number())
    # Self-signed: subject and issuer are the same name.
    builder = builder.subject_name(name)
    builder = builder.not_valid_before(now + CERT_VALIDITY_OFFSET)
    builder = builder.not_valid_after(now + CERT_VALIDITY_OFFSET + CA_EXPIRY)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=False,
            content_commitment=False,
            key_encipherment=False,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=True,
            encipher_only=False,
            decipher_only=False,
        ),
        critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
        critical=False,
    )
    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
    return private_key, cert
282
283
def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
    """
    Normalize SANs to `x509.GeneralNames`.

    mitmproxy 10.1 and below passed SANs as a list of strings. Such lists are
    still accepted (with a DeprecationWarning): strings that parse as IP
    addresses become IPAddress entries, everything else becomes an
    IDNA-encoded DNSName.
    """
    if isinstance(sans, x509.GeneralNames):
        return sans

    looks_legacy = (
        isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
    )
    if looks_legacy:  # pragma: no cover
        warnings.warn(
            "Passing SANs as a list of strings is deprecated.",
            DeprecationWarning,
            stacklevel=2,
        )

        converted: list[x509.GeneralName] = []
        for raw in cast(list[str], sans):
            try:
                address = ipaddress.ip_address(raw)
            except ValueError:
                # Not an IP address, so treat it as a hostname.
                converted.append(x509.DNSName(raw.encode("idna").decode()))
            else:
                converted.append(x509.IPAddress(address))
        return x509.GeneralNames(converted)

    return x509.GeneralNames(cast(Iterable[x509.GeneralName], sans))
312
313
def dummy_cert(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
    commonname: str | None,
    sans: Iterable[x509.GeneralName],
    organization: str | None = None,
    crl_url: str | None = None,
) -> Cert:
    """
    Generates a dummy certificate.

    privkey: CA private key
    cacert: CA certificate
    commonname: Common name for the generated certificate.
        It is omitted from the subject if it is None or 64 or more characters long.
    sans: A list of Subject Alternate Names.
    organization: Organization name for the generated certificate.
    crl_url: URL of CRL distribution point

    Returns the generated certificate, signed with `privkey`. The certificate
    reuses the CA's public key and is valid for `CERT_EXPIRY`, starting at the
    current time shifted by `CERT_VALIDITY_OFFSET`.
    """
    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(cacert.subject)
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    builder = builder.public_key(cacert.public_key())

    # Use an aware UTC timestamp: cryptography interprets naive datetimes as UTC,
    # so naive local time would skew the validity window by the local UTC offset.
    now = datetime.datetime.now(datetime.UTC)
    builder = builder.not_valid_before(now + CERT_VALIDITY_OFFSET)
    builder = builder.not_valid_after(now + CERT_VALIDITY_OFFSET + CERT_EXPIRY)

    subject: list[x509.NameAttribute] = []
    is_valid_commonname = commonname is not None and len(commonname) < 64
    if is_valid_commonname:
        assert commonname is not None  # narrows the type for the type checker
        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
    if organization is not None:
        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
    builder = builder.subject_name(x509.Name(subject))
    builder = builder.serial_number(x509.random_serial_number())

    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
    builder = builder.add_extension(
        x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
        critical=not is_valid_commonname,
    )

    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
    # Per RFC 5280 §4.2.1.2, the AKI's keyIdentifier in a child certificate
    # MUST be byte-equal to the issuer's stored SubjectKeyIdentifier. If we
    # recompute it from the public key with `from_issuer_public_key()`, we
    # always derive a SHA-1 digest, which mismatches whenever the issuer's
    # SKI was generated by a different method (RFC 7093 truncated
    # SHA-256/384/512, hardware-rooted CAs, or any custom value). This
    # mismatch is rejected by strict TLS chain builders (`X509_V_FLAG_X509_STRICT`,
    # Python `ssl`, Go `crypto/x509`) with "authority and subject key
    # identifier mismatch". Most notably, cert-manager >=1.18 / Go >=1.25
    # default to truncated SHA-256 SKIs for FIPS 140-3 compliance.
    try:
        issuer_ski = cacert.extensions.get_extension_for_class(
            x509.SubjectKeyIdentifier
        ).value
        aki = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(issuer_ski)
    except x509.ExtensionNotFound:
        # Fall back when the issuer cert has no SKI extension at all (legacy
        # CAs predating RFC 5280 conformance).
        aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key())  # type: ignore
    builder = builder.add_extension(aki, critical=False)
    # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways,
    # see https://github.com/mitmproxy/mitmproxy/issues/6494.
    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states
    # that SKI is optional for the leaf cert, so we skip that.

    if crl_url:
        builder = builder.add_extension(
            x509.CRLDistributionPoints(
                [
                    x509.DistributionPoint(
                        [x509.UniformResourceIdentifier(crl_url)],
                        relative_name=None,
                        crl_issuer=None,
                        reasons=None,
                    )
                ]
            ),
            critical=False,
        )

    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
    return Cert(cert)
405
406
def dummy_crl(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
) -> bytes:
    """
    Generates an empty CRL signed with the CA key

    privkey: CA private key
    cacert: CA certificate

    Returns a CRL DER encoded. Its update window spans `CRL_EXPIRY`, starting at
    the current time shifted by `CERT_VALIDITY_OFFSET`.
    """
    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(cacert.issuer)

    # Use an aware UTC timestamp: cryptography interprets naive datetimes as UTC,
    # so naive local time would skew the update window by the local UTC offset.
    now = datetime.datetime.now(datetime.UTC)
    builder = builder.last_update(now + CERT_VALIDITY_OFFSET)
    builder = builder.next_update(now + CERT_VALIDITY_OFFSET + CRL_EXPIRY)

    builder = builder.add_extension(x509.CRLNumber(1000), False)  # meaningless number
    crl = builder.sign(private_key=privkey, algorithm=hashes.SHA256())
    return crl.public_bytes(serialization.Encoding.DER)
429
430
@dataclass(frozen=True)
class CertStoreEntry:
    """An immutable record stored in `CertStore.certs`: a certificate with its key and chain."""

    cert: Cert  # the certificate itself
    privatekey: rsa.RSAPrivateKey  # private key kept alongside `cert`
    chain_file: Path | None  # PEM file associated with the chain, or None
    chain_certs: list[Cert]  # certificates making up the chain
437
438
TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
TGeneratedCertId = tuple[Optional[str], x509.GeneralNames]  # (common_name, sans)
# Key type of `CertStore.certs`: either kind of identifier above.
TCertId = Union[TCustomCertId, TGeneratedCertId]

# Distinct static type for Diffie-Hellman parameters.
# NOTE(review): declared over `bytes`, but `CertStore.load_dhparam` returns an
# OpenSSL cffi handle under this type - confirm before relying on bytes semantics.
DHParams = NewType("DHParams", bytes)
444
445
class CertStore:
    """
    Implements an in-memory certificate store.

    The store holds manually added certificates (keyed by name) as well as
    certificates generated on the fly by `get_cert` (keyed by common name and
    SANs). Only the most recent `STORE_CAP` generated certificates are kept.
    """

    STORE_CAP = 100
    default_privatekey: rsa.RSAPrivateKey
    default_ca: Cert
    default_chain_file: Path | None
    default_chain_certs: list[Cert]
    default_crl: bytes
    dhparams: DHParams
    certs: dict[TCertId, CertStoreEntry]
    expire_queue: list[CertStoreEntry]

    def __init__(
        self,
        default_privatekey: rsa.RSAPrivateKey,
        default_ca: Cert,
        default_chain_file: Path | None,
        default_crl: bytes,
        dhparams: DHParams,
    ):
        self.default_privatekey = default_privatekey
        self.default_ca = default_ca
        self.default_chain_file = default_chain_file
        self.default_crl = default_crl
        # With a chain file, the chain is every certificate found in that file;
        # otherwise it consists of the CA certificate only.
        self.default_chain_certs = (
            [
                Cert(c)
                for c in x509.load_pem_x509_certificates(
                    self.default_chain_file.read_bytes()
                )
            ]
            if self.default_chain_file
            else [default_ca]
        )
        self.dhparams = dhparams
        self.certs = {}
        self.expire_queue = []

    def expire(self, entry: CertStoreEntry) -> None:
        """
        Register `entry` for expiry.

        Once more than `STORE_CAP` entries are queued, the oldest one is
        dropped from the queue and every key mapping to it is removed from `certs`.
        """
        self.expire_queue.append(entry)
        if len(self.expire_queue) > self.STORE_CAP:
            d = self.expire_queue.pop(0)
            self.certs = {k: v for k, v in self.certs.items() if v != d}

    @staticmethod
    def load_dhparam(path: Path) -> DHParams:
        """
        Load DH parameters from the PEM file at `path`.

        If the file does not exist, it is created with `DEFAULT_DHPARAM` first.

        Raises RuntimeError if the file cannot be opened or parsed.
        """
        # mitmproxy<=0.10 doesn't generate a dhparam file.
        # Create it now if necessary.
        if not path.exists():
            path.write_bytes(DEFAULT_DHPARAM)

        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
        # expected format.
        bio = OpenSSL.SSL._lib.BIO_new_file(  # type: ignore
            str(path).encode(sys.getfilesystemencoding()), b"r"
        )
        if bio == OpenSSL.SSL._ffi.NULL:  # type: ignore  # pragma: no cover
            raise RuntimeError("Error loading DH Params.")
        bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
        dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
            bio,
            OpenSSL.SSL._ffi.NULL,  # type: ignore
            OpenSSL.SSL._ffi.NULL,  # type: ignore
            OpenSSL.SSL._ffi.NULL,  # type: ignore
        )
        # A NULL result means the file contents could not be parsed. Fail here
        # rather than handing a NULL handle to callers.
        if dh == OpenSSL.SSL._ffi.NULL:  # type: ignore  # pragma: no cover
            raise RuntimeError("Error loading DH Params.")
        dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
        return dh

    @classmethod
    def from_store(
        cls,
        path: Path | str,
        basename: str,
        key_size: int,
        passphrase: bytes | None = None,
    ) -> "CertStore":
        """
        Load the store from the directory `path`, creating it first if
        `{basename}-ca.pem` does not exist there.

        `key_size` is only used when a new store is created; `passphrase` is
        only used for loading the CA file.
        """
        path = Path(path)
        ca_file = path / f"{basename}-ca.pem"
        dhparam_file = path / f"{basename}-dhparam.pem"
        if not ca_file.exists():
            cls.create_store(path, basename, key_size)
        return cls.from_files(ca_file, dhparam_file, passphrase)

    @classmethod
    def from_files(
        cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None
    ) -> "CertStore":
        """
        Build a store from a PEM file containing the CA key and certificate(s)
        and a DH parameter file.

        The first certificate in `ca_file` is used as the CA. If the file
        contains more than one certificate, it also serves as the chain file.
        """
        raw = ca_file.read_bytes()
        key = load_pem_private_key(raw, passphrase)
        dh = cls.load_dhparam(dhparam_file)
        certs = x509.load_pem_x509_certificates(raw)
        ca = Cert(certs[0])
        crl = dummy_crl(key, ca._cert)
        if len(certs) > 1:
            chain_file: Path | None = ca_file
        else:
            chain_file = None
        return cls(key, ca, chain_file, crl, dh)

    @staticmethod
    @contextlib.contextmanager
    def umask_secret():
        """
        Context to temporarily set umask to its original value bitor 0o77.
        Useful when writing private keys to disk so that only the owner
        will be able to read them.
        """
        # os.umask() can only be read by setting it, hence the two calls.
        original_umask = os.umask(0)
        os.umask(original_umask | 0o77)
        try:
            yield
        finally:
            os.umask(original_umask)

    @staticmethod
    def create_store(
        path: Path, basename: str, key_size: int, organization=None, cn=None
    ) -> None:
        """
        Create a new CA and write it to the directory `path` in several formats.

        `organization` and `cn` default to `basename`. Files containing the
        private key are written with a restrictive umask.
        """
        path.mkdir(parents=True, exist_ok=True)

        organization = organization or basename
        cn = cn or basename

        key: rsa.RSAPrivateKeyWithSerialization
        ca: x509.Certificate
        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)

        # Dump the CA plus private key.
        with CertStore.umask_secret():
            # PEM format
            (path / f"{basename}-ca.pem").write_bytes(
                key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.TraditionalOpenSSL,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                + ca.public_bytes(serialization.Encoding.PEM)
            )

            # PKCS12 format for Windows devices
            (path / f"{basename}-ca.p12").write_bytes(
                pkcs12.serialize_key_and_certificates(  # type: ignore
                    name=basename.encode(),
                    key=key,
                    cert=ca,
                    cas=None,
                    encryption_algorithm=serialization.NoEncryption(),
                )
            )

        # Dump the certificate in PEM format
        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
        # Create a .cer file with the same contents for Android
        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)

        # Dump the certificate in PKCS12 format for Windows devices
        (path / f"{basename}-ca-cert.p12").write_bytes(
            pkcs12.serialize_key_and_certificates(
                name=basename.encode(),
                key=None,  # type: ignore
                cert=ca,
                cas=None,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)

    def add_cert_file(
        self, spec: str, path: Path, passphrase: bytes | None = None
    ) -> None:
        """
        Load a certificate (and optionally its private key and chain) from the
        PEM file at `path` and register it under `spec`.

        If the file contains no private key, the store's default private key
        is used, provided it matches the certificate.

        Raises ValueError if no matching private key is available.
        """
        raw = path.read_bytes()
        cert = Cert.from_pem(raw)
        try:
            private_key = load_pem_private_key(raw, password=passphrase)
        except ValueError as e:
            private_key = self.default_privatekey
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Unable to find private key in "{path.absolute()}": {e}'
                ) from e
        else:
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Private and public keys in "{path.absolute()}" do not match:\n'
                    f"{cert.public_key()=}\n"
                    f"{private_key.public_key()=}"
                )

        try:
            chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)]
        except ValueError as e:
            logger.warning(f"Failed to read certificate chain: {e}")
            chain = [cert]

        if cert.is_ca:
            logger.warning(
                f'"{path.absolute()}" is a certificate authority and not a leaf certificate. '
                f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/."
            )

        self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec)

    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
        """
        Adds a cert to the certstore. We register the CN in the cert plus
        any SANs, and also the list of names provided as an argument.
        """
        if entry.cert.cn:
            self.certs[entry.cert.cn] = entry
        for i in entry.cert.altnames:
            self.certs[str(i.value)] = entry
        for i in names:
            self.certs[i] = entry

    @staticmethod
    def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
        """
        Return all asterisk forms for a domain. For example, for www.example.com this will return
        ["www.example.com", "*.example.com", "*.com"]. The single wildcard "*" is omitted.

        DNSName entries are expanded like strings; any other GeneralName
        yields only the string form of its value.
        """
        if isinstance(dn, str):
            parts = dn.split(".")
            ret = [dn]
            for i in range(1, len(parts)):
                ret.append("*." + ".".join(parts[i:]))
            return ret
        elif isinstance(dn, x509.DNSName):
            return CertStore.asterisk_forms(dn.value)
        else:
            return [str(dn.value)]

    def get_cert(
        self,
        commonname: str | None,
        sans: Iterable[x509.GeneralName],
        organization: str | None = None,
        crl_url: str | None = None,
    ) -> CertStoreEntry:
        """
        Return a matching stored certificate, or generate (and cache) a new one.

        commonname: Common name for the generated certificate. Must be a
        valid, plain-ASCII, IDNA-encoded domain name.

        sans: A list of Subject Alternate Names.

        organization: Organization name for the generated certificate.

        crl_url: URL of CRL distribution point
        """
        sans = _fix_legacy_sans(sans)

        # Lookup order: exact and wildcard forms of the common name, then of
        # each SAN, then the catch-all "*", and finally a previously generated
        # certificate for the same (commonname, sans) pair.
        potential_keys: list[TCertId] = []
        if commonname:
            potential_keys.extend(self.asterisk_forms(commonname))
        for s in sans:
            potential_keys.extend(self.asterisk_forms(s))
        potential_keys.append("*")
        potential_keys.append((commonname, sans))

        name = next(filter(lambda key: key in self.certs, potential_keys), None)
        if name:
            entry = self.certs[name]
        else:
            entry = CertStoreEntry(
                cert=dummy_cert(
                    self.default_privatekey,
                    self.default_ca._cert,
                    commonname,
                    sans,
                    organization,
                    crl_url,
                ),
                privatekey=self.default_privatekey,
                chain_file=self.default_chain_file,
                chain_certs=self.default_chain_certs,
            )
            self.certs[(commonname, sans)] = entry
            self.expire(entry)

        return entry
729
730
def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey:
    """
    Load a PEM private key like cryptography's function of the same name,
    but silently retry without a password if loading with the given
    password fails with a TypeError (i.e. the private key is unencrypted).
    """
    try:
        key = serialization.load_pem_private_key(data, password)
    except TypeError:
        if password is None:
            # Nothing left to fall back to.
            raise
        key = serialization.load_pem_private_key(data, None)
    return key  # type: ignore
class Cert(mitmproxy.coretypes.serializable.Serializable):
 65class Cert(serializable.Serializable):
 66    """Representation of a (TLS) certificate."""
 67
 68    _cert: x509.Certificate
 69
 70    def __init__(self, cert: x509.Certificate):
 71        assert isinstance(cert, x509.Certificate)
 72        self._cert = cert
 73
 74    def __eq__(self, other):
 75        return self.fingerprint() == other.fingerprint()
 76
 77    def __repr__(self):
 78        altnames = [str(x.value) for x in self.altnames]
 79        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 80
 81    def __hash__(self):
 82        return self._cert.__hash__()
 83
 84    @classmethod
 85    def from_state(cls, state):
 86        return cls.from_pem(state)
 87
 88    def get_state(self):
 89        return self.to_pem()
 90
 91    def set_state(self, state):
 92        self._cert = x509.load_pem_x509_certificate(state)
 93
 94    @classmethod
 95    def from_pem(cls, data: bytes) -> "Cert":
 96        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 97        return cls(cert)
 98
 99    def to_pem(self) -> bytes:
100        return self._cert.public_bytes(serialization.Encoding.PEM)
101
102    @classmethod
103    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
104        return Cert(x509.to_cryptography())
105
106    @deprecated("Use `to_cryptography` instead.")
107    def to_pyopenssl(self) -> OpenSSL.crypto.X509:  # pragma: no cover
108        return OpenSSL.crypto.X509.from_cryptography(self._cert)
109
110    def to_cryptography(self) -> x509.Certificate:
111        return self._cert
112
113    def public_key(self) -> CertificatePublicKeyTypes:
114        return self._cert.public_key()
115
116    def fingerprint(self) -> bytes:
117        return self._cert.fingerprint(hashes.SHA256())
118
119    @property
120    def issuer(self) -> list[tuple[str, str]]:
121        return _name_to_keyval(self._cert.issuer)
122
123    @property
124    def notbefore(self) -> datetime.datetime:
125        try:
126            # type definitions haven't caught up with new API yet.
127            return self._cert.not_valid_before_utc  # type: ignore
128        except AttributeError:  # pragma: no cover
129            # cryptography < 42.0
130            return self._cert.not_valid_before.replace(tzinfo=datetime.UTC)
131
132    @property
133    def notafter(self) -> datetime.datetime:
134        try:
135            return self._cert.not_valid_after_utc  # type: ignore
136        except AttributeError:  # pragma: no cover
137            return self._cert.not_valid_after.replace(tzinfo=datetime.UTC)
138
139    def has_expired(self) -> bool:
140        if sys.version_info < (3, 11):  # pragma: no cover
141            return datetime.datetime.now(datetime.UTC) > self.notafter
142        return datetime.datetime.now(datetime.UTC) > self.notafter
143
144    @property
145    def subject(self) -> list[tuple[str, str]]:
146        return _name_to_keyval(self._cert.subject)
147
148    @property
149    def serial(self) -> int:
150        return self._cert.serial_number
151
152    @property
153    def is_ca(self) -> bool:
154        constraints: x509.BasicConstraints
155        try:
156            constraints = self._cert.extensions.get_extension_for_class(
157                x509.BasicConstraints
158            ).value
159            return constraints.ca
160        except x509.ExtensionNotFound:
161            return False
162
163    @property
164    def keyinfo(self) -> tuple[str, int]:
165        public_key = self._cert.public_key()
166        if isinstance(public_key, rsa.RSAPublicKey):
167            return "RSA", public_key.key_size
168        if isinstance(public_key, dsa.DSAPublicKey):
169            return "DSA", public_key.key_size
170        if isinstance(public_key, ec.EllipticCurvePublicKey):
171            return f"EC ({public_key.curve.name})", public_key.key_size
172        return (
173            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
174            getattr(public_key, "key_size", -1),
175        )  # pragma: no cover
176
177    @property
178    def cn(self) -> str | None:
179        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
180        if attrs:
181            return cast(str, attrs[0].value)
182        return None
183
184    @property
185    def organization(self) -> str | None:
186        attrs = self._cert.subject.get_attributes_for_oid(
187            x509.NameOID.ORGANIZATION_NAME
188        )
189        if attrs:
190            return cast(str, attrs[0].value)
191        return None
192
193    @property
194    def altnames(self) -> x509.GeneralNames:
195        """
196        Get all SubjectAlternativeName DNS altnames.
197        """
198        try:
199            sans = self._cert.extensions.get_extension_for_class(
200                x509.SubjectAlternativeName
201            ).value
202        except x509.ExtensionNotFound:
203            return x509.GeneralNames([])
204        else:
205            return x509.GeneralNames(sans)
206
207    @property
208    def crl_distribution_points(self) -> list[str]:
209        try:
210            ext = self._cert.extensions.get_extension_for_class(
211                x509.CRLDistributionPoints
212            ).value
213        except x509.ExtensionNotFound:
214            return []
215        else:
216            return [
217                dist_point.full_name[0].value
218                for dist_point in ext
219                if dist_point.full_name
220                and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier)
221            ]

Representation of a (TLS) certificate.

Cert(cert: cryptography.hazmat.bindings._rust.x509.Certificate)
70    def __init__(self, cert: x509.Certificate):
71        assert isinstance(cert, x509.Certificate)
72        self._cert = cert
@classmethod
def from_pem(cls, data: bytes) -> Cert:
94    @classmethod
95    def from_pem(cls, data: bytes) -> "Cert":
96        cert = x509.load_pem_x509_certificate(data)  # type: ignore
97        return cls(cert)
def to_pem(self) -> bytes:
 99    def to_pem(self) -> bytes:
100        return self._cert.public_bytes(serialization.Encoding.PEM)
@classmethod
def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> Cert:
102    @classmethod
103    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
104        return Cert(x509.to_cryptography())
@deprecated('Use `to_cryptography` instead.')
def to_pyopenssl(self) -> OpenSSL.crypto.X509:
106    @deprecated("Use `to_cryptography` instead.")
107    def to_pyopenssl(self) -> OpenSSL.crypto.X509:  # pragma: no cover
108        return OpenSSL.crypto.X509.from_cryptography(self._cert)
def to_cryptography(self) -> cryptography.hazmat.bindings._rust.x509.Certificate:
110    def to_cryptography(self) -> x509.Certificate:
111        return self._cert
def public_key( self) -> cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey | cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey:
113    def public_key(self) -> CertificatePublicKeyTypes:
114        return self._cert.public_key()
def fingerprint(self) -> bytes:
116    def fingerprint(self) -> bytes:
117        return self._cert.fingerprint(hashes.SHA256())
issuer: list[tuple[str, str]]
119    @property
120    def issuer(self) -> list[tuple[str, str]]:
121        return _name_to_keyval(self._cert.issuer)
notbefore: datetime.datetime
123    @property
124    def notbefore(self) -> datetime.datetime:
125        try:
126            # type definitions haven't caught up with new API yet.
127            return self._cert.not_valid_before_utc  # type: ignore
128        except AttributeError:  # pragma: no cover
129            # cryptography < 42.0
130            return self._cert.not_valid_before.replace(tzinfo=datetime.UTC)
notafter: datetime.datetime
132    @property
133    def notafter(self) -> datetime.datetime:
134        try:
135            return self._cert.not_valid_after_utc  # type: ignore
136        except AttributeError:  # pragma: no cover
137            return self._cert.not_valid_after.replace(tzinfo=datetime.UTC)
def has_expired(self) -> bool:
139    def has_expired(self) -> bool:
140        if sys.version_info < (3, 11):  # pragma: no cover
141            return datetime.datetime.now(datetime.UTC) > self.notafter
142        return datetime.datetime.now(datetime.UTC) > self.notafter
subject: list[tuple[str, str]]
144    @property
145    def subject(self) -> list[tuple[str, str]]:
146        return _name_to_keyval(self._cert.subject)
serial: int
148    @property
149    def serial(self) -> int:
150        return self._cert.serial_number
is_ca: bool
152    @property
153    def is_ca(self) -> bool:
154        constraints: x509.BasicConstraints
155        try:
156            constraints = self._cert.extensions.get_extension_for_class(
157                x509.BasicConstraints
158            ).value
159            return constraints.ca
160        except x509.ExtensionNotFound:
161            return False
keyinfo: tuple[str, int]
163    @property
164    def keyinfo(self) -> tuple[str, int]:
165        public_key = self._cert.public_key()
166        if isinstance(public_key, rsa.RSAPublicKey):
167            return "RSA", public_key.key_size
168        if isinstance(public_key, dsa.DSAPublicKey):
169            return "DSA", public_key.key_size
170        if isinstance(public_key, ec.EllipticCurvePublicKey):
171            return f"EC ({public_key.curve.name})", public_key.key_size
172        return (
173            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
174            getattr(public_key, "key_size", -1),
175        )  # pragma: no cover
cn: str | None
177    @property
178    def cn(self) -> str | None:
179        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
180        if attrs:
181            return cast(str, attrs[0].value)
182        return None
organization: str | None
184    @property
185    def organization(self) -> str | None:
186        attrs = self._cert.subject.get_attributes_for_oid(
187            x509.NameOID.ORGANIZATION_NAME
188        )
189        if attrs:
190            return cast(str, attrs[0].value)
191        return None
altnames: cryptography.x509.extensions.GeneralNames
193    @property
194    def altnames(self) -> x509.GeneralNames:
195        """
196        Get all SubjectAlternativeName DNS altnames.
197        """
198        try:
199            sans = self._cert.extensions.get_extension_for_class(
200                x509.SubjectAlternativeName
201            ).value
202        except x509.ExtensionNotFound:
203            return x509.GeneralNames([])
204        else:
205            return x509.GeneralNames(sans)

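Get all SubjectAlternativeName entries of the certificate (every general name in the extension, not only DNS names); empty if the extension is absent.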

crl_distribution_points: list[str]
207    @property
208    def crl_distribution_points(self) -> list[str]:
209        try:
210            ext = self._cert.extensions.get_extension_for_class(
211                x509.CRLDistributionPoints
212            ).value
213        except x509.ExtensionNotFound:
214            return []
215        else:
216            return [
217                dist_point.full_name[0].value
218                for dist_point in ext
219                if dist_point.full_name
220                and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier)
221            ]
built with pdoc