mitmproxy.certs
1import contextlib 2import datetime 3import ipaddress 4import logging 5import os 6import sys 7import warnings 8from collections.abc import Iterable 9from dataclasses import dataclass 10from pathlib import Path 11from typing import cast 12from typing import NewType 13from typing import Optional 14from typing import Union 15 16import OpenSSL 17from cryptography import x509 18from cryptography.hazmat.primitives import hashes 19from cryptography.hazmat.primitives import serialization 20from cryptography.hazmat.primitives.asymmetric import dsa 21from cryptography.hazmat.primitives.asymmetric import ec 22from cryptography.hazmat.primitives.asymmetric import rsa 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes 24from cryptography.hazmat.primitives.serialization import pkcs12 25from cryptography.x509 import ExtendedKeyUsageOID 26from cryptography.x509 import NameOID 27 28from mitmproxy.coretypes import serializable 29 30if sys.version_info < (3, 13): # pragma: no cover 31 from typing_extensions import deprecated 32else: 33 from warnings import deprecated 34 35logger = logging.getLogger(__name__) 36 37# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815 38CA_EXPIRY = datetime.timedelta(days=10 * 365) 39CERT_EXPIRY = datetime.timedelta(days=365) 40CRL_EXPIRY = datetime.timedelta(days=7) 41 42# Generated with "openssl dhparam". It's too slow to generate this on startup. 43DEFAULT_DHPARAM = b""" 44-----BEGIN DH PARAMETERS----- 45MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3 46O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv 47j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ 48Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB 49chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC 50ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq 51o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX 52IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv 53A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8 546mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I 55rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI= 56-----END DH PARAMETERS----- 57""" 58 59 60class Cert(serializable.Serializable): 61 """Representation of a (TLS) certificate.""" 62 63 _cert: x509.Certificate 64 65 def __init__(self, cert: x509.Certificate): 66 assert isinstance(cert, x509.Certificate) 67 self._cert = cert 68 69 def __eq__(self, other): 70 return self.fingerprint() == other.fingerprint() 71 72 def __repr__(self): 73 altnames = [str(x.value) for x in self.altnames] 74 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 75 76 def __hash__(self): 77 return self._cert.__hash__() 78 79 @classmethod 80 def from_state(cls, state): 81 return cls.from_pem(state) 82 83 def get_state(self): 84 return self.to_pem() 85 86 def set_state(self, state): 87 self._cert = x509.load_pem_x509_certificate(state) 88 89 @classmethod 90 def from_pem(cls, data: bytes) -> "Cert": 91 cert = x509.load_pem_x509_certificate(data) # type: ignore 92 return cls(cert) 93 94 def to_pem(self) -> bytes: 95 return self._cert.public_bytes(serialization.Encoding.PEM) 96 97 @classmethod 98 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 99 return Cert(x509.to_cryptography()) 100 101 @deprecated("Use `to_cryptography` instead.") 102 def to_pyopenssl(self) -> OpenSSL.crypto.X509: # pragma: no cover 103 return OpenSSL.crypto.X509.from_cryptography(self._cert) 104 105 def to_cryptography(self) -> x509.Certificate: 106 return self._cert 107 108 def public_key(self) -> CertificatePublicKeyTypes: 109 return self._cert.public_key() 110 111 def fingerprint(self) -> bytes: 112 return self._cert.fingerprint(hashes.SHA256()) 113 114 @property 115 def issuer(self) -> list[tuple[str, str]]: 116 return _name_to_keyval(self._cert.issuer) 117 118 @property 119 def notbefore(self) -> datetime.datetime: 120 try: 121 # type definitions haven't caught up with new API yet. 122 return self._cert.not_valid_before_utc # type: ignore 123 except AttributeError: # pragma: no cover 124 # cryptography < 42.0 125 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 126 127 @property 128 def notafter(self) -> datetime.datetime: 129 try: 130 return self._cert.not_valid_after_utc # type: ignore 131 except AttributeError: # pragma: no cover 132 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 133 134 def has_expired(self) -> bool: 135 if sys.version_info < (3, 11): # pragma: no cover 136 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 137 return datetime.datetime.now(datetime.UTC) > self.notafter 138 139 @property 140 def subject(self) -> list[tuple[str, str]]: 141 return _name_to_keyval(self._cert.subject) 142 143 @property 144 def serial(self) -> int: 145 return self._cert.serial_number 146 147 @property 148 def is_ca(self) -> bool: 149 constraints: x509.BasicConstraints 150 try: 151 constraints = self._cert.extensions.get_extension_for_class( 152 x509.BasicConstraints 153 ).value 154 return constraints.ca 155 except x509.ExtensionNotFound: 156 return False 157 158 @property 159 def keyinfo(self) -> tuple[str, int]: 160 public_key = self._cert.public_key() 161 if isinstance(public_key, rsa.RSAPublicKey): 162 return "RSA", public_key.key_size 163 if isinstance(public_key, dsa.DSAPublicKey): 164 return "DSA", public_key.key_size 165 if isinstance(public_key, ec.EllipticCurvePublicKey): 166 return f"EC ({public_key.curve.name})", public_key.key_size 167 return ( 168 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 169 getattr(public_key, "key_size", -1), 170 ) # pragma: no cover 171 172 @property 173 def cn(self) -> str | None: 174 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 175 if attrs: 176 return cast(str, attrs[0].value) 177 return None 178 179 @property 180 def organization(self) -> str | None: 181 attrs = self._cert.subject.get_attributes_for_oid( 182 x509.NameOID.ORGANIZATION_NAME 183 ) 184 if attrs: 185 return cast(str, attrs[0].value) 186 return None 187 188 @property 189 def altnames(self) -> x509.GeneralNames: 190 """ 191 Get all SubjectAlternativeName DNS altnames. 192 """ 193 try: 194 sans = self._cert.extensions.get_extension_for_class( 195 x509.SubjectAlternativeName 196 ).value 197 except x509.ExtensionNotFound: 198 return x509.GeneralNames([]) 199 else: 200 return x509.GeneralNames(sans) 201 202 @property 203 def crl_distribution_points(self) -> list[str]: 204 try: 205 ext = self._cert.extensions.get_extension_for_class( 206 x509.CRLDistributionPoints 207 ).value 208 except x509.ExtensionNotFound: 209 return [] 210 else: 211 return [ 212 dist_point.full_name[0].value 213 for dist_point in ext 214 if dist_point.full_name 215 and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier) 216 ] 217 218 219def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]: 220 parts = [] 221 for attr in name: 222 k = attr.rfc4514_string().partition("=")[0] 223 v = cast(str, attr.value) 224 parts.append((k, v)) 225 return parts 226 227 228def create_ca( 229 organization: str, 230 cn: str, 231 key_size: int, 232) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]: 233 now = datetime.datetime.now() 234 235 private_key = rsa.generate_private_key( 236 public_exponent=65537, 237 key_size=key_size, 238 ) # type: ignore 239 name = x509.Name( 240 [ 241 x509.NameAttribute(NameOID.COMMON_NAME, cn), 242 x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization), 243 ] 244 ) 245 builder = x509.CertificateBuilder() 246 builder = builder.serial_number(x509.random_serial_number()) 247 builder = builder.subject_name(name) 248 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 249 builder = builder.not_valid_after(now + CA_EXPIRY) 250 builder = builder.issuer_name(name) 251 builder = builder.public_key(private_key.public_key()) 252 builder = builder.add_extension( 253 x509.BasicConstraints(ca=True, path_length=None), critical=True 254 ) 255 builder = builder.add_extension( 256 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 257 ) 258 builder = builder.add_extension( 259 x509.KeyUsage( 260 digital_signature=False, 261 content_commitment=False, 262 key_encipherment=False, 263 data_encipherment=False, 264 key_agreement=False, 265 key_cert_sign=True, 266 crl_sign=True, 267 encipher_only=False, 268 decipher_only=False, 269 ), 270 critical=True, 271 ) 272 builder = builder.add_extension( 273 x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), 274 critical=False, 275 ) 276 cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256()) # type: ignore 277 return private_key, cert 278 279 280def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames: 281 """ 282 SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames. 283 This function converts the old format to the new one. 284 """ 285 if isinstance(sans, x509.GeneralNames): 286 return sans 287 elif ( 288 isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str) 289 ): # pragma: no cover 290 warnings.warn( 291 "Passing SANs as a list of strings is deprecated.", 292 DeprecationWarning, 293 stacklevel=2, 294 ) 295 296 ss: list[x509.GeneralName] = [] 297 for x in cast(list[str], sans): 298 try: 299 ip = ipaddress.ip_address(x) 300 except ValueError: 301 x = x.encode("idna").decode() 302 ss.append(x509.DNSName(x)) 303 else: 304 ss.append(x509.IPAddress(ip)) 305 return x509.GeneralNames(ss) 306 else: 307 return x509.GeneralNames(cast(Iterable[x509.GeneralName], sans)) 308 309 310def dummy_cert( 311 privkey: rsa.RSAPrivateKey, 312 cacert: x509.Certificate, 313 commonname: str | None, 314 sans: Iterable[x509.GeneralName], 315 organization: str | None = None, 316 crl_url: str | None = None, 317) -> Cert: 318 """ 319 Generates a dummy certificate. 320 321 privkey: CA private key 322 cacert: CA certificate 323 commonname: Common name for the generated certificate. 324 sans: A list of Subject Alternate Names. 325 organization: Organization name for the generated certificate. 326 crl_url: URL of CRL distribution point 327 328 Returns cert if operation succeeded, None if not. 329 """ 330 builder = x509.CertificateBuilder() 331 builder = builder.issuer_name(cacert.subject) 332 builder = builder.add_extension( 333 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 334 ) 335 builder = builder.public_key(cacert.public_key()) 336 337 now = datetime.datetime.now() 338 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 339 builder = builder.not_valid_after(now + CERT_EXPIRY) 340 341 subject = [] 342 is_valid_commonname = commonname is not None and len(commonname) < 64 343 if is_valid_commonname: 344 assert commonname is not None 345 subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname)) 346 if organization is not None: 347 assert organization is not None 348 subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)) 349 builder = builder.subject_name(x509.Name(subject)) 350 builder = builder.serial_number(x509.random_serial_number()) 351 352 # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty. 353 builder = builder.add_extension( 354 x509.SubjectAlternativeName(_fix_legacy_sans(sans)), 355 critical=not is_valid_commonname, 356 ) 357 358 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1 359 builder = builder.add_extension( 360 x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()), # type: ignore 361 critical=False, 362 ) 363 # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways, 364 # see https://github.com/mitmproxy/mitmproxy/issues/6494. 365 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states 366 # that SKI is optional for the leaf cert, so we skip that. 367 368 if crl_url: 369 builder = builder.add_extension( 370 x509.CRLDistributionPoints( 371 [ 372 x509.DistributionPoint( 373 [x509.UniformResourceIdentifier(crl_url)], 374 relative_name=None, 375 crl_issuer=None, 376 reasons=None, 377 ) 378 ] 379 ), 380 critical=False, 381 ) 382 383 cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256()) # type: ignore 384 return Cert(cert) 385 386 387def dummy_crl( 388 privkey: rsa.RSAPrivateKey, 389 cacert: x509.Certificate, 390) -> bytes: 391 """ 392 Generates an empty CRL signed with the CA key 393 394 privkey: CA private key 395 cacert: CA certificate 396 397 Returns a CRL DER encoded 398 """ 399 builder = x509.CertificateRevocationListBuilder() 400 builder = builder.issuer_name(cacert.issuer) 401 402 now = datetime.datetime.now() 403 builder = builder.last_update(now - datetime.timedelta(days=2)) 404 builder = builder.next_update(now + CRL_EXPIRY) 405 406 builder = builder.add_extension(x509.CRLNumber(1000), False) # meaningless number 407 crl = builder.sign(private_key=privkey, algorithm=hashes.SHA256()) 408 return crl.public_bytes(serialization.Encoding.DER) 409 410 411@dataclass(frozen=True) 412class CertStoreEntry: 413 cert: Cert 414 privatekey: rsa.RSAPrivateKey 415 chain_file: Path | None 416 chain_certs: list[Cert] 417 418 419TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs) 420TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans) 421TCertId = Union[TCustomCertId, TGeneratedCertId] 422 423DHParams = NewType("DHParams", bytes) 424 425 426class CertStore: 427 """ 428 Implements an in-memory certificate store. 429 """ 430 431 STORE_CAP = 100 432 default_privatekey: rsa.RSAPrivateKey 433 default_ca: Cert 434 default_chain_file: Path | None 435 default_chain_certs: list[Cert] 436 dhparams: DHParams 437 certs: dict[TCertId, CertStoreEntry] 438 expire_queue: list[CertStoreEntry] 439 440 def __init__( 441 self, 442 default_privatekey: rsa.RSAPrivateKey, 443 default_ca: Cert, 444 default_chain_file: Path | None, 445 default_crl: bytes, 446 dhparams: DHParams, 447 ): 448 self.default_privatekey = default_privatekey 449 self.default_ca = default_ca 450 self.default_chain_file = default_chain_file 451 self.default_crl = default_crl 452 self.default_chain_certs = ( 453 [ 454 Cert(c) 455 for c in x509.load_pem_x509_certificates( 456 self.default_chain_file.read_bytes() 457 ) 458 ] 459 if self.default_chain_file 460 else [default_ca] 461 ) 462 self.dhparams = dhparams 463 self.certs = {} 464 self.expire_queue = [] 465 466 def expire(self, entry: CertStoreEntry) -> None: 467 self.expire_queue.append(entry) 468 if len(self.expire_queue) > self.STORE_CAP: 469 d = self.expire_queue.pop(0) 470 self.certs = {k: v for k, v in self.certs.items() if v != d} 471 472 @staticmethod 473 def load_dhparam(path: Path) -> DHParams: 474 # mitmproxy<=0.10 doesn't generate a dhparam file. 475 # Create it now if necessary. 476 if not path.exists(): 477 path.write_bytes(DEFAULT_DHPARAM) 478 479 # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's 480 # expected format. 481 bio = OpenSSL.SSL._lib.BIO_new_file( # type: ignore 482 str(path).encode(sys.getfilesystemencoding()), b"r" 483 ) 484 if bio != OpenSSL.SSL._ffi.NULL: # type: ignore 485 bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free) # type: ignore 486 dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams( # type: ignore 487 bio, 488 OpenSSL.SSL._ffi.NULL, # type: ignore 489 OpenSSL.SSL._ffi.NULL, # type: ignore 490 OpenSSL.SSL._ffi.NULL, # type: ignore 491 ) 492 dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free) # type: ignore 493 return dh 494 raise RuntimeError("Error loading DH Params.") # pragma: no cover 495 496 @classmethod 497 def from_store( 498 cls, 499 path: Path | str, 500 basename: str, 501 key_size: int, 502 passphrase: bytes | None = None, 503 ) -> "CertStore": 504 path = Path(path) 505 ca_file = path / f"{basename}-ca.pem" 506 dhparam_file = path / f"{basename}-dhparam.pem" 507 if not ca_file.exists(): 508 cls.create_store(path, basename, key_size) 509 return cls.from_files(ca_file, dhparam_file, passphrase) 510 511 @classmethod 512 def from_files( 513 cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None 514 ) -> "CertStore": 515 raw = ca_file.read_bytes() 516 key = load_pem_private_key(raw, passphrase) 517 dh = cls.load_dhparam(dhparam_file) 518 certs = x509.load_pem_x509_certificates(raw) 519 ca = Cert(certs[0]) 520 crl = dummy_crl(key, ca._cert) 521 if len(certs) > 1: 522 chain_file: Path | None = ca_file 523 else: 524 chain_file = None 525 return cls(key, ca, chain_file, crl, dh) 526 527 @staticmethod 528 @contextlib.contextmanager 529 def umask_secret(): 530 """ 531 Context to temporarily set umask to its original value bitor 0o77. 532 Useful when writing private keys to disk so that only the owner 533 will be able to read them. 534 """ 535 original_umask = os.umask(0) 536 os.umask(original_umask | 0o77) 537 try: 538 yield 539 finally: 540 os.umask(original_umask) 541 542 @staticmethod 543 def create_store( 544 path: Path, basename: str, key_size: int, organization=None, cn=None 545 ) -> None: 546 path.mkdir(parents=True, exist_ok=True) 547 548 organization = organization or basename 549 cn = cn or basename 550 551 key: rsa.RSAPrivateKeyWithSerialization 552 ca: x509.Certificate 553 key, ca = create_ca(organization=organization, cn=cn, key_size=key_size) 554 555 # Dump the CA plus private key. 556 with CertStore.umask_secret(): 557 # PEM format 558 (path / f"{basename}-ca.pem").write_bytes( 559 key.private_bytes( 560 encoding=serialization.Encoding.PEM, 561 format=serialization.PrivateFormat.TraditionalOpenSSL, 562 encryption_algorithm=serialization.NoEncryption(), 563 ) 564 + ca.public_bytes(serialization.Encoding.PEM) 565 ) 566 567 # PKCS12 format for Windows devices 568 (path / f"{basename}-ca.p12").write_bytes( 569 pkcs12.serialize_key_and_certificates( # type: ignore 570 name=basename.encode(), 571 key=key, 572 cert=ca, 573 cas=None, 574 encryption_algorithm=serialization.NoEncryption(), 575 ) 576 ) 577 578 # Dump the certificate in PEM format 579 pem_cert = ca.public_bytes(serialization.Encoding.PEM) 580 (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert) 581 # Create a .cer file with the same contents for Android 582 (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert) 583 584 # Dump the certificate in PKCS12 format for Windows devices 585 (path / f"{basename}-ca-cert.p12").write_bytes( 586 pkcs12.serialize_key_and_certificates( 587 name=basename.encode(), 588 key=None, # type: ignore 589 cert=ca, 590 cas=None, 591 encryption_algorithm=serialization.NoEncryption(), 592 ) 593 ) 594 595 (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM) 596 597 def add_cert_file( 598 self, spec: str, path: Path, passphrase: bytes | None = None 599 ) -> None: 600 raw = path.read_bytes() 601 cert = Cert.from_pem(raw) 602 try: 603 private_key = load_pem_private_key(raw, password=passphrase) 604 except ValueError as e: 605 private_key = self.default_privatekey 606 if cert.public_key() != private_key.public_key(): 607 raise ValueError( 608 f'Unable to find private key in "{path.absolute()}": {e}' 609 ) from e 610 else: 611 if cert.public_key() != private_key.public_key(): 612 raise ValueError( 613 f'Private and public keys in "{path.absolute()}" do not match:\n' 614 f"{cert.public_key()=}\n" 615 f"{private_key.public_key()=}" 616 ) 617 618 try: 619 chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)] 620 except ValueError as e: 621 logger.warning(f"Failed to read certificate chain: {e}") 622 chain = [cert] 623 624 if cert.is_ca: 625 logger.warning( 626 f'"{path.absolute()}" is a certificate authority and not a leaf certificate. ' 627 f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/." 628 ) 629 630 self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec) 631 632 def add_cert(self, entry: CertStoreEntry, *names: str) -> None: 633 """ 634 Adds a cert to the certstore. We register the CN in the cert plus 635 any SANs, and also the list of names provided as an argument. 636 """ 637 if entry.cert.cn: 638 self.certs[entry.cert.cn] = entry 639 for i in entry.cert.altnames: 640 self.certs[str(i.value)] = entry 641 for i in names: 642 self.certs[i] = entry 643 644 @staticmethod 645 def asterisk_forms(dn: str | x509.GeneralName) -> list[str]: 646 """ 647 Return all asterisk forms for a domain. For example, for www.example.com this will return 648 [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted. 649 """ 650 if isinstance(dn, str): 651 parts = dn.split(".") 652 ret = [dn] 653 for i in range(1, len(parts)): 654 ret.append("*." + ".".join(parts[i:])) 655 return ret 656 elif isinstance(dn, x509.DNSName): 657 return CertStore.asterisk_forms(dn.value) 658 else: 659 return [str(dn.value)] 660 661 def get_cert( 662 self, 663 commonname: str | None, 664 sans: Iterable[x509.GeneralName], 665 organization: str | None = None, 666 crl_url: str | None = None, 667 ) -> CertStoreEntry: 668 """ 669 commonname: Common name for the generated certificate. Must be a 670 valid, plain-ASCII, IDNA-encoded domain name. 671 672 sans: A list of Subject Alternate Names. 673 674 organization: Organization name for the generated certificate. 675 676 crl_url: URL of CRL distribution point 677 """ 678 sans = _fix_legacy_sans(sans) 679 680 potential_keys: list[TCertId] = [] 681 if commonname: 682 potential_keys.extend(self.asterisk_forms(commonname)) 683 for s in sans: 684 potential_keys.extend(self.asterisk_forms(s)) 685 potential_keys.append("*") 686 potential_keys.append((commonname, sans)) 687 688 name = next(filter(lambda key: key in self.certs, potential_keys), None) 689 if name: 690 entry = self.certs[name] 691 else: 692 entry = CertStoreEntry( 693 cert=dummy_cert( 694 self.default_privatekey, 695 self.default_ca._cert, 696 commonname, 697 sans, 698 organization, 699 crl_url, 700 ), 701 privatekey=self.default_privatekey, 702 chain_file=self.default_chain_file, 703 chain_certs=self.default_chain_certs, 704 ) 705 self.certs[(commonname, sans)] = entry 706 self.expire(entry) 707 708 return entry 709 710 711def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey: 712 """ 713 like cryptography's load_pem_private_key, but silently falls back to not using a password 714 if the private key is unencrypted. 715 """ 716 try: 717 return serialization.load_pem_private_key(data, password) # type: ignore 718 except TypeError: 719 if password is not None: 720 return load_pem_private_key(data, None) 721 raise
class
Cert(mitmproxy.coretypes.serializable.Serializable):
61class Cert(serializable.Serializable): 62 """Representation of a (TLS) certificate.""" 63 64 _cert: x509.Certificate 65 66 def __init__(self, cert: x509.Certificate): 67 assert isinstance(cert, x509.Certificate) 68 self._cert = cert 69 70 def __eq__(self, other): 71 return self.fingerprint() == other.fingerprint() 72 73 def __repr__(self): 74 altnames = [str(x.value) for x in self.altnames] 75 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 76 77 def __hash__(self): 78 return self._cert.__hash__() 79 80 @classmethod 81 def from_state(cls, state): 82 return cls.from_pem(state) 83 84 def get_state(self): 85 return self.to_pem() 86 87 def set_state(self, state): 88 self._cert = x509.load_pem_x509_certificate(state) 89 90 @classmethod 91 def from_pem(cls, data: bytes) -> "Cert": 92 cert = x509.load_pem_x509_certificate(data) # type: ignore 93 return cls(cert) 94 95 def to_pem(self) -> bytes: 96 return self._cert.public_bytes(serialization.Encoding.PEM) 97 98 @classmethod 99 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 100 return Cert(x509.to_cryptography()) 101 102 @deprecated("Use `to_cryptography` instead.") 103 def to_pyopenssl(self) -> OpenSSL.crypto.X509: # pragma: no cover 104 return OpenSSL.crypto.X509.from_cryptography(self._cert) 105 106 def to_cryptography(self) -> x509.Certificate: 107 return self._cert 108 109 def public_key(self) -> CertificatePublicKeyTypes: 110 return self._cert.public_key() 111 112 def fingerprint(self) -> bytes: 113 return self._cert.fingerprint(hashes.SHA256()) 114 115 @property 116 def issuer(self) -> list[tuple[str, str]]: 117 return _name_to_keyval(self._cert.issuer) 118 119 @property 120 def notbefore(self) -> datetime.datetime: 121 try: 122 # type definitions haven't caught up with new API yet. 123 return self._cert.not_valid_before_utc # type: ignore 124 except AttributeError: # pragma: no cover 125 # cryptography < 42.0 126 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 127 128 @property 129 def notafter(self) -> datetime.datetime: 130 try: 131 return self._cert.not_valid_after_utc # type: ignore 132 except AttributeError: # pragma: no cover 133 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 134 135 def has_expired(self) -> bool: 136 if sys.version_info < (3, 11): # pragma: no cover 137 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 138 return datetime.datetime.now(datetime.UTC) > self.notafter 139 140 @property 141 def subject(self) -> list[tuple[str, str]]: 142 return _name_to_keyval(self._cert.subject) 143 144 @property 145 def serial(self) -> int: 146 return self._cert.serial_number 147 148 @property 149 def is_ca(self) -> bool: 150 constraints: x509.BasicConstraints 151 try: 152 constraints = self._cert.extensions.get_extension_for_class( 153 x509.BasicConstraints 154 ).value 155 return constraints.ca 156 except x509.ExtensionNotFound: 157 return False 158 159 @property 160 def keyinfo(self) -> tuple[str, int]: 161 public_key = self._cert.public_key() 162 if isinstance(public_key, rsa.RSAPublicKey): 163 return "RSA", public_key.key_size 164 if isinstance(public_key, dsa.DSAPublicKey): 165 return "DSA", public_key.key_size 166 if isinstance(public_key, ec.EllipticCurvePublicKey): 167 return f"EC ({public_key.curve.name})", public_key.key_size 168 return ( 169 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 170 getattr(public_key, "key_size", -1), 171 ) # pragma: no cover 172 173 @property 174 def cn(self) -> str | None: 175 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 176 if attrs: 177 return cast(str, attrs[0].value) 178 return None 179 180 @property 181 def organization(self) -> str | None: 182 attrs = self._cert.subject.get_attributes_for_oid( 183 x509.NameOID.ORGANIZATION_NAME 184 ) 185 if attrs: 186 return cast(str, attrs[0].value) 187 return None 188 189 @property 190 def altnames(self) -> x509.GeneralNames: 191 """ 192 Get all SubjectAlternativeName DNS altnames. 193 """ 194 try: 195 sans = self._cert.extensions.get_extension_for_class( 196 x509.SubjectAlternativeName 197 ).value 198 except x509.ExtensionNotFound: 199 return x509.GeneralNames([]) 200 else: 201 return x509.GeneralNames(sans) 202 203 @property 204 def crl_distribution_points(self) -> list[str]: 205 try: 206 ext = self._cert.extensions.get_extension_for_class( 207 x509.CRLDistributionPoints 208 ).value 209 except x509.ExtensionNotFound: 210 return [] 211 else: 212 return [ 213 dist_point.full_name[0].value 214 for dist_point in ext 215 if dist_point.full_name 216 and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier) 217 ]
Representation of a (TLS) certificate.
def
public_key( self) -> Union[cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey]:
notbefore: datetime.datetime
119 @property 120 def notbefore(self) -> datetime.datetime: 121 try: 122 # type definitions haven't caught up with new API yet. 123 return self._cert.not_valid_before_utc # type: ignore 124 except AttributeError: # pragma: no cover 125 # cryptography < 42.0 126 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
keyinfo: tuple[str, int]
159 @property 160 def keyinfo(self) -> tuple[str, int]: 161 public_key = self._cert.public_key() 162 if isinstance(public_key, rsa.RSAPublicKey): 163 return "RSA", public_key.key_size 164 if isinstance(public_key, dsa.DSAPublicKey): 165 return "DSA", public_key.key_size 166 if isinstance(public_key, ec.EllipticCurvePublicKey): 167 return f"EC ({public_key.curve.name})", public_key.key_size 168 return ( 169 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 170 getattr(public_key, "key_size", -1), 171 ) # pragma: no cover
altnames: cryptography.x509.extensions.GeneralNames
189 @property 190 def altnames(self) -> x509.GeneralNames: 191 """ 192 Get all SubjectAlternativeName DNS altnames. 193 """ 194 try: 195 sans = self._cert.extensions.get_extension_for_class( 196 x509.SubjectAlternativeName 197 ).value 198 except x509.ExtensionNotFound: 199 return x509.GeneralNames([]) 200 else: 201 return x509.GeneralNames(sans)
Get all SubjectAlternativeName DNS altnames.
crl_distribution_points: list[str]
203 @property 204 def crl_distribution_points(self) -> list[str]: 205 try: 206 ext = self._cert.extensions.get_extension_for_class( 207 x509.CRLDistributionPoints 208 ).value 209 except x509.ExtensionNotFound: 210 return [] 211 else: 212 return [ 213 dist_point.full_name[0].value 214 for dist_point in ext 215 if dist_point.full_name 216 and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier) 217 ]