Edit on GitHub

mitmproxy.certs

  1import contextlib
  2import datetime
  3import ipaddress
  4import logging
  5import os
  6import sys
  7import warnings
  8from collections.abc import Iterable
  9from dataclasses import dataclass
 10from pathlib import Path
 11from typing import cast
 12from typing import NewType
 13from typing import Optional
 14from typing import Union
 15
 16import OpenSSL
 17from cryptography import x509
 18from cryptography.hazmat.primitives import hashes
 19from cryptography.hazmat.primitives import serialization
 20from cryptography.hazmat.primitives.asymmetric import dsa
 21from cryptography.hazmat.primitives.asymmetric import ec
 22from cryptography.hazmat.primitives.asymmetric import rsa
 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes
 24from cryptography.hazmat.primitives.serialization import pkcs12
 25from cryptography.x509 import ExtendedKeyUsageOID
 26from cryptography.x509 import NameOID
 27
 28from mitmproxy.coretypes import serializable
 29
 30logger = logging.getLogger(__name__)
 31
 32# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
 33CA_EXPIRY = datetime.timedelta(days=10 * 365)
 34CERT_EXPIRY = datetime.timedelta(days=365)
 35
 36# Generated with "openssl dhparam". It's too slow to generate this on startup.
 37DEFAULT_DHPARAM = b"""
 38-----BEGIN DH PARAMETERS-----
 39MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
 40O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
 41j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
 42Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
 43chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
 44ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
 45o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
 46IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
 47A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
 486mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
 49rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
 50-----END DH PARAMETERS-----
 51"""
 52
 53
 54class Cert(serializable.Serializable):
 55    """Representation of a (TLS) certificate."""
 56
 57    _cert: x509.Certificate
 58
 59    def __init__(self, cert: x509.Certificate):
 60        assert isinstance(cert, x509.Certificate)
 61        self._cert = cert
 62
 63    def __eq__(self, other):
 64        return self.fingerprint() == other.fingerprint()
 65
 66    def __repr__(self):
 67        altnames = [str(x.value) for x in self.altnames]
 68        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 69
 70    def __hash__(self):
 71        return self._cert.__hash__()
 72
 73    @classmethod
 74    def from_state(cls, state):
 75        return cls.from_pem(state)
 76
 77    def get_state(self):
 78        return self.to_pem()
 79
 80    def set_state(self, state):
 81        self._cert = x509.load_pem_x509_certificate(state)
 82
 83    @classmethod
 84    def from_pem(cls, data: bytes) -> "Cert":
 85        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 86        return cls(cert)
 87
 88    def to_pem(self) -> bytes:
 89        return self._cert.public_bytes(serialization.Encoding.PEM)
 90
 91    @classmethod
 92    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 93        return Cert(x509.to_cryptography())
 94
 95    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 96        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 97
 98    def public_key(self) -> CertificatePublicKeyTypes:
 99        return self._cert.public_key()
100
101    def fingerprint(self) -> bytes:
102        return self._cert.fingerprint(hashes.SHA256())
103
104    @property
105    def issuer(self) -> list[tuple[str, str]]:
106        return _name_to_keyval(self._cert.issuer)
107
108    @property
109    def notbefore(self) -> datetime.datetime:
110        try:
111            # type definitions haven't caught up with new API yet.
112            return self._cert.not_valid_before_utc  # type: ignore
113        except AttributeError:  # pragma: no cover
114            # cryptography < 42.0
115            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
116
117    @property
118    def notafter(self) -> datetime.datetime:
119        try:
120            return self._cert.not_valid_after_utc  # type: ignore
121        except AttributeError:  # pragma: no cover
122            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
123
124    def has_expired(self) -> bool:
125        if sys.version_info < (3, 11):  # pragma: no cover
126            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
127        return datetime.datetime.now(datetime.UTC) > self.notafter
128
129    @property
130    def subject(self) -> list[tuple[str, str]]:
131        return _name_to_keyval(self._cert.subject)
132
133    @property
134    def serial(self) -> int:
135        return self._cert.serial_number
136
137    @property
138    def is_ca(self) -> bool:
139        constraints: x509.BasicConstraints
140        try:
141            constraints = self._cert.extensions.get_extension_for_class(
142                x509.BasicConstraints
143            ).value
144            return constraints.ca
145        except x509.ExtensionNotFound:
146            return False
147
148    @property
149    def keyinfo(self) -> tuple[str, int]:
150        public_key = self._cert.public_key()
151        if isinstance(public_key, rsa.RSAPublicKey):
152            return "RSA", public_key.key_size
153        if isinstance(public_key, dsa.DSAPublicKey):
154            return "DSA", public_key.key_size
155        if isinstance(public_key, ec.EllipticCurvePublicKey):
156            return f"EC ({public_key.curve.name})", public_key.key_size
157        return (
158            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
159            getattr(public_key, "key_size", -1),
160        )  # pragma: no cover
161
162    @property
163    def cn(self) -> str | None:
164        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
165        if attrs:
166            return cast(str, attrs[0].value)
167        return None
168
169    @property
170    def organization(self) -> str | None:
171        attrs = self._cert.subject.get_attributes_for_oid(
172            x509.NameOID.ORGANIZATION_NAME
173        )
174        if attrs:
175            return cast(str, attrs[0].value)
176        return None
177
178    @property
179    def altnames(self) -> x509.GeneralNames:
180        """
181        Get all SubjectAlternativeName DNS altnames.
182        """
183        try:
184            sans = self._cert.extensions.get_extension_for_class(
185                x509.SubjectAlternativeName
186            ).value
187        except x509.ExtensionNotFound:
188            return x509.GeneralNames([])
189        else:
190            return x509.GeneralNames(sans)
191
192
def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
    """Flatten an X.509 name into a list of ``(key, value)`` pairs.

    The key is whatever precedes the first ``=`` in the attribute's RFC 4514
    string form; the value is the attribute's value.
    """
    return [
        (attribute.rfc4514_string().partition("=")[0], cast(str, attribute.value))
        for attribute in name
    ]
200
201
def create_ca(
    organization: str,
    cn: str,
    key_size: int,
) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
    """Generate a new self-signed RSA certificate authority.

    organization: Organization name placed in the subject and issuer.
    cn: Common name placed in the subject and issuer.
    key_size: RSA key size in bits.

    Returns a ``(private_key, certificate)`` tuple. The certificate is valid
    from two days before the current time until ``CA_EXPIRY`` after it.
    """
    # Use an aware UTC timestamp: the certificate builder works in UTC, so a
    # naive local time would shift the validity window by the local UTC offset.
    now = datetime.datetime.now(datetime.timezone.utc)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
    )  # type: ignore
    name = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
        ]
    )
    builder = x509.CertificateBuilder()
    builder = builder.serial_number(x509.random_serial_number())
    # Self-signed: subject and issuer are the same name.
    builder = builder.subject_name(name)
    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
    builder = builder.not_valid_after(now + CA_EXPIRY)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    # Only certificate and CRL signing are permitted for the CA key.
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=False,
            content_commitment=False,
            key_encipherment=False,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=True,
            encipher_only=False,
            decipher_only=False,
        ),
        critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
        critical=False,
    )
    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
    return private_key, cert
252
253
def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
    """
    Normalize SANs to ``x509.GeneralNames``.

    mitmproxy 10.1 and below passed SANs as a list of strings; they are now a
    list of GeneralNames. Legacy string lists are converted (with a
    DeprecationWarning); everything else is wrapped or returned unchanged.
    """
    if isinstance(sans, x509.GeneralNames):
        return sans

    looks_legacy = (
        isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
    )
    if looks_legacy:  # pragma: no cover
        warnings.warn(
            "Passing SANs as a list of strings is deprecated.",
            DeprecationWarning,
            stacklevel=2,
        )

        converted: list[x509.GeneralName] = []
        for raw in cast(list[str], sans):
            try:
                address = ipaddress.ip_address(raw)
            except ValueError:
                # Not an IP literal: treat it as a host name, IDNA-encoded.
                converted.append(x509.DNSName(raw.encode("idna").decode()))
                continue
            converted.append(x509.IPAddress(address))
        return x509.GeneralNames(converted)

    return x509.GeneralNames(sans)
282
283
def dummy_cert(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
    commonname: str | None,
    sans: Iterable[x509.GeneralName],
    organization: str | None = None,
) -> Cert:
    """
    Generates a dummy certificate signed by the given CA.

    privkey: CA private key
    cacert: CA certificate
    commonname: Common name for the generated certificate. It is left out of
        the subject when it is None, empty, or 64 or more characters long.
    sans: A list of Subject Alternate Names.
    organization: Organization name for the generated certificate.

    Returns the generated certificate. It is valid from two days before the
    current time until CERT_EXPIRY after it.
    """
    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(cacert.subject)
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    # The generated certificate carries the CA certificate's public key.
    builder = builder.public_key(cacert.public_key())

    # Use an aware UTC timestamp: the certificate builder works in UTC, so a
    # naive local time would shift the validity window by the local UTC offset.
    now = datetime.datetime.now(datetime.timezone.utc)
    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
    builder = builder.not_valid_after(now + CERT_EXPIRY)

    subject = []
    # An empty common name is treated like a missing one rather than being
    # passed on to x509.NameAttribute.
    is_valid_commonname = commonname is not None and 0 < len(commonname) < 64
    if is_valid_commonname:
        assert commonname is not None
        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
    if organization is not None:
        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
    builder = builder.subject_name(x509.Name(subject))
    builder = builder.serial_number(x509.random_serial_number())

    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
    builder = builder.add_extension(
        x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
        critical=not is_valid_commonname,
    )

    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
    builder = builder.add_extension(
        x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()),
        critical=False,
    )
    # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways,
    # see https://github.com/mitmproxy/mitmproxy/issues/6494.
    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states
    # that SKI is optional for the leaf cert, so we skip that.

    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
    return Cert(cert)
342
343
@dataclass(frozen=True)
class CertStoreEntry:
    """A certificate together with the key and chain stored alongside it."""

    # The certificate itself.
    cert: Cert
    # The private key stored with the certificate.
    privatekey: rsa.RSAPrivateKey
    # File associated with the certificate chain, if any.
    chain_file: Path | None
    # Certificates that make up the chain.
    chain_certs: list[Cert]
350
351
352TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
353TGeneratedCertId = tuple[Optional[str], x509.GeneralNames]  # (common_name, sans)
354TCertId = Union[TCustomCertId, TGeneratedCertId]
355
356DHParams = NewType("DHParams", bytes)
357
358
class CertStore:
    """
    Implements an in-memory certificate store.

    Entries are looked up by name (common name, SAN value, or an explicitly
    registered spec). Certificates generated on demand are additionally keyed
    by ``(common_name, sans)`` and evicted oldest-first once more than
    ``STORE_CAP`` of them have been generated.
    """

    STORE_CAP = 100
    default_privatekey: rsa.RSAPrivateKey
    default_ca: Cert
    default_chain_file: Path | None
    default_chain_certs: list[Cert]
    dhparams: DHParams
    certs: dict[TCertId, CertStoreEntry]
    expire_queue: list[CertStoreEntry]

    def __init__(
        self,
        default_privatekey: rsa.RSAPrivateKey,
        default_ca: Cert,
        default_chain_file: Path | None,
        dhparams: DHParams,
    ):
        self.default_privatekey = default_privatekey
        self.default_ca = default_ca
        self.default_chain_file = default_chain_file
        # With a chain file, the chain is every certificate in that file;
        # otherwise the chain consists of the CA certificate alone.
        self.default_chain_certs = (
            [
                Cert(c)
                for c in x509.load_pem_x509_certificates(
                    self.default_chain_file.read_bytes()
                )
            ]
            if self.default_chain_file
            else [default_ca]
        )
        self.dhparams = dhparams
        self.certs = {}
        self.expire_queue = []

    def expire(self, entry: CertStoreEntry) -> None:
        """Track a generated entry and evict the oldest one beyond STORE_CAP."""
        self.expire_queue.append(entry)
        if len(self.expire_queue) > self.STORE_CAP:
            d = self.expire_queue.pop(0)
            # Drop every key that maps to the evicted entry.
            self.certs = {k: v for k, v in self.certs.items() if v != d}

    @staticmethod
    def load_dhparam(path: Path) -> DHParams:
        """Load DH parameters from ``path``, creating the file if it is missing.

        Raises RuntimeError if the file cannot be opened or parsed.
        """
        # mitmproxy<=0.10 doesn't generate a dhparam file.
        # Create it now if necessary.
        if not path.exists():
            path.write_bytes(DEFAULT_DHPARAM)

        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
        # expected format.
        bio = OpenSSL.SSL._lib.BIO_new_file(  # type: ignore
            str(path).encode(sys.getfilesystemencoding()), b"r"
        )
        if bio == OpenSSL.SSL._ffi.NULL:  # type: ignore
            raise RuntimeError("Error loading DH Params.")  # pragma: no cover
        bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
        dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
            bio,
            OpenSSL.SSL._ffi.NULL,  # type: ignore
            OpenSSL.SSL._ffi.NULL,  # type: ignore
            OpenSSL.SSL._ffi.NULL,  # type: ignore
        )
        # A NULL result means the file could not be parsed; fail here instead
        # of handing a NULL pointer to the caller.
        if dh == OpenSSL.SSL._ffi.NULL:  # type: ignore
            raise RuntimeError("Error loading DH Params.")  # pragma: no cover
        dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
        return dh

    @classmethod
    def from_store(
        cls,
        path: Path | str,
        basename: str,
        key_size: int,
        passphrase: bytes | None = None,
    ) -> "CertStore":
        """Load the store under ``path``, creating it first if the CA file is missing.

        path: Directory holding the ``{basename}-*`` files.
        basename: File name prefix of the store.
        key_size: RSA key size used if a new CA has to be created.
        passphrase: Passphrase for the CA private key, if any.
        """
        path = Path(path)
        ca_file = path / f"{basename}-ca.pem"
        dhparam_file = path / f"{basename}-dhparam.pem"
        if not ca_file.exists():
            cls.create_store(path, basename, key_size)
        return cls.from_files(ca_file, dhparam_file, passphrase)

    @classmethod
    def from_files(
        cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None
    ) -> "CertStore":
        """Build a store from a CA PEM file (key plus certificates) and a DH parameter file."""
        raw = ca_file.read_bytes()
        key = load_pem_private_key(raw, passphrase)
        dh = cls.load_dhparam(dhparam_file)
        certs = x509.load_pem_x509_certificates(raw)
        # The first certificate in the file is used as the CA.
        ca = Cert(certs[0])
        # If the file holds more than one certificate, it doubles as the chain file.
        if len(certs) > 1:
            chain_file: Path | None = ca_file
        else:
            chain_file = None
        return cls(key, ca, chain_file, dh)

    @staticmethod
    @contextlib.contextmanager
    def umask_secret():
        """
        Context to temporarily set umask to its original value bitor 0o77.
        Useful when writing private keys to disk so that only the owner
        will be able to read them.
        """
        # os.umask() returns the previous mask, so set a throwaway value first
        # to learn the current one.
        original_umask = os.umask(0)
        os.umask(original_umask | 0o77)
        try:
            yield
        finally:
            os.umask(original_umask)

    @staticmethod
    def create_store(
        path: Path, basename: str, key_size: int, organization=None, cn=None
    ) -> None:
        """Create a new CA and write all store files into ``path``.

        ``organization`` and ``cn`` default to ``basename``. Files containing
        the private key are written under a restrictive umask.
        """
        path.mkdir(parents=True, exist_ok=True)

        organization = organization or basename
        cn = cn or basename

        key: rsa.RSAPrivateKeyWithSerialization
        ca: x509.Certificate
        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)

        # Dump the CA plus private key.
        with CertStore.umask_secret():
            # PEM format
            (path / f"{basename}-ca.pem").write_bytes(
                key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.TraditionalOpenSSL,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                + ca.public_bytes(serialization.Encoding.PEM)
            )

            # PKCS12 format for Windows devices
            (path / f"{basename}-ca.p12").write_bytes(
                pkcs12.serialize_key_and_certificates(  # type: ignore
                    name=basename.encode(),
                    key=key,
                    cert=ca,
                    cas=None,
                    encryption_algorithm=serialization.NoEncryption(),
                )
            )

        # Dump the certificate in PEM format
        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
        # Create a .cer file with the same contents for Android
        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)

        # Dump the certificate in PKCS12 format for Windows devices
        (path / f"{basename}-ca-cert.p12").write_bytes(
            pkcs12.serialize_key_and_certificates(
                name=basename.encode(),
                key=None,  # type: ignore
                cert=ca,
                cas=None,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)

    def add_cert_file(
        self, spec: str, path: Path, passphrase: bytes | None = None
    ) -> None:
        """Load a certificate (and optionally its key and chain) from a PEM file.

        spec: Additional name under which the certificate is registered.
        path: PEM file containing the certificate.
        passphrase: Passphrase for the private key in the file, if any.

        Raises ValueError if no matching private key is available.
        """
        raw = path.read_bytes()
        cert = Cert.from_pem(raw)
        try:
            private_key = load_pem_private_key(raw, password=passphrase)
        except ValueError as e:
            # No usable key in the file: fall back to the store's default key,
            # but only if it actually belongs to this certificate.
            private_key = self.default_privatekey
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Unable to find private key in "{path.absolute()}": {e}'
                ) from e
        else:
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Private and public keys in "{path.absolute()}" do not match:\n'
                    f"{cert.public_key()=}\n"
                    f"{private_key.public_key()=}"
                )

        try:
            chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)]
        except ValueError as e:
            logger.warning(f"Failed to read certificate chain: {e}")
            chain = [cert]

        if cert.is_ca:
            logger.warning(
                f'"{path.absolute()}" is a certificate authority and not a leaf certificate. '
                f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/."
            )

        self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec)

    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
        """
        Adds a cert to the certstore. We register the CN in the cert plus
        any SANs, and also the list of names provided as an argument.
        """
        if entry.cert.cn:
            self.certs[entry.cert.cn] = entry
        for i in entry.cert.altnames:
            self.certs[str(i.value)] = entry
        for i in names:
            self.certs[i] = entry

    @staticmethod
    def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
        """
        Return all asterisk forms for a domain. For example, for www.example.com this will return
        ["www.example.com", "*.example.com", "*.com"]. The single wildcard "*" is omitted.

        DNSName entries are expanded like strings; any other GeneralName yields
        just the string form of its value.
        """
        if isinstance(dn, str):
            parts = dn.split(".")
            ret = [dn]
            for i in range(1, len(parts)):
                ret.append("*." + ".".join(parts[i:]))
            return ret
        elif isinstance(dn, x509.DNSName):
            return CertStore.asterisk_forms(dn.value)
        else:
            return [str(dn.value)]

    def get_cert(
        self,
        commonname: str | None,
        sans: Iterable[x509.GeneralName],
        organization: str | None = None,
    ) -> CertStoreEntry:
        """
        Return a matching stored entry, generating a new certificate if none matches.

        commonname: Common name for the generated certificate. Must be a
        valid, plain-ASCII, IDNA-encoded domain name.

        sans: A list of Subject Alternate Names.

        organization: Organization name for the generated certificate.
        """
        sans = _fix_legacy_sans(sans)

        # Candidate keys in priority order: names and their wildcard forms,
        # then the catch-all "*", then the key used for generated certificates.
        potential_keys: list[TCertId] = []
        if commonname:
            potential_keys.extend(self.asterisk_forms(commonname))
        for s in sans:
            potential_keys.extend(self.asterisk_forms(s))
        potential_keys.append("*")
        potential_keys.append((commonname, sans))

        name = next(filter(lambda key: key in self.certs, potential_keys), None)
        # Compare against None explicitly: a matched key may itself be falsy
        # (the empty string) and must still count as a hit.
        if name is not None:
            entry = self.certs[name]
        else:
            entry = CertStoreEntry(
                cert=dummy_cert(
                    self.default_privatekey,
                    self.default_ca._cert,
                    commonname,
                    sans,
                    organization,
                ),
                privatekey=self.default_privatekey,
                chain_file=self.default_chain_file,
                chain_certs=self.default_chain_certs,
            )
            self.certs[(commonname, sans)] = entry
            self.expire(entry)

        return entry
635
636
def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey:
    """
    Load a PEM private key like cryptography's load_pem_private_key does, but
    retry without a password when one was supplied and loading raised a
    TypeError (i.e. the private key is unencrypted).
    """
    try:
        return serialization.load_pem_private_key(data, password)  # type: ignore
    except TypeError:
        if password is None:
            # There was no password to drop, so there is nothing to retry.
            raise
        return serialization.load_pem_private_key(data, None)  # type: ignore
class Cert(mitmproxy.coretypes.serializable.Serializable):
 55class Cert(serializable.Serializable):
 56    """Representation of a (TLS) certificate."""
 57
 58    _cert: x509.Certificate
 59
 60    def __init__(self, cert: x509.Certificate):
 61        assert isinstance(cert, x509.Certificate)
 62        self._cert = cert
 63
 64    def __eq__(self, other):
 65        return self.fingerprint() == other.fingerprint()
 66
 67    def __repr__(self):
 68        altnames = [str(x.value) for x in self.altnames]
 69        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 70
 71    def __hash__(self):
 72        return self._cert.__hash__()
 73
 74    @classmethod
 75    def from_state(cls, state):
 76        return cls.from_pem(state)
 77
 78    def get_state(self):
 79        return self.to_pem()
 80
 81    def set_state(self, state):
 82        self._cert = x509.load_pem_x509_certificate(state)
 83
 84    @classmethod
 85    def from_pem(cls, data: bytes) -> "Cert":
 86        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 87        return cls(cert)
 88
 89    def to_pem(self) -> bytes:
 90        return self._cert.public_bytes(serialization.Encoding.PEM)
 91
 92    @classmethod
 93    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 94        return Cert(x509.to_cryptography())
 95
 96    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 97        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 98
 99    def public_key(self) -> CertificatePublicKeyTypes:
100        return self._cert.public_key()
101
102    def fingerprint(self) -> bytes:
103        return self._cert.fingerprint(hashes.SHA256())
104
105    @property
106    def issuer(self) -> list[tuple[str, str]]:
107        return _name_to_keyval(self._cert.issuer)
108
109    @property
110    def notbefore(self) -> datetime.datetime:
111        try:
112            # type definitions haven't caught up with new API yet.
113            return self._cert.not_valid_before_utc  # type: ignore
114        except AttributeError:  # pragma: no cover
115            # cryptography < 42.0
116            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
117
118    @property
119    def notafter(self) -> datetime.datetime:
120        try:
121            return self._cert.not_valid_after_utc  # type: ignore
122        except AttributeError:  # pragma: no cover
123            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
124
125    def has_expired(self) -> bool:
126        if sys.version_info < (3, 11):  # pragma: no cover
127            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
128        return datetime.datetime.now(datetime.UTC) > self.notafter
129
130    @property
131    def subject(self) -> list[tuple[str, str]]:
132        return _name_to_keyval(self._cert.subject)
133
134    @property
135    def serial(self) -> int:
136        return self._cert.serial_number
137
138    @property
139    def is_ca(self) -> bool:
140        constraints: x509.BasicConstraints
141        try:
142            constraints = self._cert.extensions.get_extension_for_class(
143                x509.BasicConstraints
144            ).value
145            return constraints.ca
146        except x509.ExtensionNotFound:
147            return False
148
149    @property
150    def keyinfo(self) -> tuple[str, int]:
151        public_key = self._cert.public_key()
152        if isinstance(public_key, rsa.RSAPublicKey):
153            return "RSA", public_key.key_size
154        if isinstance(public_key, dsa.DSAPublicKey):
155            return "DSA", public_key.key_size
156        if isinstance(public_key, ec.EllipticCurvePublicKey):
157            return f"EC ({public_key.curve.name})", public_key.key_size
158        return (
159            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
160            getattr(public_key, "key_size", -1),
161        )  # pragma: no cover
162
163    @property
164    def cn(self) -> str | None:
165        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
166        if attrs:
167            return cast(str, attrs[0].value)
168        return None
169
170    @property
171    def organization(self) -> str | None:
172        attrs = self._cert.subject.get_attributes_for_oid(
173            x509.NameOID.ORGANIZATION_NAME
174        )
175        if attrs:
176            return cast(str, attrs[0].value)
177        return None
178
179    @property
180    def altnames(self) -> x509.GeneralNames:
181        """
182        Get all SubjectAlternativeName DNS altnames.
183        """
184        try:
185            sans = self._cert.extensions.get_extension_for_class(
186                x509.SubjectAlternativeName
187            ).value
188        except x509.ExtensionNotFound:
189            return x509.GeneralNames([])
190        else:
191            return x509.GeneralNames(sans)

Representation of a (TLS) certificate.

Cert(cert: cryptography.hazmat.bindings._rust.x509.Certificate)
60    def __init__(self, cert: x509.Certificate):
61        assert isinstance(cert, x509.Certificate)
62        self._cert = cert
@classmethod
def from_pem(cls, data: bytes) -> Cert:
84    @classmethod
85    def from_pem(cls, data: bytes) -> "Cert":
86        cert = x509.load_pem_x509_certificate(data)  # type: ignore
87        return cls(cert)
def to_pem(self) -> bytes:
89    def to_pem(self) -> bytes:
90        return self._cert.public_bytes(serialization.Encoding.PEM)
@classmethod
def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> Cert:
92    @classmethod
93    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
94        return Cert(x509.to_cryptography())
def to_pyopenssl(self) -> OpenSSL.crypto.X509:
96    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
97        return OpenSSL.crypto.X509.from_cryptography(self._cert)
def public_key( self) -> Union[cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey]:
 99    def public_key(self) -> CertificatePublicKeyTypes:
100        return self._cert.public_key()
def fingerprint(self) -> bytes:
102    def fingerprint(self) -> bytes:
103        return self._cert.fingerprint(hashes.SHA256())
issuer: list[tuple[str, str]]
105    @property
106    def issuer(self) -> list[tuple[str, str]]:
107        return _name_to_keyval(self._cert.issuer)
notbefore: datetime.datetime
109    @property
110    def notbefore(self) -> datetime.datetime:
111        try:
112            # type definitions haven't caught up with new API yet.
113            return self._cert.not_valid_before_utc  # type: ignore
114        except AttributeError:  # pragma: no cover
115            # cryptography < 42.0
116            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
notafter: datetime.datetime
118    @property
119    def notafter(self) -> datetime.datetime:
120        try:
121            return self._cert.not_valid_after_utc  # type: ignore
122        except AttributeError:  # pragma: no cover
123            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
def has_expired(self) -> bool:
125    def has_expired(self) -> bool:
126        if sys.version_info < (3, 11):  # pragma: no cover
127            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
128        return datetime.datetime.now(datetime.UTC) > self.notafter
subject: list[tuple[str, str]]
130    @property
131    def subject(self) -> list[tuple[str, str]]:
132        return _name_to_keyval(self._cert.subject)
serial: int
134    @property
135    def serial(self) -> int:
136        return self._cert.serial_number
is_ca: bool
138    @property
139    def is_ca(self) -> bool:
140        constraints: x509.BasicConstraints
141        try:
142            constraints = self._cert.extensions.get_extension_for_class(
143                x509.BasicConstraints
144            ).value
145            return constraints.ca
146        except x509.ExtensionNotFound:
147            return False
keyinfo: tuple[str, int]
149    @property
150    def keyinfo(self) -> tuple[str, int]:
151        public_key = self._cert.public_key()
152        if isinstance(public_key, rsa.RSAPublicKey):
153            return "RSA", public_key.key_size
154        if isinstance(public_key, dsa.DSAPublicKey):
155            return "DSA", public_key.key_size
156        if isinstance(public_key, ec.EllipticCurvePublicKey):
157            return f"EC ({public_key.curve.name})", public_key.key_size
158        return (
159            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
160            getattr(public_key, "key_size", -1),
161        )  # pragma: no cover
cn: str | None
163    @property
164    def cn(self) -> str | None:
165        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
166        if attrs:
167            return cast(str, attrs[0].value)
168        return None
organization: str | None
170    @property
171    def organization(self) -> str | None:
172        attrs = self._cert.subject.get_attributes_for_oid(
173            x509.NameOID.ORGANIZATION_NAME
174        )
175        if attrs:
176            return cast(str, attrs[0].value)
177        return None
altnames: cryptography.x509.extensions.GeneralNames
179    @property
180    def altnames(self) -> x509.GeneralNames:
181        """
182        Get all SubjectAlternativeName DNS altnames.
183        """
184        try:
185            sans = self._cert.extensions.get_extension_for_class(
186                x509.SubjectAlternativeName
187            ).value
188        except x509.ExtensionNotFound:
189            return x509.GeneralNames([])
190        else:
191            return x509.GeneralNames(sans)

Get all entries of the SubjectAlternativeName extension (every name type, not only DNS names).