Edit on GitHub

mitmproxy.certs

  1import contextlib
  2import datetime
  3import ipaddress
  4import os
  5import sys
  6import warnings
  7from collections.abc import Iterable
  8from dataclasses import dataclass
  9from pathlib import Path
 10from typing import cast
 11from typing import NewType
 12from typing import Optional
 13from typing import Union
 14
 15import OpenSSL
 16from cryptography import x509
 17from cryptography.hazmat.primitives import hashes
 18from cryptography.hazmat.primitives import serialization
 19from cryptography.hazmat.primitives.asymmetric import dsa
 20from cryptography.hazmat.primitives.asymmetric import ec
 21from cryptography.hazmat.primitives.asymmetric import rsa
 22from cryptography.hazmat.primitives.serialization import pkcs12
 23from cryptography.x509 import ExtendedKeyUsageOID
 24from cryptography.x509 import NameOID
 25
 26from mitmproxy.coretypes import serializable
 27
 28# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
 29CA_EXPIRY = datetime.timedelta(days=10 * 365)
 30CERT_EXPIRY = datetime.timedelta(days=365)
 31
 32# Generated with "openssl dhparam". It's too slow to generate this on startup.
 33DEFAULT_DHPARAM = b"""
 34-----BEGIN DH PARAMETERS-----
 35MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
 36O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
 37j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
 38Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
 39chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
 40ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
 41o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
 42IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
 43A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
 446mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
 45rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
 46-----END DH PARAMETERS-----
 47"""
 48
 49
 50class Cert(serializable.Serializable):
 51    """Representation of a (TLS) certificate."""
 52
 53    _cert: x509.Certificate
 54
 55    def __init__(self, cert: x509.Certificate):
 56        assert isinstance(cert, x509.Certificate)
 57        self._cert = cert
 58
 59    def __eq__(self, other):
 60        return self.fingerprint() == other.fingerprint()
 61
 62    def __repr__(self):
 63        altnames = [str(x.value) for x in self.altnames]
 64        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 65
 66    def __hash__(self):
 67        return self._cert.__hash__()
 68
 69    @classmethod
 70    def from_state(cls, state):
 71        return cls.from_pem(state)
 72
 73    def get_state(self):
 74        return self.to_pem()
 75
 76    def set_state(self, state):
 77        self._cert = x509.load_pem_x509_certificate(state)
 78
 79    @classmethod
 80    def from_pem(cls, data: bytes) -> "Cert":
 81        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 82        return cls(cert)
 83
 84    def to_pem(self) -> bytes:
 85        return self._cert.public_bytes(serialization.Encoding.PEM)
 86
 87    @classmethod
 88    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 89        return Cert(x509.to_cryptography())
 90
 91    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 92        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 93
 94    def fingerprint(self) -> bytes:
 95        return self._cert.fingerprint(hashes.SHA256())
 96
 97    @property
 98    def issuer(self) -> list[tuple[str, str]]:
 99        return _name_to_keyval(self._cert.issuer)
100
101    @property
102    def notbefore(self) -> datetime.datetime:
103        try:
104            # type definitions haven't caught up with new API yet.
105            return self._cert.not_valid_before_utc  # type: ignore
106        except AttributeError:  # pragma: no cover
107            # cryptography < 42.0
108            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
109
110    @property
111    def notafter(self) -> datetime.datetime:
112        try:
113            return self._cert.not_valid_after_utc  # type: ignore
114        except AttributeError:  # pragma: no cover
115            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
116
117    def has_expired(self) -> bool:
118        if sys.version_info < (3, 11):  # pragma: no cover
119            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
120        return datetime.datetime.now(datetime.UTC) > self.notafter
121
122    @property
123    def subject(self) -> list[tuple[str, str]]:
124        return _name_to_keyval(self._cert.subject)
125
126    @property
127    def serial(self) -> int:
128        return self._cert.serial_number
129
130    @property
131    def keyinfo(self) -> tuple[str, int]:
132        public_key = self._cert.public_key()
133        if isinstance(public_key, rsa.RSAPublicKey):
134            return "RSA", public_key.key_size
135        if isinstance(public_key, dsa.DSAPublicKey):
136            return "DSA", public_key.key_size
137        if isinstance(public_key, ec.EllipticCurvePublicKey):
138            return f"EC ({public_key.curve.name})", public_key.key_size
139        return (
140            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
141            getattr(public_key, "key_size", -1),
142        )  # pragma: no cover
143
144    @property
145    def cn(self) -> str | None:
146        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
147        if attrs:
148            return cast(str, attrs[0].value)
149        return None
150
151    @property
152    def organization(self) -> str | None:
153        attrs = self._cert.subject.get_attributes_for_oid(
154            x509.NameOID.ORGANIZATION_NAME
155        )
156        if attrs:
157            return cast(str, attrs[0].value)
158        return None
159
160    @property
161    def altnames(self) -> x509.GeneralNames:
162        """
163        Get all SubjectAlternativeName DNS altnames.
164        """
165        try:
166            sans = self._cert.extensions.get_extension_for_class(
167                x509.SubjectAlternativeName
168            ).value
169        except x509.ExtensionNotFound:
170            return x509.GeneralNames([])
171        else:
172            return x509.GeneralNames(sans)
173
174
175def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
176    parts = []
177    for attr in name:
178        k = attr.rfc4514_string().partition("=")[0]
179        v = cast(str, attr.value)
180        parts.append((k, v))
181    return parts
182
183
184def create_ca(
185    organization: str,
186    cn: str,
187    key_size: int,
188) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
189    now = datetime.datetime.now()
190
191    private_key = rsa.generate_private_key(
192        public_exponent=65537,
193        key_size=key_size,
194    )  # type: ignore
195    name = x509.Name(
196        [
197            x509.NameAttribute(NameOID.COMMON_NAME, cn),
198            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
199        ]
200    )
201    builder = x509.CertificateBuilder()
202    builder = builder.serial_number(x509.random_serial_number())
203    builder = builder.subject_name(name)
204    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
205    builder = builder.not_valid_after(now + CA_EXPIRY)
206    builder = builder.issuer_name(name)
207    builder = builder.public_key(private_key.public_key())
208    builder = builder.add_extension(
209        x509.BasicConstraints(ca=True, path_length=None), critical=True
210    )
211    builder = builder.add_extension(
212        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
213    )
214    builder = builder.add_extension(
215        x509.KeyUsage(
216            digital_signature=False,
217            content_commitment=False,
218            key_encipherment=False,
219            data_encipherment=False,
220            key_agreement=False,
221            key_cert_sign=True,
222            crl_sign=True,
223            encipher_only=False,
224            decipher_only=False,
225        ),
226        critical=True,
227    )
228    builder = builder.add_extension(
229        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
230        critical=False,
231    )
232    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
233    return private_key, cert
234
235
236def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
237    """
238    SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames.
239    This function converts the old format to the new one.
240    """
241    if isinstance(sans, x509.GeneralNames):
242        return sans
243    elif (
244        isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
245    ):  # pragma: no cover
246        warnings.warn(
247            "Passing SANs as a list of strings is deprecated.",
248            DeprecationWarning,
249            stacklevel=2,
250        )
251
252        ss: list[x509.GeneralName] = []
253        for x in cast(list[str], sans):
254            try:
255                ip = ipaddress.ip_address(x)
256            except ValueError:
257                x = x.encode("idna").decode()
258                ss.append(x509.DNSName(x))
259            else:
260                ss.append(x509.IPAddress(ip))
261        return x509.GeneralNames(ss)
262    else:
263        return x509.GeneralNames(sans)
264
265
266def dummy_cert(
267    privkey: rsa.RSAPrivateKey,
268    cacert: x509.Certificate,
269    commonname: str | None,
270    sans: Iterable[x509.GeneralName],
271    organization: str | None = None,
272) -> Cert:
273    """
274    Generates a dummy certificate.
275
276    privkey: CA private key
277    cacert: CA certificate
278    commonname: Common name for the generated certificate.
279    sans: A list of Subject Alternate Names.
280    organization: Organization name for the generated certificate.
281
282    Returns cert if operation succeeded, None if not.
283    """
284    builder = x509.CertificateBuilder()
285    builder = builder.issuer_name(cacert.subject)
286    builder = builder.add_extension(
287        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
288    )
289    builder = builder.public_key(cacert.public_key())
290
291    now = datetime.datetime.now()
292    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
293    builder = builder.not_valid_after(now + CERT_EXPIRY)
294
295    subject = []
296    is_valid_commonname = commonname is not None and len(commonname) < 64
297    if is_valid_commonname:
298        assert commonname is not None
299        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
300    if organization is not None:
301        assert organization is not None
302        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
303    builder = builder.subject_name(x509.Name(subject))
304    builder = builder.serial_number(x509.random_serial_number())
305
306    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
307    builder = builder.add_extension(
308        x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
309        critical=not is_valid_commonname,
310    )
311
312    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
313    builder = builder.add_extension(
314        x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()),
315        critical=False,
316    )
317    # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways,
318    # see https://github.com/mitmproxy/mitmproxy/issues/6494.
319    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states
320    # that SKI is optional for the leaf cert, so we skip that.
321
322    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
323    return Cert(cert)
324
325
326@dataclass(frozen=True)
327class CertStoreEntry:
328    cert: Cert
329    privatekey: rsa.RSAPrivateKey
330    chain_file: Path | None
331    chain_certs: list[Cert]
332
333
334TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
335TGeneratedCertId = tuple[Optional[str], x509.GeneralNames]  # (common_name, sans)
336TCertId = Union[TCustomCertId, TGeneratedCertId]
337
338DHParams = NewType("DHParams", bytes)
339
340
341class CertStore:
342    """
343    Implements an in-memory certificate store.
344    """
345
346    STORE_CAP = 100
347    certs: dict[TCertId, CertStoreEntry]
348    expire_queue: list[CertStoreEntry]
349
350    def __init__(
351        self,
352        default_privatekey: rsa.RSAPrivateKey,
353        default_ca: Cert,
354        default_chain_file: Path | None,
355        dhparams: DHParams,
356    ):
357        self.default_privatekey = default_privatekey
358        self.default_ca = default_ca
359        self.default_chain_file = default_chain_file
360        self.default_chain_certs = (
361            x509.load_pem_x509_certificates(self.default_chain_file.read_bytes())
362            if self.default_chain_file
363            else [default_ca]
364        )
365        self.dhparams = dhparams
366        self.certs = {}
367        self.expire_queue = []
368
369    def expire(self, entry: CertStoreEntry) -> None:
370        self.expire_queue.append(entry)
371        if len(self.expire_queue) > self.STORE_CAP:
372            d = self.expire_queue.pop(0)
373            self.certs = {k: v for k, v in self.certs.items() if v != d}
374
375    @staticmethod
376    def load_dhparam(path: Path) -> DHParams:
377        # mitmproxy<=0.10 doesn't generate a dhparam file.
378        # Create it now if necessary.
379        if not path.exists():
380            path.write_bytes(DEFAULT_DHPARAM)
381
382        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
383        # expected format.
384        bio = OpenSSL.SSL._lib.BIO_new_file(  # type: ignore
385            str(path).encode(sys.getfilesystemencoding()), b"r"
386        )
387        if bio != OpenSSL.SSL._ffi.NULL:  # type: ignore
388            bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
389            dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
390                bio,
391                OpenSSL.SSL._ffi.NULL,  # type: ignore
392                OpenSSL.SSL._ffi.NULL,  # type: ignore
393                OpenSSL.SSL._ffi.NULL,  # type: ignore
394            )
395            dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
396            return dh
397        raise RuntimeError("Error loading DH Params.")  # pragma: no cover
398
399    @classmethod
400    def from_store(
401        cls,
402        path: Path | str,
403        basename: str,
404        key_size: int,
405        passphrase: bytes | None = None,
406    ) -> "CertStore":
407        path = Path(path)
408        ca_file = path / f"{basename}-ca.pem"
409        dhparam_file = path / f"{basename}-dhparam.pem"
410        if not ca_file.exists():
411            cls.create_store(path, basename, key_size)
412        return cls.from_files(ca_file, dhparam_file, passphrase)
413
414    @classmethod
415    def from_files(
416        cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None
417    ) -> "CertStore":
418        raw = ca_file.read_bytes()
419        key = load_pem_private_key(raw, passphrase)
420        dh = cls.load_dhparam(dhparam_file)
421        certs = x509.load_pem_x509_certificates(raw)
422        ca = Cert(certs[0])
423        if len(certs) > 1:
424            chain_file: Path | None = ca_file
425        else:
426            chain_file = None
427        return cls(key, ca, chain_file, dh)
428
429    @staticmethod
430    @contextlib.contextmanager
431    def umask_secret():
432        """
433        Context to temporarily set umask to its original value bitor 0o77.
434        Useful when writing private keys to disk so that only the owner
435        will be able to read them.
436        """
437        original_umask = os.umask(0)
438        os.umask(original_umask | 0o77)
439        try:
440            yield
441        finally:
442            os.umask(original_umask)
443
444    @staticmethod
445    def create_store(
446        path: Path, basename: str, key_size: int, organization=None, cn=None
447    ) -> None:
448        path.mkdir(parents=True, exist_ok=True)
449
450        organization = organization or basename
451        cn = cn or basename
452
453        key: rsa.RSAPrivateKeyWithSerialization
454        ca: x509.Certificate
455        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)
456
457        # Dump the CA plus private key.
458        with CertStore.umask_secret():
459            # PEM format
460            (path / f"{basename}-ca.pem").write_bytes(
461                key.private_bytes(
462                    encoding=serialization.Encoding.PEM,
463                    format=serialization.PrivateFormat.TraditionalOpenSSL,
464                    encryption_algorithm=serialization.NoEncryption(),
465                )
466                + ca.public_bytes(serialization.Encoding.PEM)
467            )
468
469            # PKCS12 format for Windows devices
470            (path / f"{basename}-ca.p12").write_bytes(
471                pkcs12.serialize_key_and_certificates(  # type: ignore
472                    name=basename.encode(),
473                    key=key,
474                    cert=ca,
475                    cas=None,
476                    encryption_algorithm=serialization.NoEncryption(),
477                )
478            )
479
480        # Dump the certificate in PEM format
481        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
482        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
483        # Create a .cer file with the same contents for Android
484        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)
485
486        # Dump the certificate in PKCS12 format for Windows devices
487        (path / f"{basename}-ca-cert.p12").write_bytes(
488            pkcs12.serialize_key_and_certificates(
489                name=basename.encode(),
490                key=None,  # type: ignore
491                cert=ca,
492                cas=None,
493                encryption_algorithm=serialization.NoEncryption(),
494            )
495        )
496
497        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)
498
499    def add_cert_file(
500        self, spec: str, path: Path, passphrase: bytes | None = None
501    ) -> None:
502        raw = path.read_bytes()
503        cert = Cert.from_pem(raw)
504        try:
505            key = load_pem_private_key(raw, password=passphrase)
506        except ValueError:
507            key = self.default_privatekey
508
509        self.add_cert(CertStoreEntry(cert, key, path, [cert]), spec)
510
511    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
512        """
513        Adds a cert to the certstore. We register the CN in the cert plus
514        any SANs, and also the list of names provided as an argument.
515        """
516        if entry.cert.cn:
517            self.certs[entry.cert.cn] = entry
518        for i in entry.cert.altnames:
519            self.certs[str(i.value)] = entry
520        for i in names:
521            self.certs[i] = entry
522
523    @staticmethod
524    def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
525        """
526        Return all asterisk forms for a domain. For example, for www.example.com this will return
527        [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted.
528        """
529        if isinstance(dn, str):
530            parts = dn.split(".")
531            ret = [dn]
532            for i in range(1, len(parts)):
533                ret.append("*." + ".".join(parts[i:]))
534            return ret
535        elif isinstance(dn, x509.DNSName):
536            return CertStore.asterisk_forms(dn.value)
537        else:
538            return [str(dn.value)]
539
540    def get_cert(
541        self,
542        commonname: str | None,
543        sans: Iterable[x509.GeneralName],
544        organization: str | None = None,
545    ) -> CertStoreEntry:
546        """
547        commonname: Common name for the generated certificate. Must be a
548        valid, plain-ASCII, IDNA-encoded domain name.
549
550        sans: A list of Subject Alternate Names.
551
552        organization: Organization name for the generated certificate.
553        """
554        sans = _fix_legacy_sans(sans)
555
556        potential_keys: list[TCertId] = []
557        if commonname:
558            potential_keys.extend(self.asterisk_forms(commonname))
559        for s in sans:
560            potential_keys.extend(self.asterisk_forms(s))
561        potential_keys.append("*")
562        potential_keys.append((commonname, sans))
563
564        name = next(filter(lambda key: key in self.certs, potential_keys), None)
565        if name:
566            entry = self.certs[name]
567        else:
568            entry = CertStoreEntry(
569                cert=dummy_cert(
570                    self.default_privatekey,
571                    self.default_ca._cert,
572                    commonname,
573                    sans,
574                    organization,
575                ),
576                privatekey=self.default_privatekey,
577                chain_file=self.default_chain_file,
578                chain_certs=self.default_chain_certs,
579            )
580            self.certs[(commonname, sans)] = entry
581            self.expire(entry)
582
583        return entry
584
585
586def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey:
587    """
588    like cryptography's load_pem_private_key, but silently falls back to not using a password
589    if the private key is unencrypted.
590    """
591    try:
592        return serialization.load_pem_private_key(data, password)  # type: ignore
593    except TypeError:
594        if password is not None:
595            return load_pem_private_key(data, None)
596        raise
class Cert(mitmproxy.coretypes.serializable.Serializable):
 51class Cert(serializable.Serializable):
 52    """Representation of a (TLS) certificate."""
 53
 54    _cert: x509.Certificate
 55
 56    def __init__(self, cert: x509.Certificate):
 57        assert isinstance(cert, x509.Certificate)
 58        self._cert = cert
 59
 60    def __eq__(self, other):
 61        return self.fingerprint() == other.fingerprint()
 62
 63    def __repr__(self):
 64        altnames = [str(x.value) for x in self.altnames]
 65        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 66
 67    def __hash__(self):
 68        return self._cert.__hash__()
 69
 70    @classmethod
 71    def from_state(cls, state):
 72        return cls.from_pem(state)
 73
 74    def get_state(self):
 75        return self.to_pem()
 76
 77    def set_state(self, state):
 78        self._cert = x509.load_pem_x509_certificate(state)
 79
 80    @classmethod
 81    def from_pem(cls, data: bytes) -> "Cert":
 82        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 83        return cls(cert)
 84
 85    def to_pem(self) -> bytes:
 86        return self._cert.public_bytes(serialization.Encoding.PEM)
 87
 88    @classmethod
 89    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 90        return Cert(x509.to_cryptography())
 91
 92    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 93        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 94
 95    def fingerprint(self) -> bytes:
 96        return self._cert.fingerprint(hashes.SHA256())
 97
 98    @property
 99    def issuer(self) -> list[tuple[str, str]]:
100        return _name_to_keyval(self._cert.issuer)
101
102    @property
103    def notbefore(self) -> datetime.datetime:
104        try:
105            # type definitions haven't caught up with new API yet.
106            return self._cert.not_valid_before_utc  # type: ignore
107        except AttributeError:  # pragma: no cover
108            # cryptography < 42.0
109            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
110
111    @property
112    def notafter(self) -> datetime.datetime:
113        try:
114            return self._cert.not_valid_after_utc  # type: ignore
115        except AttributeError:  # pragma: no cover
116            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
117
118    def has_expired(self) -> bool:
119        if sys.version_info < (3, 11):  # pragma: no cover
120            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
121        return datetime.datetime.now(datetime.UTC) > self.notafter
122
123    @property
124    def subject(self) -> list[tuple[str, str]]:
125        return _name_to_keyval(self._cert.subject)
126
127    @property
128    def serial(self) -> int:
129        return self._cert.serial_number
130
131    @property
132    def keyinfo(self) -> tuple[str, int]:
133        public_key = self._cert.public_key()
134        if isinstance(public_key, rsa.RSAPublicKey):
135            return "RSA", public_key.key_size
136        if isinstance(public_key, dsa.DSAPublicKey):
137            return "DSA", public_key.key_size
138        if isinstance(public_key, ec.EllipticCurvePublicKey):
139            return f"EC ({public_key.curve.name})", public_key.key_size
140        return (
141            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
142            getattr(public_key, "key_size", -1),
143        )  # pragma: no cover
144
145    @property
146    def cn(self) -> str | None:
147        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
148        if attrs:
149            return cast(str, attrs[0].value)
150        return None
151
152    @property
153    def organization(self) -> str | None:
154        attrs = self._cert.subject.get_attributes_for_oid(
155            x509.NameOID.ORGANIZATION_NAME
156        )
157        if attrs:
158            return cast(str, attrs[0].value)
159        return None
160
161    @property
162    def altnames(self) -> x509.GeneralNames:
163        """
164        Get all SubjectAlternativeName DNS altnames.
165        """
166        try:
167            sans = self._cert.extensions.get_extension_for_class(
168                x509.SubjectAlternativeName
169            ).value
170        except x509.ExtensionNotFound:
171            return x509.GeneralNames([])
172        else:
173            return x509.GeneralNames(sans)

Representation of a (TLS) certificate.

Cert(cert: cryptography.x509.base.Certificate)
56    def __init__(self, cert: x509.Certificate):
57        assert isinstance(cert, x509.Certificate)
58        self._cert = cert
@classmethod
def from_pem(cls, data: bytes) -> Cert:
80    @classmethod
81    def from_pem(cls, data: bytes) -> "Cert":
82        cert = x509.load_pem_x509_certificate(data)  # type: ignore
83        return cls(cert)
def to_pem(self) -> bytes:
85    def to_pem(self) -> bytes:
86        return self._cert.public_bytes(serialization.Encoding.PEM)
@classmethod
def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> Cert:
88    @classmethod
89    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
90        return Cert(x509.to_cryptography())
def to_pyopenssl(self) -> OpenSSL.crypto.X509:
92    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
93        return OpenSSL.crypto.X509.from_cryptography(self._cert)
def fingerprint(self) -> bytes:
95    def fingerprint(self) -> bytes:
96        return self._cert.fingerprint(hashes.SHA256())
issuer: list[tuple[str, str]]
 98    @property
 99    def issuer(self) -> list[tuple[str, str]]:
100        return _name_to_keyval(self._cert.issuer)
notbefore: datetime.datetime
102    @property
103    def notbefore(self) -> datetime.datetime:
104        try:
105            # type definitions haven't caught up with new API yet.
106            return self._cert.not_valid_before_utc  # type: ignore
107        except AttributeError:  # pragma: no cover
108            # cryptography < 42.0
109            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
notafter: datetime.datetime
111    @property
112    def notafter(self) -> datetime.datetime:
113        try:
114            return self._cert.not_valid_after_utc  # type: ignore
115        except AttributeError:  # pragma: no cover
116            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
def has_expired(self) -> bool:
118    def has_expired(self) -> bool:
119        if sys.version_info < (3, 11):  # pragma: no cover
120            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
121        return datetime.datetime.now(datetime.UTC) > self.notafter
subject: list[tuple[str, str]]
123    @property
124    def subject(self) -> list[tuple[str, str]]:
125        return _name_to_keyval(self._cert.subject)
serial: int
127    @property
128    def serial(self) -> int:
129        return self._cert.serial_number
keyinfo: tuple[str, int]
131    @property
132    def keyinfo(self) -> tuple[str, int]:
133        public_key = self._cert.public_key()
134        if isinstance(public_key, rsa.RSAPublicKey):
135            return "RSA", public_key.key_size
136        if isinstance(public_key, dsa.DSAPublicKey):
137            return "DSA", public_key.key_size
138        if isinstance(public_key, ec.EllipticCurvePublicKey):
139            return f"EC ({public_key.curve.name})", public_key.key_size
140        return (
141            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
142            getattr(public_key, "key_size", -1),
143        )  # pragma: no cover
cn: str | None
145    @property
146    def cn(self) -> str | None:
147        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
148        if attrs:
149            return cast(str, attrs[0].value)
150        return None
organization: str | None
152    @property
153    def organization(self) -> str | None:
154        attrs = self._cert.subject.get_attributes_for_oid(
155            x509.NameOID.ORGANIZATION_NAME
156        )
157        if attrs:
158            return cast(str, attrs[0].value)
159        return None
altnames: cryptography.x509.extensions.GeneralNames
161    @property
162    def altnames(self) -> x509.GeneralNames:
163        """
164        Get all SubjectAlternativeName DNS altnames.
165        """
166        try:
167            sans = self._cert.extensions.get_extension_for_class(
168                x509.SubjectAlternativeName
169            ).value
170        except x509.ExtensionNotFound:
171            return x509.GeneralNames([])
172        else:
173            return x509.GeneralNames(sans)

Get all SubjectAlternativeName DNS altnames.

Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy