Edit on GitHub

mitmproxy.certs

  1import contextlib
  2import datetime
  3import ipaddress
  4import logging
  5import os
  6import sys
  7import warnings
  8from collections.abc import Iterable
  9from dataclasses import dataclass
 10from pathlib import Path
 11from typing import cast
 12from typing import NewType
 13from typing import Optional
 14from typing import Union
 15
 16import OpenSSL
 17from cryptography import x509
 18from cryptography.hazmat.primitives import hashes
 19from cryptography.hazmat.primitives import serialization
 20from cryptography.hazmat.primitives.asymmetric import dsa
 21from cryptography.hazmat.primitives.asymmetric import ec
 22from cryptography.hazmat.primitives.asymmetric import rsa
 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes
 24from cryptography.hazmat.primitives.serialization import pkcs12
 25from cryptography.x509 import ExtendedKeyUsageOID
 26from cryptography.x509 import NameOID
 27
 28from mitmproxy.coretypes import serializable
 29
 30logger = logging.getLogger(__name__)
 31
 32# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815
 33CA_EXPIRY = datetime.timedelta(days=10 * 365)
 34CERT_EXPIRY = datetime.timedelta(days=365)
 35
 36# Generated with "openssl dhparam". It's too slow to generate this on startup.
 37DEFAULT_DHPARAM = b"""
 38-----BEGIN DH PARAMETERS-----
 39MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
 40O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
 41j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
 42Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
 43chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
 44ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
 45o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
 46IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
 47A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
 486mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
 49rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
 50-----END DH PARAMETERS-----
 51"""
 52
 53
 54class Cert(serializable.Serializable):
 55    """Representation of a (TLS) certificate."""
 56
 57    _cert: x509.Certificate
 58
 59    def __init__(self, cert: x509.Certificate):
 60        assert isinstance(cert, x509.Certificate)
 61        self._cert = cert
 62
 63    def __eq__(self, other):
 64        return self.fingerprint() == other.fingerprint()
 65
 66    def __repr__(self):
 67        altnames = [str(x.value) for x in self.altnames]
 68        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 69
 70    def __hash__(self):
 71        return self._cert.__hash__()
 72
 73    @classmethod
 74    def from_state(cls, state):
 75        return cls.from_pem(state)
 76
 77    def get_state(self):
 78        return self.to_pem()
 79
 80    def set_state(self, state):
 81        self._cert = x509.load_pem_x509_certificate(state)
 82
 83    @classmethod
 84    def from_pem(cls, data: bytes) -> "Cert":
 85        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 86        return cls(cert)
 87
 88    def to_pem(self) -> bytes:
 89        return self._cert.public_bytes(serialization.Encoding.PEM)
 90
 91    @classmethod
 92    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 93        return Cert(x509.to_cryptography())
 94
 95    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 96        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 97
 98    def public_key(self) -> CertificatePublicKeyTypes:
 99        return self._cert.public_key()
100
101    def fingerprint(self) -> bytes:
102        return self._cert.fingerprint(hashes.SHA256())
103
104    @property
105    def issuer(self) -> list[tuple[str, str]]:
106        return _name_to_keyval(self._cert.issuer)
107
108    @property
109    def notbefore(self) -> datetime.datetime:
110        try:
111            # type definitions haven't caught up with new API yet.
112            return self._cert.not_valid_before_utc  # type: ignore
113        except AttributeError:  # pragma: no cover
114            # cryptography < 42.0
115            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
116
117    @property
118    def notafter(self) -> datetime.datetime:
119        try:
120            return self._cert.not_valid_after_utc  # type: ignore
121        except AttributeError:  # pragma: no cover
122            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
123
124    def has_expired(self) -> bool:
125        if sys.version_info < (3, 11):  # pragma: no cover
126            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
127        return datetime.datetime.now(datetime.UTC) > self.notafter
128
129    @property
130    def subject(self) -> list[tuple[str, str]]:
131        return _name_to_keyval(self._cert.subject)
132
133    @property
134    def serial(self) -> int:
135        return self._cert.serial_number
136
137    @property
138    def is_ca(self) -> bool:
139        constraints: x509.BasicConstraints
140        try:
141            constraints = self._cert.extensions.get_extension_for_class(
142                x509.BasicConstraints
143            ).value
144            return constraints.ca
145        except x509.ExtensionNotFound:
146            return False
147
148    @property
149    def keyinfo(self) -> tuple[str, int]:
150        public_key = self._cert.public_key()
151        if isinstance(public_key, rsa.RSAPublicKey):
152            return "RSA", public_key.key_size
153        if isinstance(public_key, dsa.DSAPublicKey):
154            return "DSA", public_key.key_size
155        if isinstance(public_key, ec.EllipticCurvePublicKey):
156            return f"EC ({public_key.curve.name})", public_key.key_size
157        return (
158            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
159            getattr(public_key, "key_size", -1),
160        )  # pragma: no cover
161
162    @property
163    def cn(self) -> str | None:
164        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
165        if attrs:
166            return cast(str, attrs[0].value)
167        return None
168
169    @property
170    def organization(self) -> str | None:
171        attrs = self._cert.subject.get_attributes_for_oid(
172            x509.NameOID.ORGANIZATION_NAME
173        )
174        if attrs:
175            return cast(str, attrs[0].value)
176        return None
177
178    @property
179    def altnames(self) -> x509.GeneralNames:
180        """
181        Get all SubjectAlternativeName DNS altnames.
182        """
183        try:
184            sans = self._cert.extensions.get_extension_for_class(
185                x509.SubjectAlternativeName
186            ).value
187        except x509.ExtensionNotFound:
188            return x509.GeneralNames([])
189        else:
190            return x509.GeneralNames(sans)
191
192
193def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
194    parts = []
195    for attr in name:
196        k = attr.rfc4514_string().partition("=")[0]
197        v = cast(str, attr.value)
198        parts.append((k, v))
199    return parts
200
201
202def create_ca(
203    organization: str,
204    cn: str,
205    key_size: int,
206) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
207    now = datetime.datetime.now()
208
209    private_key = rsa.generate_private_key(
210        public_exponent=65537,
211        key_size=key_size,
212    )  # type: ignore
213    name = x509.Name(
214        [
215            x509.NameAttribute(NameOID.COMMON_NAME, cn),
216            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
217        ]
218    )
219    builder = x509.CertificateBuilder()
220    builder = builder.serial_number(x509.random_serial_number())
221    builder = builder.subject_name(name)
222    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
223    builder = builder.not_valid_after(now + CA_EXPIRY)
224    builder = builder.issuer_name(name)
225    builder = builder.public_key(private_key.public_key())
226    builder = builder.add_extension(
227        x509.BasicConstraints(ca=True, path_length=None), critical=True
228    )
229    builder = builder.add_extension(
230        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
231    )
232    builder = builder.add_extension(
233        x509.KeyUsage(
234            digital_signature=False,
235            content_commitment=False,
236            key_encipherment=False,
237            data_encipherment=False,
238            key_agreement=False,
239            key_cert_sign=True,
240            crl_sign=True,
241            encipher_only=False,
242            decipher_only=False,
243        ),
244        critical=True,
245    )
246    builder = builder.add_extension(
247        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
248        critical=False,
249    )
250    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
251    return private_key, cert
252
253
254def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
255    """
256    SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames.
257    This function converts the old format to the new one.
258    """
259    if isinstance(sans, x509.GeneralNames):
260        return sans
261    elif (
262        isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
263    ):  # pragma: no cover
264        warnings.warn(
265            "Passing SANs as a list of strings is deprecated.",
266            DeprecationWarning,
267            stacklevel=2,
268        )
269
270        ss: list[x509.GeneralName] = []
271        for x in cast(list[str], sans):
272            try:
273                ip = ipaddress.ip_address(x)
274            except ValueError:
275                x = x.encode("idna").decode()
276                ss.append(x509.DNSName(x))
277            else:
278                ss.append(x509.IPAddress(ip))
279        return x509.GeneralNames(ss)
280    else:
281        return x509.GeneralNames(sans)
282
283
284def dummy_cert(
285    privkey: rsa.RSAPrivateKey,
286    cacert: x509.Certificate,
287    commonname: str | None,
288    sans: Iterable[x509.GeneralName],
289    organization: str | None = None,
290) -> Cert:
291    """
292    Generates a dummy certificate.
293
294    privkey: CA private key
295    cacert: CA certificate
296    commonname: Common name for the generated certificate.
297    sans: A list of Subject Alternate Names.
298    organization: Organization name for the generated certificate.
299
300    Returns cert if operation succeeded, None if not.
301    """
302    builder = x509.CertificateBuilder()
303    builder = builder.issuer_name(cacert.subject)
304    builder = builder.add_extension(
305        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
306    )
307    builder = builder.public_key(cacert.public_key())
308
309    now = datetime.datetime.now()
310    builder = builder.not_valid_before(now - datetime.timedelta(days=2))
311    builder = builder.not_valid_after(now + CERT_EXPIRY)
312
313    subject = []
314    is_valid_commonname = commonname is not None and len(commonname) < 64
315    if is_valid_commonname:
316        assert commonname is not None
317        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
318    if organization is not None:
319        assert organization is not None
320        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
321    builder = builder.subject_name(x509.Name(subject))
322    builder = builder.serial_number(x509.random_serial_number())
323
324    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
325    builder = builder.add_extension(
326        x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
327        critical=not is_valid_commonname,
328    )
329
330    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
331    builder = builder.add_extension(
332        x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()),
333        critical=False,
334    )
335    # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways,
336    # see https://github.com/mitmproxy/mitmproxy/issues/6494.
337    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states
338    # that SKI is optional for the leaf cert, so we skip that.
339
340    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
341    return Cert(cert)
342
343
344@dataclass(frozen=True)
345class CertStoreEntry:
346    cert: Cert
347    privatekey: rsa.RSAPrivateKey
348    chain_file: Path | None
349    chain_certs: list[Cert]
350
351
352TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
353TGeneratedCertId = tuple[Optional[str], x509.GeneralNames]  # (common_name, sans)
354TCertId = Union[TCustomCertId, TGeneratedCertId]
355
356DHParams = NewType("DHParams", bytes)
357
358
359class CertStore:
360    """
361    Implements an in-memory certificate store.
362    """
363
364    STORE_CAP = 100
365    certs: dict[TCertId, CertStoreEntry]
366    expire_queue: list[CertStoreEntry]
367
368    def __init__(
369        self,
370        default_privatekey: rsa.RSAPrivateKey,
371        default_ca: Cert,
372        default_chain_file: Path | None,
373        dhparams: DHParams,
374    ):
375        self.default_privatekey = default_privatekey
376        self.default_ca = default_ca
377        self.default_chain_file = default_chain_file
378        self.default_chain_certs = (
379            x509.load_pem_x509_certificates(self.default_chain_file.read_bytes())
380            if self.default_chain_file
381            else [default_ca]
382        )
383        self.dhparams = dhparams
384        self.certs = {}
385        self.expire_queue = []
386
387    def expire(self, entry: CertStoreEntry) -> None:
388        self.expire_queue.append(entry)
389        if len(self.expire_queue) > self.STORE_CAP:
390            d = self.expire_queue.pop(0)
391            self.certs = {k: v for k, v in self.certs.items() if v != d}
392
393    @staticmethod
394    def load_dhparam(path: Path) -> DHParams:
395        # mitmproxy<=0.10 doesn't generate a dhparam file.
396        # Create it now if necessary.
397        if not path.exists():
398            path.write_bytes(DEFAULT_DHPARAM)
399
400        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
401        # expected format.
402        bio = OpenSSL.SSL._lib.BIO_new_file(  # type: ignore
403            str(path).encode(sys.getfilesystemencoding()), b"r"
404        )
405        if bio != OpenSSL.SSL._ffi.NULL:  # type: ignore
406            bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
407            dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
408                bio,
409                OpenSSL.SSL._ffi.NULL,  # type: ignore
410                OpenSSL.SSL._ffi.NULL,  # type: ignore
411                OpenSSL.SSL._ffi.NULL,  # type: ignore
412            )
413            dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
414            return dh
415        raise RuntimeError("Error loading DH Params.")  # pragma: no cover
416
417    @classmethod
418    def from_store(
419        cls,
420        path: Path | str,
421        basename: str,
422        key_size: int,
423        passphrase: bytes | None = None,
424    ) -> "CertStore":
425        path = Path(path)
426        ca_file = path / f"{basename}-ca.pem"
427        dhparam_file = path / f"{basename}-dhparam.pem"
428        if not ca_file.exists():
429            cls.create_store(path, basename, key_size)
430        return cls.from_files(ca_file, dhparam_file, passphrase)
431
432    @classmethod
433    def from_files(
434        cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None
435    ) -> "CertStore":
436        raw = ca_file.read_bytes()
437        key = load_pem_private_key(raw, passphrase)
438        dh = cls.load_dhparam(dhparam_file)
439        certs = x509.load_pem_x509_certificates(raw)
440        ca = Cert(certs[0])
441        if len(certs) > 1:
442            chain_file: Path | None = ca_file
443        else:
444            chain_file = None
445        return cls(key, ca, chain_file, dh)
446
447    @staticmethod
448    @contextlib.contextmanager
449    def umask_secret():
450        """
451        Context to temporarily set umask to its original value bitor 0o77.
452        Useful when writing private keys to disk so that only the owner
453        will be able to read them.
454        """
455        original_umask = os.umask(0)
456        os.umask(original_umask | 0o77)
457        try:
458            yield
459        finally:
460            os.umask(original_umask)
461
462    @staticmethod
463    def create_store(
464        path: Path, basename: str, key_size: int, organization=None, cn=None
465    ) -> None:
466        path.mkdir(parents=True, exist_ok=True)
467
468        organization = organization or basename
469        cn = cn or basename
470
471        key: rsa.RSAPrivateKeyWithSerialization
472        ca: x509.Certificate
473        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)
474
475        # Dump the CA plus private key.
476        with CertStore.umask_secret():
477            # PEM format
478            (path / f"{basename}-ca.pem").write_bytes(
479                key.private_bytes(
480                    encoding=serialization.Encoding.PEM,
481                    format=serialization.PrivateFormat.TraditionalOpenSSL,
482                    encryption_algorithm=serialization.NoEncryption(),
483                )
484                + ca.public_bytes(serialization.Encoding.PEM)
485            )
486
487            # PKCS12 format for Windows devices
488            (path / f"{basename}-ca.p12").write_bytes(
489                pkcs12.serialize_key_and_certificates(  # type: ignore
490                    name=basename.encode(),
491                    key=key,
492                    cert=ca,
493                    cas=None,
494                    encryption_algorithm=serialization.NoEncryption(),
495                )
496            )
497
498        # Dump the certificate in PEM format
499        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
500        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
501        # Create a .cer file with the same contents for Android
502        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)
503
504        # Dump the certificate in PKCS12 format for Windows devices
505        (path / f"{basename}-ca-cert.p12").write_bytes(
506            pkcs12.serialize_key_and_certificates(
507                name=basename.encode(),
508                key=None,  # type: ignore
509                cert=ca,
510                cas=None,
511                encryption_algorithm=serialization.NoEncryption(),
512            )
513        )
514
515        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)
516
517    def add_cert_file(
518        self, spec: str, path: Path, passphrase: bytes | None = None
519    ) -> None:
520        raw = path.read_bytes()
521        cert = Cert.from_pem(raw)
522        try:
523            private_key = load_pem_private_key(raw, password=passphrase)
524        except ValueError as e:
525            private_key = self.default_privatekey
526            if cert.public_key() != private_key.public_key():
527                raise ValueError(
528                    f'Unable to find private key in "{path.absolute()}": {e}'
529                ) from e
530        else:
531            if cert.public_key() != private_key.public_key():
532                raise ValueError(
533                    f'Private and public keys in "{path.absolute()}" do not match:\n'
534                    f"{cert.public_key()=}\n"
535                    f"{private_key.public_key()=}"
536                )
537
538        try:
539            chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)]
540        except ValueError as e:
541            logger.warning(f"Failed to read certificate chain: {e}")
542            chain = [cert]
543
544        if cert.is_ca:
545            logger.warning(
546                f'"{path.absolute()}" is a certificate authority and not a leaf certificate. '
547                f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/."
548            )
549
550        self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec)
551
552    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
553        """
554        Adds a cert to the certstore. We register the CN in the cert plus
555        any SANs, and also the list of names provided as an argument.
556        """
557        if entry.cert.cn:
558            self.certs[entry.cert.cn] = entry
559        for i in entry.cert.altnames:
560            self.certs[str(i.value)] = entry
561        for i in names:
562            self.certs[i] = entry
563
564    @staticmethod
565    def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
566        """
567        Return all asterisk forms for a domain. For example, for www.example.com this will return
568        [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted.
569        """
570        if isinstance(dn, str):
571            parts = dn.split(".")
572            ret = [dn]
573            for i in range(1, len(parts)):
574                ret.append("*." + ".".join(parts[i:]))
575            return ret
576        elif isinstance(dn, x509.DNSName):
577            return CertStore.asterisk_forms(dn.value)
578        else:
579            return [str(dn.value)]
580
581    def get_cert(
582        self,
583        commonname: str | None,
584        sans: Iterable[x509.GeneralName],
585        organization: str | None = None,
586    ) -> CertStoreEntry:
587        """
588        commonname: Common name for the generated certificate. Must be a
589        valid, plain-ASCII, IDNA-encoded domain name.
590
591        sans: A list of Subject Alternate Names.
592
593        organization: Organization name for the generated certificate.
594        """
595        sans = _fix_legacy_sans(sans)
596
597        potential_keys: list[TCertId] = []
598        if commonname:
599            potential_keys.extend(self.asterisk_forms(commonname))
600        for s in sans:
601            potential_keys.extend(self.asterisk_forms(s))
602        potential_keys.append("*")
603        potential_keys.append((commonname, sans))
604
605        name = next(filter(lambda key: key in self.certs, potential_keys), None)
606        if name:
607            entry = self.certs[name]
608        else:
609            entry = CertStoreEntry(
610                cert=dummy_cert(
611                    self.default_privatekey,
612                    self.default_ca._cert,
613                    commonname,
614                    sans,
615                    organization,
616                ),
617                privatekey=self.default_privatekey,
618                chain_file=self.default_chain_file,
619                chain_certs=self.default_chain_certs,
620            )
621            self.certs[(commonname, sans)] = entry
622            self.expire(entry)
623
624        return entry
625
626
627def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey:
628    """
629    like cryptography's load_pem_private_key, but silently falls back to not using a password
630    if the private key is unencrypted.
631    """
632    try:
633        return serialization.load_pem_private_key(data, password)  # type: ignore
634    except TypeError:
635        if password is not None:
636            return load_pem_private_key(data, None)
637        raise
class Cert(mitmproxy.coretypes.serializable.Serializable):
 55class Cert(serializable.Serializable):
 56    """Representation of a (TLS) certificate."""
 57
 58    _cert: x509.Certificate
 59
 60    def __init__(self, cert: x509.Certificate):
 61        assert isinstance(cert, x509.Certificate)
 62        self._cert = cert
 63
 64    def __eq__(self, other):
 65        return self.fingerprint() == other.fingerprint()
 66
 67    def __repr__(self):
 68        altnames = [str(x.value) for x in self.altnames]
 69        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"
 70
 71    def __hash__(self):
 72        return self._cert.__hash__()
 73
 74    @classmethod
 75    def from_state(cls, state):
 76        return cls.from_pem(state)
 77
 78    def get_state(self):
 79        return self.to_pem()
 80
 81    def set_state(self, state):
 82        self._cert = x509.load_pem_x509_certificate(state)
 83
 84    @classmethod
 85    def from_pem(cls, data: bytes) -> "Cert":
 86        cert = x509.load_pem_x509_certificate(data)  # type: ignore
 87        return cls(cert)
 88
 89    def to_pem(self) -> bytes:
 90        return self._cert.public_bytes(serialization.Encoding.PEM)
 91
 92    @classmethod
 93    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
 94        return Cert(x509.to_cryptography())
 95
 96    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
 97        return OpenSSL.crypto.X509.from_cryptography(self._cert)
 98
 99    def public_key(self) -> CertificatePublicKeyTypes:
100        return self._cert.public_key()
101
102    def fingerprint(self) -> bytes:
103        return self._cert.fingerprint(hashes.SHA256())
104
105    @property
106    def issuer(self) -> list[tuple[str, str]]:
107        return _name_to_keyval(self._cert.issuer)
108
109    @property
110    def notbefore(self) -> datetime.datetime:
111        try:
112            # type definitions haven't caught up with new API yet.
113            return self._cert.not_valid_before_utc  # type: ignore
114        except AttributeError:  # pragma: no cover
115            # cryptography < 42.0
116            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
117
118    @property
119    def notafter(self) -> datetime.datetime:
120        try:
121            return self._cert.not_valid_after_utc  # type: ignore
122        except AttributeError:  # pragma: no cover
123            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
124
125    def has_expired(self) -> bool:
126        if sys.version_info < (3, 11):  # pragma: no cover
127            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
128        return datetime.datetime.now(datetime.UTC) > self.notafter
129
130    @property
131    def subject(self) -> list[tuple[str, str]]:
132        return _name_to_keyval(self._cert.subject)
133
134    @property
135    def serial(self) -> int:
136        return self._cert.serial_number
137
138    @property
139    def is_ca(self) -> bool:
140        constraints: x509.BasicConstraints
141        try:
142            constraints = self._cert.extensions.get_extension_for_class(
143                x509.BasicConstraints
144            ).value
145            return constraints.ca
146        except x509.ExtensionNotFound:
147            return False
148
149    @property
150    def keyinfo(self) -> tuple[str, int]:
151        public_key = self._cert.public_key()
152        if isinstance(public_key, rsa.RSAPublicKey):
153            return "RSA", public_key.key_size
154        if isinstance(public_key, dsa.DSAPublicKey):
155            return "DSA", public_key.key_size
156        if isinstance(public_key, ec.EllipticCurvePublicKey):
157            return f"EC ({public_key.curve.name})", public_key.key_size
158        return (
159            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
160            getattr(public_key, "key_size", -1),
161        )  # pragma: no cover
162
163    @property
164    def cn(self) -> str | None:
165        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
166        if attrs:
167            return cast(str, attrs[0].value)
168        return None
169
170    @property
171    def organization(self) -> str | None:
172        attrs = self._cert.subject.get_attributes_for_oid(
173            x509.NameOID.ORGANIZATION_NAME
174        )
175        if attrs:
176            return cast(str, attrs[0].value)
177        return None
178
179    @property
180    def altnames(self) -> x509.GeneralNames:
181        """
182        Get all SubjectAlternativeName DNS altnames.
183        """
184        try:
185            sans = self._cert.extensions.get_extension_for_class(
186                x509.SubjectAlternativeName
187            ).value
188        except x509.ExtensionNotFound:
189            return x509.GeneralNames([])
190        else:
191            return x509.GeneralNames(sans)

Representation of a (TLS) certificate.

Cert(cert: cryptography.x509.base.Certificate)
60    def __init__(self, cert: x509.Certificate):
61        assert isinstance(cert, x509.Certificate)
62        self._cert = cert
@classmethod
def from_pem(cls, data: bytes) -> Cert:
84    @classmethod
85    def from_pem(cls, data: bytes) -> "Cert":
86        cert = x509.load_pem_x509_certificate(data)  # type: ignore
87        return cls(cert)
def to_pem(self) -> bytes:
89    def to_pem(self) -> bytes:
90        return self._cert.public_bytes(serialization.Encoding.PEM)
@classmethod
def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> Cert:
92    @classmethod
93    def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert":
94        return Cert(x509.to_cryptography())
def to_pyopenssl(self) -> OpenSSL.crypto.X509:
96    def to_pyopenssl(self) -> OpenSSL.crypto.X509:
97        return OpenSSL.crypto.X509.from_cryptography(self._cert)
def public_key( self) -> Union[cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey]:
 99    def public_key(self) -> CertificatePublicKeyTypes:
100        return self._cert.public_key()
def fingerprint(self) -> bytes:
102    def fingerprint(self) -> bytes:
103        return self._cert.fingerprint(hashes.SHA256())
issuer: list[tuple[str, str]]
105    @property
106    def issuer(self) -> list[tuple[str, str]]:
107        return _name_to_keyval(self._cert.issuer)
notbefore: datetime.datetime
109    @property
110    def notbefore(self) -> datetime.datetime:
111        try:
112            # type definitions haven't caught up with new API yet.
113            return self._cert.not_valid_before_utc  # type: ignore
114        except AttributeError:  # pragma: no cover
115            # cryptography < 42.0
116            return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
notafter: datetime.datetime
118    @property
119    def notafter(self) -> datetime.datetime:
120        try:
121            return self._cert.not_valid_after_utc  # type: ignore
122        except AttributeError:  # pragma: no cover
123            return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc)
def has_expired(self) -> bool:
125    def has_expired(self) -> bool:
126        if sys.version_info < (3, 11):  # pragma: no cover
127            return datetime.datetime.now(datetime.timezone.utc) > self.notafter
128        return datetime.datetime.now(datetime.UTC) > self.notafter
subject: list[tuple[str, str]]
130    @property
131    def subject(self) -> list[tuple[str, str]]:
132        return _name_to_keyval(self._cert.subject)
serial: int
134    @property
135    def serial(self) -> int:
136        return self._cert.serial_number
is_ca: bool
138    @property
139    def is_ca(self) -> bool:
140        constraints: x509.BasicConstraints
141        try:
142            constraints = self._cert.extensions.get_extension_for_class(
143                x509.BasicConstraints
144            ).value
145            return constraints.ca
146        except x509.ExtensionNotFound:
147            return False
keyinfo: tuple[str, int]
149    @property
150    def keyinfo(self) -> tuple[str, int]:
151        public_key = self._cert.public_key()
152        if isinstance(public_key, rsa.RSAPublicKey):
153            return "RSA", public_key.key_size
154        if isinstance(public_key, dsa.DSAPublicKey):
155            return "DSA", public_key.key_size
156        if isinstance(public_key, ec.EllipticCurvePublicKey):
157            return f"EC ({public_key.curve.name})", public_key.key_size
158        return (
159            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
160            getattr(public_key, "key_size", -1),
161        )  # pragma: no cover
cn: str | None
163    @property
164    def cn(self) -> str | None:
165        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
166        if attrs:
167            return cast(str, attrs[0].value)
168        return None
organization: str | None
170    @property
171    def organization(self) -> str | None:
172        attrs = self._cert.subject.get_attributes_for_oid(
173            x509.NameOID.ORGANIZATION_NAME
174        )
175        if attrs:
176            return cast(str, attrs[0].value)
177        return None
altnames: cryptography.x509.extensions.GeneralNames
179    @property
180    def altnames(self) -> x509.GeneralNames:
181        """
182        Get all SubjectAlternativeName DNS altnames.
183        """
184        try:
185            sans = self._cert.extensions.get_extension_for_class(
186                x509.SubjectAlternativeName
187            ).value
188        except x509.ExtensionNotFound:
189            return x509.GeneralNames([])
190        else:
191            return x509.GeneralNames(sans)

Get all SubjectAlternativeName DNS altnames.