mitmproxy.certs
1import contextlib 2import datetime 3import ipaddress 4import logging 5import os 6import sys 7import warnings 8from collections.abc import Iterable 9from dataclasses import dataclass 10from pathlib import Path 11from typing import cast 12from typing import NewType 13from typing import Optional 14from typing import Union 15 16import OpenSSL 17from cryptography import x509 18from cryptography.hazmat.primitives import hashes 19from cryptography.hazmat.primitives import serialization 20from cryptography.hazmat.primitives.asymmetric import dsa 21from cryptography.hazmat.primitives.asymmetric import ec 22from cryptography.hazmat.primitives.asymmetric import rsa 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes 24from cryptography.hazmat.primitives.serialization import pkcs12 25from cryptography.x509 import ExtendedKeyUsageOID 26from cryptography.x509 import NameOID 27 28from mitmproxy.coretypes import serializable 29 30logger = logging.getLogger(__name__) 31 32# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815 33CA_EXPIRY = datetime.timedelta(days=10 * 365) 34CERT_EXPIRY = datetime.timedelta(days=365) 35 36# Generated with "openssl dhparam". It's too slow to generate this on startup. 37DEFAULT_DHPARAM = b""" 38-----BEGIN DH PARAMETERS----- 39MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3 40O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv 41j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ 42Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB 43chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC 44ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq 45o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX 46IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv 47A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8 486mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I 49rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI= 50-----END DH PARAMETERS----- 51""" 52 53 54class Cert(serializable.Serializable): 55 """Representation of a (TLS) certificate.""" 56 57 _cert: x509.Certificate 58 59 def __init__(self, cert: x509.Certificate): 60 assert isinstance(cert, x509.Certificate) 61 self._cert = cert 62 63 def __eq__(self, other): 64 return self.fingerprint() == other.fingerprint() 65 66 def __repr__(self): 67 altnames = [str(x.value) for x in self.altnames] 68 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 69 70 def __hash__(self): 71 return self._cert.__hash__() 72 73 @classmethod 74 def from_state(cls, state): 75 return cls.from_pem(state) 76 77 def get_state(self): 78 return self.to_pem() 79 80 def set_state(self, state): 81 self._cert = x509.load_pem_x509_certificate(state) 82 83 @classmethod 84 def from_pem(cls, data: bytes) -> "Cert": 85 cert = x509.load_pem_x509_certificate(data) # type: ignore 86 return cls(cert) 87 88 def to_pem(self) -> bytes: 89 return self._cert.public_bytes(serialization.Encoding.PEM) 90 91 @classmethod 92 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 93 return Cert(x509.to_cryptography()) 94 95 def to_pyopenssl(self) -> OpenSSL.crypto.X509: 96 return OpenSSL.crypto.X509.from_cryptography(self._cert) 97 98 def public_key(self) -> CertificatePublicKeyTypes: 99 return self._cert.public_key() 100 101 def fingerprint(self) -> bytes: 102 return self._cert.fingerprint(hashes.SHA256()) 103 104 @property 105 def issuer(self) -> list[tuple[str, str]]: 106 return _name_to_keyval(self._cert.issuer) 107 108 @property 109 def notbefore(self) -> datetime.datetime: 110 try: 111 # type definitions haven't caught up with new API yet. 112 return self._cert.not_valid_before_utc # type: ignore 113 except AttributeError: # pragma: no cover 114 # cryptography < 42.0 115 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 116 117 @property 118 def notafter(self) -> datetime.datetime: 119 try: 120 return self._cert.not_valid_after_utc # type: ignore 121 except AttributeError: # pragma: no cover 122 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 123 124 def has_expired(self) -> bool: 125 if sys.version_info < (3, 11): # pragma: no cover 126 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 127 return datetime.datetime.now(datetime.UTC) > self.notafter 128 129 @property 130 def subject(self) -> list[tuple[str, str]]: 131 return _name_to_keyval(self._cert.subject) 132 133 @property 134 def serial(self) -> int: 135 return self._cert.serial_number 136 137 @property 138 def is_ca(self) -> bool: 139 constraints: x509.BasicConstraints 140 try: 141 constraints = self._cert.extensions.get_extension_for_class( 142 x509.BasicConstraints 143 ).value 144 return constraints.ca 145 except x509.ExtensionNotFound: 146 return False 147 148 @property 149 def keyinfo(self) -> tuple[str, int]: 150 public_key = self._cert.public_key() 151 if isinstance(public_key, rsa.RSAPublicKey): 152 return "RSA", public_key.key_size 153 if isinstance(public_key, dsa.DSAPublicKey): 154 return "DSA", public_key.key_size 155 if isinstance(public_key, ec.EllipticCurvePublicKey): 156 return f"EC ({public_key.curve.name})", public_key.key_size 157 return ( 158 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 159 getattr(public_key, "key_size", -1), 160 ) # pragma: no cover 161 162 @property 163 def cn(self) -> str | None: 164 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 165 if attrs: 166 return cast(str, attrs[0].value) 167 return None 168 169 @property 170 def organization(self) -> str | None: 171 attrs = self._cert.subject.get_attributes_for_oid( 172 x509.NameOID.ORGANIZATION_NAME 173 ) 174 if attrs: 175 return cast(str, attrs[0].value) 176 return None 177 178 @property 179 def altnames(self) -> x509.GeneralNames: 180 """ 181 Get all SubjectAlternativeName DNS altnames. 182 """ 183 try: 184 sans = self._cert.extensions.get_extension_for_class( 185 x509.SubjectAlternativeName 186 ).value 187 except x509.ExtensionNotFound: 188 return x509.GeneralNames([]) 189 else: 190 return x509.GeneralNames(sans) 191 192 193def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]: 194 parts = [] 195 for attr in name: 196 k = attr.rfc4514_string().partition("=")[0] 197 v = cast(str, attr.value) 198 parts.append((k, v)) 199 return parts 200 201 202def create_ca( 203 organization: str, 204 cn: str, 205 key_size: int, 206) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]: 207 now = datetime.datetime.now() 208 209 private_key = rsa.generate_private_key( 210 public_exponent=65537, 211 key_size=key_size, 212 ) # type: ignore 213 name = x509.Name( 214 [ 215 x509.NameAttribute(NameOID.COMMON_NAME, cn), 216 x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization), 217 ] 218 ) 219 builder = x509.CertificateBuilder() 220 builder = builder.serial_number(x509.random_serial_number()) 221 builder = builder.subject_name(name) 222 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 223 builder = builder.not_valid_after(now + CA_EXPIRY) 224 builder = builder.issuer_name(name) 225 builder = builder.public_key(private_key.public_key()) 226 builder = builder.add_extension( 227 x509.BasicConstraints(ca=True, path_length=None), critical=True 228 ) 229 builder = builder.add_extension( 230 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 231 ) 232 builder = builder.add_extension( 233 x509.KeyUsage( 234 digital_signature=False, 235 content_commitment=False, 236 key_encipherment=False, 237 data_encipherment=False, 238 key_agreement=False, 239 key_cert_sign=True, 240 crl_sign=True, 241 encipher_only=False, 242 decipher_only=False, 243 ), 244 critical=True, 245 ) 246 builder = builder.add_extension( 247 x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), 248 critical=False, 249 ) 250 cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256()) # type: ignore 251 return private_key, cert 252 253 254def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames: 255 """ 256 SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames. 257 This function converts the old format to the new one. 258 """ 259 if isinstance(sans, x509.GeneralNames): 260 return sans 261 elif ( 262 isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str) 263 ): # pragma: no cover 264 warnings.warn( 265 "Passing SANs as a list of strings is deprecated.", 266 DeprecationWarning, 267 stacklevel=2, 268 ) 269 270 ss: list[x509.GeneralName] = [] 271 for x in cast(list[str], sans): 272 try: 273 ip = ipaddress.ip_address(x) 274 except ValueError: 275 x = x.encode("idna").decode() 276 ss.append(x509.DNSName(x)) 277 else: 278 ss.append(x509.IPAddress(ip)) 279 return x509.GeneralNames(ss) 280 else: 281 return x509.GeneralNames(sans) 282 283 284def dummy_cert( 285 privkey: rsa.RSAPrivateKey, 286 cacert: x509.Certificate, 287 commonname: str | None, 288 sans: Iterable[x509.GeneralName], 289 organization: str | None = None, 290) -> Cert: 291 """ 292 Generates a dummy certificate. 293 294 privkey: CA private key 295 cacert: CA certificate 296 commonname: Common name for the generated certificate. 297 sans: A list of Subject Alternate Names. 298 organization: Organization name for the generated certificate. 299 300 Returns cert if operation succeeded, None if not. 301 """ 302 builder = x509.CertificateBuilder() 303 builder = builder.issuer_name(cacert.subject) 304 builder = builder.add_extension( 305 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 306 ) 307 builder = builder.public_key(cacert.public_key()) 308 309 now = datetime.datetime.now() 310 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 311 builder = builder.not_valid_after(now + CERT_EXPIRY) 312 313 subject = [] 314 is_valid_commonname = commonname is not None and len(commonname) < 64 315 if is_valid_commonname: 316 assert commonname is not None 317 subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname)) 318 if organization is not None: 319 assert organization is not None 320 subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)) 321 builder = builder.subject_name(x509.Name(subject)) 322 builder = builder.serial_number(x509.random_serial_number()) 323 324 # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty. 325 builder = builder.add_extension( 326 x509.SubjectAlternativeName(_fix_legacy_sans(sans)), 327 critical=not is_valid_commonname, 328 ) 329 330 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1 331 builder = builder.add_extension( 332 x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()), 333 critical=False, 334 ) 335 # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways, 336 # see https://github.com/mitmproxy/mitmproxy/issues/6494. 337 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states 338 # that SKI is optional for the leaf cert, so we skip that. 339 340 cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256()) # type: ignore 341 return Cert(cert) 342 343 344@dataclass(frozen=True) 345class CertStoreEntry: 346 cert: Cert 347 privatekey: rsa.RSAPrivateKey 348 chain_file: Path | None 349 chain_certs: list[Cert] 350 351 352TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs) 353TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans) 354TCertId = Union[TCustomCertId, TGeneratedCertId] 355 356DHParams = NewType("DHParams", bytes) 357 358 359class CertStore: 360 """ 361 Implements an in-memory certificate store. 362 """ 363 364 STORE_CAP = 100 365 default_privatekey: rsa.RSAPrivateKey 366 default_ca: Cert 367 default_chain_file: Path | None 368 default_chain_certs: list[Cert] 369 dhparams: DHParams 370 certs: dict[TCertId, CertStoreEntry] 371 expire_queue: list[CertStoreEntry] 372 373 def __init__( 374 self, 375 default_privatekey: rsa.RSAPrivateKey, 376 default_ca: Cert, 377 default_chain_file: Path | None, 378 dhparams: DHParams, 379 ): 380 self.default_privatekey = default_privatekey 381 self.default_ca = default_ca 382 self.default_chain_file = default_chain_file 383 self.default_chain_certs = ( 384 [ 385 Cert(c) 386 for c in x509.load_pem_x509_certificates( 387 self.default_chain_file.read_bytes() 388 ) 389 ] 390 if self.default_chain_file 391 else [default_ca] 392 ) 393 self.dhparams = dhparams 394 self.certs = {} 395 self.expire_queue = [] 396 397 def expire(self, entry: CertStoreEntry) -> None: 398 self.expire_queue.append(entry) 399 if len(self.expire_queue) > self.STORE_CAP: 400 d = self.expire_queue.pop(0) 401 self.certs = {k: v for k, v in self.certs.items() if v != d} 402 403 @staticmethod 404 def load_dhparam(path: Path) -> DHParams: 405 # mitmproxy<=0.10 doesn't generate a dhparam file. 406 # Create it now if necessary. 407 if not path.exists(): 408 path.write_bytes(DEFAULT_DHPARAM) 409 410 # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's 411 # expected format. 412 bio = OpenSSL.SSL._lib.BIO_new_file( # type: ignore 413 str(path).encode(sys.getfilesystemencoding()), b"r" 414 ) 415 if bio != OpenSSL.SSL._ffi.NULL: # type: ignore 416 bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free) # type: ignore 417 dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams( # type: ignore 418 bio, 419 OpenSSL.SSL._ffi.NULL, # type: ignore 420 OpenSSL.SSL._ffi.NULL, # type: ignore 421 OpenSSL.SSL._ffi.NULL, # type: ignore 422 ) 423 dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free) # type: ignore 424 return dh 425 raise RuntimeError("Error loading DH Params.") # pragma: no cover 426 427 @classmethod 428 def from_store( 429 cls, 430 path: Path | str, 431 basename: str, 432 key_size: int, 433 passphrase: bytes | None = None, 434 ) -> "CertStore": 435 path = Path(path) 436 ca_file = path / f"{basename}-ca.pem" 437 dhparam_file = path / f"{basename}-dhparam.pem" 438 if not ca_file.exists(): 439 cls.create_store(path, basename, key_size) 440 return cls.from_files(ca_file, dhparam_file, passphrase) 441 442 @classmethod 443 def from_files( 444 cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None 445 ) -> "CertStore": 446 raw = ca_file.read_bytes() 447 key = load_pem_private_key(raw, passphrase) 448 dh = cls.load_dhparam(dhparam_file) 449 certs = x509.load_pem_x509_certificates(raw) 450 ca = Cert(certs[0]) 451 if len(certs) > 1: 452 chain_file: Path | None = ca_file 453 else: 454 chain_file = None 455 return cls(key, ca, chain_file, dh) 456 457 @staticmethod 458 @contextlib.contextmanager 459 def umask_secret(): 460 """ 461 Context to temporarily set umask to its original value bitor 0o77. 462 Useful when writing private keys to disk so that only the owner 463 will be able to read them. 464 """ 465 original_umask = os.umask(0) 466 os.umask(original_umask | 0o77) 467 try: 468 yield 469 finally: 470 os.umask(original_umask) 471 472 @staticmethod 473 def create_store( 474 path: Path, basename: str, key_size: int, organization=None, cn=None 475 ) -> None: 476 path.mkdir(parents=True, exist_ok=True) 477 478 organization = organization or basename 479 cn = cn or basename 480 481 key: rsa.RSAPrivateKeyWithSerialization 482 ca: x509.Certificate 483 key, ca = create_ca(organization=organization, cn=cn, key_size=key_size) 484 485 # Dump the CA plus private key. 486 with CertStore.umask_secret(): 487 # PEM format 488 (path / f"{basename}-ca.pem").write_bytes( 489 key.private_bytes( 490 encoding=serialization.Encoding.PEM, 491 format=serialization.PrivateFormat.TraditionalOpenSSL, 492 encryption_algorithm=serialization.NoEncryption(), 493 ) 494 + ca.public_bytes(serialization.Encoding.PEM) 495 ) 496 497 # PKCS12 format for Windows devices 498 (path / f"{basename}-ca.p12").write_bytes( 499 pkcs12.serialize_key_and_certificates( # type: ignore 500 name=basename.encode(), 501 key=key, 502 cert=ca, 503 cas=None, 504 encryption_algorithm=serialization.NoEncryption(), 505 ) 506 ) 507 508 # Dump the certificate in PEM format 509 pem_cert = ca.public_bytes(serialization.Encoding.PEM) 510 (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert) 511 # Create a .cer file with the same contents for Android 512 (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert) 513 514 # Dump the certificate in PKCS12 format for Windows devices 515 (path / f"{basename}-ca-cert.p12").write_bytes( 516 pkcs12.serialize_key_and_certificates( 517 name=basename.encode(), 518 key=None, # type: ignore 519 cert=ca, 520 cas=None, 521 encryption_algorithm=serialization.NoEncryption(), 522 ) 523 ) 524 525 (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM) 526 527 def add_cert_file( 528 self, spec: str, path: Path, passphrase: bytes | None = None 529 ) -> None: 530 raw = path.read_bytes() 531 cert = Cert.from_pem(raw) 532 try: 533 private_key = load_pem_private_key(raw, password=passphrase) 534 except ValueError as e: 535 private_key = self.default_privatekey 536 if cert.public_key() != private_key.public_key(): 537 raise ValueError( 538 f'Unable to find private key in "{path.absolute()}": {e}' 539 ) from e 540 else: 541 if cert.public_key() != private_key.public_key(): 542 raise ValueError( 543 f'Private and public keys in "{path.absolute()}" do not match:\n' 544 f"{cert.public_key()=}\n" 545 f"{private_key.public_key()=}" 546 ) 547 548 try: 549 chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)] 550 except ValueError as e: 551 logger.warning(f"Failed to read certificate chain: {e}") 552 chain = [cert] 553 554 if cert.is_ca: 555 logger.warning( 556 f'"{path.absolute()}" is a certificate authority and not a leaf certificate. ' 557 f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/." 558 ) 559 560 self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec) 561 562 def add_cert(self, entry: CertStoreEntry, *names: str) -> None: 563 """ 564 Adds a cert to the certstore. We register the CN in the cert plus 565 any SANs, and also the list of names provided as an argument. 566 """ 567 if entry.cert.cn: 568 self.certs[entry.cert.cn] = entry 569 for i in entry.cert.altnames: 570 self.certs[str(i.value)] = entry 571 for i in names: 572 self.certs[i] = entry 573 574 @staticmethod 575 def asterisk_forms(dn: str | x509.GeneralName) -> list[str]: 576 """ 577 Return all asterisk forms for a domain. For example, for www.example.com this will return 578 [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted. 579 """ 580 if isinstance(dn, str): 581 parts = dn.split(".") 582 ret = [dn] 583 for i in range(1, len(parts)): 584 ret.append("*." + ".".join(parts[i:])) 585 return ret 586 elif isinstance(dn, x509.DNSName): 587 return CertStore.asterisk_forms(dn.value) 588 else: 589 return [str(dn.value)] 590 591 def get_cert( 592 self, 593 commonname: str | None, 594 sans: Iterable[x509.GeneralName], 595 organization: str | None = None, 596 ) -> CertStoreEntry: 597 """ 598 commonname: Common name for the generated certificate. Must be a 599 valid, plain-ASCII, IDNA-encoded domain name. 600 601 sans: A list of Subject Alternate Names. 602 603 organization: Organization name for the generated certificate. 604 """ 605 sans = _fix_legacy_sans(sans) 606 607 potential_keys: list[TCertId] = [] 608 if commonname: 609 potential_keys.extend(self.asterisk_forms(commonname)) 610 for s in sans: 611 potential_keys.extend(self.asterisk_forms(s)) 612 potential_keys.append("*") 613 potential_keys.append((commonname, sans)) 614 615 name = next(filter(lambda key: key in self.certs, potential_keys), None) 616 if name: 617 entry = self.certs[name] 618 else: 619 entry = CertStoreEntry( 620 cert=dummy_cert( 621 self.default_privatekey, 622 self.default_ca._cert, 623 commonname, 624 sans, 625 organization, 626 ), 627 privatekey=self.default_privatekey, 628 chain_file=self.default_chain_file, 629 chain_certs=self.default_chain_certs, 630 ) 631 self.certs[(commonname, sans)] = entry 632 self.expire(entry) 633 634 return entry 635 636 637def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey: 638 """ 639 like cryptography's load_pem_private_key, but silently falls back to not using a password 640 if the private key is unencrypted. 641 """ 642 try: 643 return serialization.load_pem_private_key(data, password) # type: ignore 644 except TypeError: 645 if password is not None: 646 return load_pem_private_key(data, None) 647 raise
class
Cert(mitmproxy.coretypes.serializable.Serializable):
55class Cert(serializable.Serializable): 56 """Representation of a (TLS) certificate.""" 57 58 _cert: x509.Certificate 59 60 def __init__(self, cert: x509.Certificate): 61 assert isinstance(cert, x509.Certificate) 62 self._cert = cert 63 64 def __eq__(self, other): 65 return self.fingerprint() == other.fingerprint() 66 67 def __repr__(self): 68 altnames = [str(x.value) for x in self.altnames] 69 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 70 71 def __hash__(self): 72 return self._cert.__hash__() 73 74 @classmethod 75 def from_state(cls, state): 76 return cls.from_pem(state) 77 78 def get_state(self): 79 return self.to_pem() 80 81 def set_state(self, state): 82 self._cert = x509.load_pem_x509_certificate(state) 83 84 @classmethod 85 def from_pem(cls, data: bytes) -> "Cert": 86 cert = x509.load_pem_x509_certificate(data) # type: ignore 87 return cls(cert) 88 89 def to_pem(self) -> bytes: 90 return self._cert.public_bytes(serialization.Encoding.PEM) 91 92 @classmethod 93 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 94 return Cert(x509.to_cryptography()) 95 96 def to_pyopenssl(self) -> OpenSSL.crypto.X509: 97 return OpenSSL.crypto.X509.from_cryptography(self._cert) 98 99 def public_key(self) -> CertificatePublicKeyTypes: 100 return self._cert.public_key() 101 102 def fingerprint(self) -> bytes: 103 return self._cert.fingerprint(hashes.SHA256()) 104 105 @property 106 def issuer(self) -> list[tuple[str, str]]: 107 return _name_to_keyval(self._cert.issuer) 108 109 @property 110 def notbefore(self) -> datetime.datetime: 111 try: 112 # type definitions haven't caught up with new API yet. 113 return self._cert.not_valid_before_utc # type: ignore 114 except AttributeError: # pragma: no cover 115 # cryptography < 42.0 116 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 117 118 @property 119 def notafter(self) -> datetime.datetime: 120 try: 121 return self._cert.not_valid_after_utc # type: ignore 122 except AttributeError: # pragma: no cover 123 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 124 125 def has_expired(self) -> bool: 126 if sys.version_info < (3, 11): # pragma: no cover 127 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 128 return datetime.datetime.now(datetime.UTC) > self.notafter 129 130 @property 131 def subject(self) -> list[tuple[str, str]]: 132 return _name_to_keyval(self._cert.subject) 133 134 @property 135 def serial(self) -> int: 136 return self._cert.serial_number 137 138 @property 139 def is_ca(self) -> bool: 140 constraints: x509.BasicConstraints 141 try: 142 constraints = self._cert.extensions.get_extension_for_class( 143 x509.BasicConstraints 144 ).value 145 return constraints.ca 146 except x509.ExtensionNotFound: 147 return False 148 149 @property 150 def keyinfo(self) -> tuple[str, int]: 151 public_key = self._cert.public_key() 152 if isinstance(public_key, rsa.RSAPublicKey): 153 return "RSA", public_key.key_size 154 if isinstance(public_key, dsa.DSAPublicKey): 155 return "DSA", public_key.key_size 156 if isinstance(public_key, ec.EllipticCurvePublicKey): 157 return f"EC ({public_key.curve.name})", public_key.key_size 158 return ( 159 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 160 getattr(public_key, "key_size", -1), 161 ) # pragma: no cover 162 163 @property 164 def cn(self) -> str | None: 165 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 166 if attrs: 167 return cast(str, attrs[0].value) 168 return None 169 170 @property 171 def organization(self) -> str | None: 172 attrs = self._cert.subject.get_attributes_for_oid( 173 x509.NameOID.ORGANIZATION_NAME 174 ) 175 if attrs: 176 return cast(str, attrs[0].value) 177 return None 178 179 @property 180 def altnames(self) -> x509.GeneralNames: 181 """ 182 Get all SubjectAlternativeName DNS altnames. 183 """ 184 try: 185 sans = self._cert.extensions.get_extension_for_class( 186 x509.SubjectAlternativeName 187 ).value 188 except x509.ExtensionNotFound: 189 return x509.GeneralNames([]) 190 else: 191 return x509.GeneralNames(sans)
Representation of a (TLS) certificate.
def
public_key( self) -> Union[cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey]:
notbefore: datetime.datetime
109 @property 110 def notbefore(self) -> datetime.datetime: 111 try: 112 # type definitions haven't caught up with new API yet. 113 return self._cert.not_valid_before_utc # type: ignore 114 except AttributeError: # pragma: no cover 115 # cryptography < 42.0 116 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
keyinfo: tuple[str, int]
149 @property 150 def keyinfo(self) -> tuple[str, int]: 151 public_key = self._cert.public_key() 152 if isinstance(public_key, rsa.RSAPublicKey): 153 return "RSA", public_key.key_size 154 if isinstance(public_key, dsa.DSAPublicKey): 155 return "DSA", public_key.key_size 156 if isinstance(public_key, ec.EllipticCurvePublicKey): 157 return f"EC ({public_key.curve.name})", public_key.key_size 158 return ( 159 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 160 getattr(public_key, "key_size", -1), 161 ) # pragma: no cover
altnames: cryptography.x509.extensions.GeneralNames
179 @property 180 def altnames(self) -> x509.GeneralNames: 181 """ 182 Get all SubjectAlternativeName DNS altnames. 183 """ 184 try: 185 sans = self._cert.extensions.get_extension_for_class( 186 x509.SubjectAlternativeName 187 ).value 188 except x509.ExtensionNotFound: 189 return x509.GeneralNames([]) 190 else: 191 return x509.GeneralNames(sans)
Get all SubjectAlternativeName DNS altnames.