mitmproxy.certs
1import contextlib 2import datetime 3import ipaddress 4import logging 5import os 6import sys 7import warnings 8from collections.abc import Iterable 9from dataclasses import dataclass 10from pathlib import Path 11from typing import cast 12from typing import NewType 13from typing import Optional 14from typing import Union 15 16import OpenSSL 17from cryptography import x509 18from cryptography.hazmat.primitives import hashes 19from cryptography.hazmat.primitives import serialization 20from cryptography.hazmat.primitives.asymmetric import dsa 21from cryptography.hazmat.primitives.asymmetric import ec 22from cryptography.hazmat.primitives.asymmetric import rsa 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes 24from cryptography.hazmat.primitives.serialization import pkcs12 25from cryptography.x509 import ExtendedKeyUsageOID 26from cryptography.x509 import NameOID 27 28from mitmproxy.coretypes import serializable 29 30if sys.version_info < (3, 13): # pragma: no cover 31 from typing_extensions import deprecated 32else: 33 from warnings import deprecated 34 35logger = logging.getLogger(__name__) 36 37# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815 38# Note that expiry will be offset by `CERT_VALIDITY_OFFSET`, i.e. the cert will be 39# backdated a bit to account for clients with incorrect clocks. 40CA_EXPIRY = datetime.timedelta(days=10 * 365) 41CERT_EXPIRY = datetime.timedelta(days=199) 42CRL_EXPIRY = datetime.timedelta(days=7) 43 44CERT_VALIDITY_OFFSET = datetime.timedelta(days=-2) 45 46# Generated with "openssl dhparam". It's too slow to generate this on startup. 
DEFAULT_DHPARAM = b"""
-----BEGIN DH PARAMETERS-----
MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3
O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv
j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ
Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB
chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC
ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq
o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX
IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv
A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8
6mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I
rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI=
-----END DH PARAMETERS-----
"""


class Cert(serializable.Serializable):
    """Representation of a (TLS) certificate."""

    _cert: x509.Certificate

    def __init__(self, cert: x509.Certificate):
        assert isinstance(cert, x509.Certificate)
        self._cert = cert

    def __eq__(self, other):
        # Comparing against a foreign type must not blow up with an
        # AttributeError; let Python fall back to its default handling.
        if not isinstance(other, Cert):
            return NotImplemented
        return self.fingerprint() == other.fingerprint()

    def __repr__(self):
        altnames = [str(x.value) for x in self.altnames]
        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"

    def __hash__(self):
        return hash(self._cert)

    @classmethod
    def from_state(cls, state):
        """Restore a certificate from the PEM bytes produced by `get_state`."""
        return cls.from_pem(state)

    def get_state(self):
        """Return the serialized state, i.e. the PEM encoding of the certificate."""
        return self.to_pem()

    def set_state(self, state):
        """Replace the wrapped certificate with the one encoded in the PEM bytes `state`."""
        self._cert = x509.load_pem_x509_certificate(state)

    @classmethod
    def from_pem(cls, data: bytes) -> "Cert":
        """Parse a PEM-encoded certificate."""
        cert = x509.load_pem_x509_certificate(data)  # type: ignore
        return cls(cert)

    def to_pem(self) -> bytes:
        """Return the certificate in PEM encoding."""
        return self._cert.public_bytes(serialization.Encoding.PEM)

    @classmethod
    def from_pyopenssl(cls, x509: OpenSSL.crypto.X509) -> "Cert":
        """Wrap a pyOpenSSL certificate.

        Note: the parameter name shadows the `x509` module inside this
        method only; it is kept for backwards compatibility with keyword callers.
        """
        return cls(x509.to_cryptography())

    @deprecated("Use `to_cryptography` instead.")
    def to_pyopenssl(self) -> OpenSSL.crypto.X509:  # pragma: no cover
        return OpenSSL.crypto.X509.from_cryptography(self._cert)

    def to_cryptography(self) -> x509.Certificate:
        """Return the underlying `cryptography` certificate object."""
        return self._cert

    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the certificate's public key."""
        return self._cert.public_key()

    def fingerprint(self) -> bytes:
        """Return the SHA-256 fingerprint of the certificate."""
        return self._cert.fingerprint(hashes.SHA256())

    @property
    def issuer(self) -> list[tuple[str, str]]:
        """The issuer name as a list of (attribute, value) pairs."""
        return _name_to_keyval(self._cert.issuer)

    @property
    def notbefore(self) -> datetime.datetime:
        """Start of the validity period as a timezone-aware UTC datetime."""
        try:
            # type definitions haven't caught up with new API yet.
            return self._cert.not_valid_before_utc  # type: ignore
        except AttributeError:  # pragma: no cover
            # cryptography < 42.0
            return self._cert.not_valid_before.replace(tzinfo=datetime.UTC)

    @property
    def notafter(self) -> datetime.datetime:
        """End of the validity period as a timezone-aware UTC datetime."""
        try:
            return self._cert.not_valid_after_utc  # type: ignore
        except AttributeError:  # pragma: no cover
            return self._cert.not_valid_after.replace(tzinfo=datetime.UTC)

    def has_expired(self) -> bool:
        """Return True if the current time is past `notafter`."""
        return datetime.datetime.now(datetime.UTC) > self.notafter

    @property
    def subject(self) -> list[tuple[str, str]]:
        """The subject name as a list of (attribute, value) pairs."""
        return _name_to_keyval(self._cert.subject)

    @property
    def serial(self) -> int:
        """The certificate's serial number."""
        return self._cert.serial_number

    @property
    def is_ca(self) -> bool:
        """True if the BasicConstraints extension is present and marks a CA."""
        constraints: x509.BasicConstraints
        try:
            constraints = self._cert.extensions.get_extension_for_class(
                x509.BasicConstraints
            ).value
            return constraints.ca
        except x509.ExtensionNotFound:
            return False

    @property
    def keyinfo(self) -> tuple[str, int]:
        """A (key type description, key size) tuple for the public key."""
        public_key = self._cert.public_key()
        if isinstance(public_key, rsa.RSAPublicKey):
            return "RSA", public_key.key_size
        if isinstance(public_key, dsa.DSAPublicKey):
            return "DSA", public_key.key_size
        if isinstance(public_key, ec.EllipticCurvePublicKey):
            return f"EC ({public_key.curve.name})", public_key.key_size
        # Other key types: derive a label from the class name; -1 if the
        # key object exposes no `key_size`.
        return (
            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
            getattr(public_key, "key_size", -1),
        )  # pragma: no cover

    @property
    def cn(self) -> str | None:
        """The first Common Name of the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if attrs:
            return cast(str, attrs[0].value)
        return None

    @property
    def organization(self) -> str | None:
        """The first Organization Name of the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(
            x509.NameOID.ORGANIZATION_NAME
        )
        if attrs:
            return cast(str, attrs[0].value)
        return None

    @property
    def altnames(self) -> x509.GeneralNames:
        """
        Get all entries of the SubjectAlternativeName extension
        (not only DNS names). Empty if the extension is absent.
        """
        try:
            sans = self._cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            ).value
        except x509.ExtensionNotFound:
            return x509.GeneralNames([])
        else:
            return x509.GeneralNames(sans)

    @property
    def crl_distribution_points(self) -> list[str]:
        """
        URLs from the CRLDistributionPoints extension. Only distribution
        points whose first full name is a URI are included.
        """
        try:
            ext = self._cert.extensions.get_extension_for_class(
                x509.CRLDistributionPoints
            ).value
        except x509.ExtensionNotFound:
            return []
        else:
            return [
                dist_point.full_name[0].value
                for dist_point in ext
                if dist_point.full_name
                and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier)
            ]


def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]:
    """Convert an X.509 name into a list of (attribute key, value) pairs."""
    parts = []
    for attr in name:
        # The key is whatever precedes "=" in the attribute's RFC 4514 form.
        k = attr.rfc4514_string().partition("=")[0]
        v = cast(str, attr.value)
        parts.append((k, v))
    return parts


def create_ca(
    organization: str,
    cn: str,
    key_size: int,
) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]:
    """
    Generate a new RSA key pair and a matching self-signed CA certificate.

    organization: Organization name for subject and issuer.
    cn: Common name for subject and issuer.
    key_size: RSA key size in bits.

    Returns a (private key, certificate) tuple.
    """
    # Use an aware UTC timestamp: cryptography interprets naive datetimes as
    # UTC, so a naive *local* time would skew the validity window by the
    # host's UTC offset.
    now = datetime.datetime.now(datetime.UTC)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
    )  # type: ignore
    name = x509.Name(
        [
            x509.NameAttribute(NameOID.COMMON_NAME, cn),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
        ]
    )
    builder = x509.CertificateBuilder()
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.subject_name(name)
    builder = builder.not_valid_before(now + CERT_VALIDITY_OFFSET)
    builder = builder.not_valid_after(now + CERT_VALIDITY_OFFSET + CA_EXPIRY)
    builder = builder.issuer_name(name)
    builder = builder.public_key(private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=None), critical=True
    )
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    builder = builder.add_extension(
        x509.KeyUsage(
            digital_signature=False,
            content_commitment=False,
            key_encipherment=False,
            data_encipherment=False,
            key_agreement=False,
            key_cert_sign=True,
            crl_sign=True,
            encipher_only=False,
            decipher_only=False,
        ),
        critical=True,
    )
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()),
        critical=False,
    )
    cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256())  # type: ignore
    return private_key, cert


def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames:
    """
    SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames.
    This function converts the old format to the new one.
    """
    if isinstance(sans, x509.GeneralNames):
        return sans
    elif (
        isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str)
    ):  # pragma: no cover
        warnings.warn(
            "Passing SANs as a list of strings is deprecated.",
            DeprecationWarning,
            stacklevel=2,
        )

        ss: list[x509.GeneralName] = []
        for x in cast(list[str], sans):
            try:
                ip = ipaddress.ip_address(x)
            except ValueError:
                # Not an IP address: treat it as a (possibly internationalized) hostname.
                x = x.encode("idna").decode()
                ss.append(x509.DNSName(x))
            else:
                ss.append(x509.IPAddress(ip))
        return x509.GeneralNames(ss)
    else:
        return x509.GeneralNames(cast(Iterable[x509.GeneralName], sans))


def dummy_cert(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
    commonname: str | None,
    sans: Iterable[x509.GeneralName],
    organization: str | None = None,
    crl_url: str | None = None,
) -> Cert:
    """
    Generates a dummy certificate.

    privkey: CA private key
    cacert: CA certificate
    commonname: Common name for the generated certificate.
    sans: A list of Subject Alternate Names.
    organization: Organization name for the generated certificate.
    crl_url: URL of CRL distribution point

    Returns the generated certificate wrapped in a `Cert`.
    """
    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(cacert.subject)
    builder = builder.add_extension(
        x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
    )
    builder = builder.public_key(cacert.public_key())

    # Aware UTC timestamp; cryptography treats naive datetimes as UTC, so a
    # naive local time would shift the validity window by the UTC offset.
    now = datetime.datetime.now(datetime.UTC)
    builder = builder.not_valid_before(now + CERT_VALIDITY_OFFSET)
    builder = builder.not_valid_after(now + CERT_VALIDITY_OFFSET + CERT_EXPIRY)

    subject = []
    # Common names of 64 characters or more are left out of the subject.
    is_valid_commonname = commonname is not None and len(commonname) < 64
    if is_valid_commonname:
        assert commonname is not None
        subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname))
    if organization is not None:
        subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization))
    builder = builder.subject_name(x509.Name(subject))
    builder = builder.serial_number(x509.random_serial_number())

    # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty.
    builder = builder.add_extension(
        x509.SubjectAlternativeName(_fix_legacy_sans(sans)),
        critical=not is_valid_commonname,
    )

    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1
    # Per RFC 5280 §4.2.1.2, the AKI's keyIdentifier in a child certificate
    # MUST be byte-equal to the issuer's stored SubjectKeyIdentifier. If we
    # recompute it from the public key with `from_issuer_public_key()`, we
    # always derive a SHA-1 digest, which mismatches whenever the issuer's
    # SKI was generated by a different method (RFC 7093 truncated
    # SHA-256/384/512, hardware-rooted CAs, or any custom value). This
    # mismatch is rejected by strict TLS chain builders (`X509_V_FLAG_X509_STRICT`,
    # Python `ssl`, Go `crypto/x509`) with "authority and subject key
    # identifier mismatch". Most notably, cert-manager >=1.18 / Go >=1.25
    # default to truncated SHA-256 SKIs for FIPS 140-3 compliance.
    try:
        issuer_ski = cacert.extensions.get_extension_for_class(
            x509.SubjectKeyIdentifier
        ).value
        aki = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(issuer_ski)
    except x509.ExtensionNotFound:
        # Fall back when the issuer cert has no SKI extension at all (legacy
        # CAs predating RFC 5280 conformance).
        aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key())  # type: ignore
    builder = builder.add_extension(aki, critical=False)
    # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways,
    # see https://github.com/mitmproxy/mitmproxy/issues/6494.
    # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states
    # that SKI is optional for the leaf cert, so we skip that.

    if crl_url:
        builder = builder.add_extension(
            x509.CRLDistributionPoints(
                [
                    x509.DistributionPoint(
                        [x509.UniformResourceIdentifier(crl_url)],
                        relative_name=None,
                        crl_issuer=None,
                        reasons=None,
                    )
                ]
            ),
            critical=False,
        )

    cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256())  # type: ignore
    return Cert(cert)


def dummy_crl(
    privkey: rsa.RSAPrivateKey,
    cacert: x509.Certificate,
) -> bytes:
    """
    Generates an empty CRL signed with the CA key

    privkey: CA private key
    cacert: CA certificate

    Returns a CRL DER encoded
    """
    builder = x509.CertificateRevocationListBuilder()
    # The CRL is issued by the CA itself, so its issuer is the CA's *subject*
    # (the same name `dummy_cert` puts into the leaf certificates' issuer
    # field). `cacert.issuer` is only equal to that for a self-signed CA.
    builder = builder.issuer_name(cacert.subject)

    # Aware UTC timestamp; cryptography treats naive datetimes as UTC.
    now = datetime.datetime.now(datetime.UTC)
    builder = builder.last_update(now + CERT_VALIDITY_OFFSET)
    builder = builder.next_update(now + CERT_VALIDITY_OFFSET + CRL_EXPIRY)

    builder = builder.add_extension(x509.CRLNumber(1000), False)  # meaningless number
    crl = builder.sign(private_key=privkey, algorithm=hashes.SHA256())
    return crl.public_bytes(serialization.Encoding.DER)


@dataclass(frozen=True)
class CertStoreEntry:
    """A certificate together with its private key and chain information."""

    cert: Cert
    privatekey: rsa.RSAPrivateKey
    chain_file: Path | None
    chain_certs: list[Cert]


TCustomCertId = str  # manually provided certs (e.g. mitmproxy's --certs)
TGeneratedCertId = tuple[Optional[str], x509.GeneralNames]  # (common_name, sans)
TCertId = Union[TCustomCertId, TGeneratedCertId]

DHParams = NewType("DHParams", bytes)


class CertStore:
    """
    Implements an in-memory certificate store.
    """

    # Maximum number of generated certificates kept before the oldest is evicted.
    STORE_CAP = 100
    default_privatekey: rsa.RSAPrivateKey
    default_ca: Cert
    default_chain_file: Path | None
    default_chain_certs: list[Cert]
    default_crl: bytes
    dhparams: DHParams
    certs: dict[TCertId, CertStoreEntry]
    expire_queue: list[CertStoreEntry]

    def __init__(
        self,
        default_privatekey: rsa.RSAPrivateKey,
        default_ca: Cert,
        default_chain_file: Path | None,
        default_crl: bytes,
        dhparams: DHParams,
    ):
        self.default_privatekey = default_privatekey
        self.default_ca = default_ca
        self.default_chain_file = default_chain_file
        self.default_crl = default_crl
        # With a chain file, every certificate in it forms the chain;
        # otherwise the chain consists of the CA certificate alone.
        self.default_chain_certs = (
            [
                Cert(c)
                for c in x509.load_pem_x509_certificates(
                    self.default_chain_file.read_bytes()
                )
            ]
            if self.default_chain_file
            else [default_ca]
        )
        self.dhparams = dhparams
        self.certs = {}
        self.expire_queue = []

    def expire(self, entry: CertStoreEntry) -> None:
        """Queue `entry` for expiry and evict the oldest queued entry once
        more than `STORE_CAP` entries are queued."""
        self.expire_queue.append(entry)
        if len(self.expire_queue) > self.STORE_CAP:
            d = self.expire_queue.pop(0)
            # Drop every key that still maps to the evicted entry.
            self.certs = {k: v for k, v in self.certs.items() if v != d}

    @staticmethod
    def load_dhparam(path: Path) -> DHParams:
        """Load DH parameters from `path`, writing the defaults there first
        if the file does not exist."""
        # mitmproxy<=0.10 doesn't generate a dhparam file.
        # Create it now if necessary.
        if not path.exists():
            path.write_bytes(DEFAULT_DHPARAM)

        # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's
        # expected format.
        bio = OpenSSL.SSL._lib.BIO_new_file(  # type: ignore
            str(path).encode(sys.getfilesystemencoding()), b"r"
        )
        if bio != OpenSSL.SSL._ffi.NULL:  # type: ignore
            bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free)  # type: ignore
            dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams(  # type: ignore
                bio,
                OpenSSL.SSL._ffi.NULL,  # type: ignore
                OpenSSL.SSL._ffi.NULL,  # type: ignore
                OpenSSL.SSL._ffi.NULL,  # type: ignore
            )
            dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free)  # type: ignore
            return dh
        raise RuntimeError("Error loading DH Params.")  # pragma: no cover

    @classmethod
    def from_store(
        cls,
        path: Path | str,
        basename: str,
        key_size: int,
        passphrase: bytes | None = None,
    ) -> "CertStore":
        """Load the store named `basename` from directory `path`, creating
        it first if its CA file does not exist."""
        path = Path(path)
        ca_file = path / f"{basename}-ca.pem"
        dhparam_file = path / f"{basename}-dhparam.pem"
        if not ca_file.exists():
            cls.create_store(path, basename, key_size)
        return cls.from_files(ca_file, dhparam_file, passphrase)

    @classmethod
    def from_files(
        cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None
    ) -> "CertStore":
        """Build a store from a PEM file holding the CA key and certificate(s)
        and a DH parameter file."""
        raw = ca_file.read_bytes()
        key = load_pem_private_key(raw, passphrase)
        dh = cls.load_dhparam(dhparam_file)
        certs = x509.load_pem_x509_certificates(raw)
        ca = Cert(certs[0])
        crl = dummy_crl(key, ca._cert)
        # More than one certificate in the file means it doubles as the chain file.
        if len(certs) > 1:
            chain_file: Path | None = ca_file
        else:
            chain_file = None
        return cls(key, ca, chain_file, crl, dh)

    @staticmethod
    @contextlib.contextmanager
    def umask_secret():
        """
        Context to temporarily set umask to its original value bitor 0o77.
        Useful when writing private keys to disk so that only the owner
        will be able to read them.
        """
        # os.umask() can only be read by setting it, hence the two calls.
        original_umask = os.umask(0)
        os.umask(original_umask | 0o77)
        try:
            yield
        finally:
            os.umask(original_umask)

    @staticmethod
    def create_store(
        path: Path, basename: str, key_size: int, organization=None, cn=None
    ) -> None:
        """Create a new CA and write it, in several formats, plus the default
        DH parameters into directory `path`.

        `organization` and `cn` default to `basename`.
        """
        path.mkdir(parents=True, exist_ok=True)

        organization = organization or basename
        cn = cn or basename

        key: rsa.RSAPrivateKeyWithSerialization
        ca: x509.Certificate
        key, ca = create_ca(organization=organization, cn=cn, key_size=key_size)

        # Dump the CA plus private key.
        with CertStore.umask_secret():
            # PEM format
            (path / f"{basename}-ca.pem").write_bytes(
                key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.TraditionalOpenSSL,
                    encryption_algorithm=serialization.NoEncryption(),
                )
                + ca.public_bytes(serialization.Encoding.PEM)
            )

            # PKCS12 format for Windows devices
            (path / f"{basename}-ca.p12").write_bytes(
                pkcs12.serialize_key_and_certificates(  # type: ignore
                    name=basename.encode(),
                    key=key,
                    cert=ca,
                    cas=None,
                    encryption_algorithm=serialization.NoEncryption(),
                )
            )

        # Dump the certificate in PEM format
        pem_cert = ca.public_bytes(serialization.Encoding.PEM)
        (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert)
        # Create a .cer file with the same contents for Android
        (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert)

        # Dump the certificate in PKCS12 format for Windows devices
        (path / f"{basename}-ca-cert.p12").write_bytes(
            pkcs12.serialize_key_and_certificates(
                name=basename.encode(),
                key=None,  # type: ignore
                cert=ca,
                cas=None,
                encryption_algorithm=serialization.NoEncryption(),
            )
        )

        (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM)

    def add_cert_file(
        self, spec: str, path: Path, passphrase: bytes | None = None
    ) -> None:
        """Load a certificate (and, if present, its private key and chain)
        from the PEM file `path` and register it under `spec`.

        Raises ValueError if no matching private key is available or if the
        key in the file does not match the certificate.
        """
        raw = path.read_bytes()
        cert = Cert.from_pem(raw)
        try:
            private_key = load_pem_private_key(raw, password=passphrase)
        except ValueError as e:
            # No usable key in the file: fall back to the store's default key,
            # which is only acceptable if it matches the certificate.
            private_key = self.default_privatekey
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Unable to find private key in "{path.absolute()}": {e}'
                ) from e
        else:
            if cert.public_key() != private_key.public_key():
                raise ValueError(
                    f'Private and public keys in "{path.absolute()}" do not match:\n'
                    f"{cert.public_key()=}\n"
                    f"{private_key.public_key()=}"
                )

        try:
            chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)]
        except ValueError as e:
            logger.warning(f"Failed to read certificate chain: {e}")
            chain = [cert]

        if cert.is_ca:
            logger.warning(
                f'"{path.absolute()}" is a certificate authority and not a leaf certificate. '
                f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/."
            )

        self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec)

    def add_cert(self, entry: CertStoreEntry, *names: str) -> None:
        """
        Adds a cert to the certstore. We register the CN in the cert plus
        any SANs, and also the list of names provided as an argument.
        """
        if entry.cert.cn:
            self.certs[entry.cert.cn] = entry
        for i in entry.cert.altnames:
            self.certs[str(i.value)] = entry
        for i in names:
            self.certs[i] = entry

    @staticmethod
    def asterisk_forms(dn: str | x509.GeneralName) -> list[str]:
        """
        Return all asterisk forms for a domain. For example, for www.example.com this will return
        ["www.example.com", "*.example.com", "*.com"]. The single wildcard "*" is omitted.

        Non-DNS general names yield just the string form of their value.
        """
        if isinstance(dn, str):
            parts = dn.split(".")
            ret = [dn]
            for i in range(1, len(parts)):
                ret.append("*." + ".".join(parts[i:]))
            return ret
        elif isinstance(dn, x509.DNSName):
            return CertStore.asterisk_forms(dn.value)
        else:
            return [str(dn.value)]

    def get_cert(
        self,
        commonname: str | None,
        sans: Iterable[x509.GeneralName],
        organization: str | None = None,
        crl_url: str | None = None,
    ) -> CertStoreEntry:
        """
        Return a matching stored certificate entry, generating (and caching)
        a dummy certificate if none matches.

        commonname: Common name for the generated certificate. Must be a
        valid, plain-ASCII, IDNA-encoded domain name.

        sans: A list of Subject Alternate Names.

        organization: Organization name for the generated certificate.

        crl_url: URL of CRL distribution point
        """
        sans = _fix_legacy_sans(sans)

        # Lookup order: exact/wildcard forms of the CN, then of each SAN,
        # then the catch-all "*", and finally a previously generated cert.
        potential_keys: list[TCertId] = []
        if commonname:
            potential_keys.extend(self.asterisk_forms(commonname))
        for s in sans:
            potential_keys.extend(self.asterisk_forms(s))
        potential_keys.append("*")
        potential_keys.append((commonname, sans))

        name = next(filter(lambda key: key in self.certs, potential_keys), None)
        # Compare against None explicitly so that a falsy key (e.g. "") that
        # is present in the store is still honoured.
        if name is not None:
            entry = self.certs[name]
        else:
            entry = CertStoreEntry(
                cert=dummy_cert(
                    self.default_privatekey,
                    self.default_ca._cert,
                    commonname,
                    sans,
                    organization,
                    crl_url,
                ),
                privatekey=self.default_privatekey,
                chain_file=self.default_chain_file,
                chain_certs=self.default_chain_certs,
            )
            self.certs[(commonname, sans)] = entry
            self.expire(entry)

        return entry


def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey:
    """
    like cryptography's load_pem_private_key, but silently falls back to not using a password
    if the private key is unencrypted.
    """
    try:
        return serialization.load_pem_private_key(data, password)  # type: ignore
    except TypeError:
        # A TypeError with a password supplied is retried without one;
        # without a password there is nothing left to try.
        if password is not None:
            return load_pem_private_key(data, None)
        raise
class
Cert(mitmproxy.coretypes.serializable.Serializable):
class Cert(serializable.Serializable):
    """Representation of a (TLS) certificate."""

    _cert: x509.Certificate

    def __init__(self, cert: x509.Certificate):
        assert isinstance(cert, x509.Certificate)
        self._cert = cert

    def __eq__(self, other):
        # Comparing against a foreign type must not blow up with an
        # AttributeError; let Python fall back to its default handling.
        if not isinstance(other, Cert):
            return NotImplemented
        return self.fingerprint() == other.fingerprint()

    def __repr__(self):
        altnames = [str(x.value) for x in self.altnames]
        return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>"

    def __hash__(self):
        return hash(self._cert)

    @classmethod
    def from_state(cls, state):
        """Restore a certificate from the PEM bytes produced by `get_state`."""
        return cls.from_pem(state)

    def get_state(self):
        """Return the serialized state, i.e. the PEM encoding of the certificate."""
        return self.to_pem()

    def set_state(self, state):
        """Replace the wrapped certificate with the one encoded in the PEM bytes `state`."""
        self._cert = x509.load_pem_x509_certificate(state)

    @classmethod
    def from_pem(cls, data: bytes) -> "Cert":
        """Parse a PEM-encoded certificate."""
        cert = x509.load_pem_x509_certificate(data)  # type: ignore
        return cls(cert)

    def to_pem(self) -> bytes:
        """Return the certificate in PEM encoding."""
        return self._cert.public_bytes(serialization.Encoding.PEM)

    @classmethod
    def from_pyopenssl(cls, x509: OpenSSL.crypto.X509) -> "Cert":
        """Wrap a pyOpenSSL certificate.

        Note: the parameter name shadows the `x509` module inside this
        method only; it is kept for backwards compatibility with keyword callers.
        """
        return cls(x509.to_cryptography())

    @deprecated("Use `to_cryptography` instead.")
    def to_pyopenssl(self) -> OpenSSL.crypto.X509:  # pragma: no cover
        return OpenSSL.crypto.X509.from_cryptography(self._cert)

    def to_cryptography(self) -> x509.Certificate:
        """Return the underlying `cryptography` certificate object."""
        return self._cert

    def public_key(self) -> CertificatePublicKeyTypes:
        """Return the certificate's public key."""
        return self._cert.public_key()

    def fingerprint(self) -> bytes:
        """Return the SHA-256 fingerprint of the certificate."""
        return self._cert.fingerprint(hashes.SHA256())

    @property
    def issuer(self) -> list[tuple[str, str]]:
        """The issuer name as a list of (attribute, value) pairs."""
        return _name_to_keyval(self._cert.issuer)

    @property
    def notbefore(self) -> datetime.datetime:
        """Start of the validity period as a timezone-aware UTC datetime."""
        try:
            # type definitions haven't caught up with new API yet.
            return self._cert.not_valid_before_utc  # type: ignore
        except AttributeError:  # pragma: no cover
            # cryptography < 42.0
            return self._cert.not_valid_before.replace(tzinfo=datetime.UTC)

    @property
    def notafter(self) -> datetime.datetime:
        """End of the validity period as a timezone-aware UTC datetime."""
        try:
            return self._cert.not_valid_after_utc  # type: ignore
        except AttributeError:  # pragma: no cover
            return self._cert.not_valid_after.replace(tzinfo=datetime.UTC)

    def has_expired(self) -> bool:
        """Return True if the current time is past `notafter`."""
        return datetime.datetime.now(datetime.UTC) > self.notafter

    @property
    def subject(self) -> list[tuple[str, str]]:
        """The subject name as a list of (attribute, value) pairs."""
        return _name_to_keyval(self._cert.subject)

    @property
    def serial(self) -> int:
        """The certificate's serial number."""
        return self._cert.serial_number

    @property
    def is_ca(self) -> bool:
        """True if the BasicConstraints extension is present and marks a CA."""
        constraints: x509.BasicConstraints
        try:
            constraints = self._cert.extensions.get_extension_for_class(
                x509.BasicConstraints
            ).value
            return constraints.ca
        except x509.ExtensionNotFound:
            return False

    @property
    def keyinfo(self) -> tuple[str, int]:
        """A (key type description, key size) tuple for the public key."""
        public_key = self._cert.public_key()
        if isinstance(public_key, rsa.RSAPublicKey):
            return "RSA", public_key.key_size
        if isinstance(public_key, dsa.DSAPublicKey):
            return "DSA", public_key.key_size
        if isinstance(public_key, ec.EllipticCurvePublicKey):
            return f"EC ({public_key.curve.name})", public_key.key_size
        # Other key types: derive a label from the class name; -1 if the
        # key object exposes no `key_size`.
        return (
            public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""),
            getattr(public_key, "key_size", -1),
        )  # pragma: no cover

    @property
    def cn(self) -> str | None:
        """The first Common Name of the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
        if attrs:
            return cast(str, attrs[0].value)
        return None

    @property
    def organization(self) -> str | None:
        """The first Organization Name of the subject, or None if there is none."""
        attrs = self._cert.subject.get_attributes_for_oid(
            x509.NameOID.ORGANIZATION_NAME
        )
        if attrs:
            return cast(str, attrs[0].value)
        return None

    @property
    def altnames(self) -> x509.GeneralNames:
        """
        Get all entries of the SubjectAlternativeName extension
        (not only DNS names). Empty if the extension is absent.
        """
        try:
            sans = self._cert.extensions.get_extension_for_class(
                x509.SubjectAlternativeName
            ).value
        except x509.ExtensionNotFound:
            return x509.GeneralNames([])
        else:
            return x509.GeneralNames(sans)

    @property
    def crl_distribution_points(self) -> list[str]:
        """
        URLs from the CRLDistributionPoints extension. Only distribution
        points whose first full name is a URI are included.
        """
        try:
            ext = self._cert.extensions.get_extension_for_class(
                x509.CRLDistributionPoints
            ).value
        except x509.ExtensionNotFound:
            return []
        else:
            return [
                dist_point.full_name[0].value
                for dist_point in ext
                if dist_point.full_name
                and isinstance(dist_point.full_name[0], x509.UniformResourceIdentifier)
            ]
Representation of a (TLS) certificate.
def
public_key( self) -> cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey | cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey:
notbefore: datetime.datetime
@property
def notbefore(self) -> datetime.datetime:
    """Start of the certificate's validity period as an aware UTC datetime."""
    try:
        # Preferred accessor; the type stubs do not know it yet.
        moment = self._cert.not_valid_before_utc  # type: ignore
    except AttributeError:  # pragma: no cover
        # cryptography < 42.0 only offers the naive variant; tag it as UTC.
        moment = self._cert.not_valid_before.replace(tzinfo=datetime.UTC)
    return moment
keyinfo: tuple[str, int]
@property
def keyinfo(self) -> tuple[str, int]:
    """Describe the public key as a (type label, key size) tuple."""
    key = self._cert.public_key()
    if isinstance(key, rsa.RSAPublicKey):
        label = "RSA"
    elif isinstance(key, dsa.DSAPublicKey):
        label = "DSA"
    elif isinstance(key, ec.EllipticCurvePublicKey):
        label = f"EC ({key.curve.name})"
    else:  # pragma: no cover
        # Any other key type: build the label from the class name and
        # report -1 when the key object has no `key_size`.
        label = key.__class__.__name__.replace("PublicKey", "").replace("_", "")
        return label, getattr(key, "key_size", -1)
    return label, key.key_size
altnames: cryptography.x509.extensions.GeneralNames
@property
def altnames(self) -> x509.GeneralNames:
    """
    All entries of the SubjectAlternativeName extension, or an empty
    GeneralNames if the certificate has no such extension.
    """
    try:
        extension = self._cert.extensions.get_extension_for_class(
            x509.SubjectAlternativeName
        )
    except x509.ExtensionNotFound:
        return x509.GeneralNames([])
    return x509.GeneralNames(extension.value)
Get all SubjectAlternativeName entries (not only DNS names); empty if the extension is absent.
crl_distribution_points: list[str]
@property
def crl_distribution_points(self) -> list[str]:
    """
    URLs listed in the CRLDistributionPoints extension.

    A distribution point contributes its first full name, and only when
    that name is a URI. Returns an empty list if the extension is absent.
    """
    try:
        points = self._cert.extensions.get_extension_for_class(
            x509.CRLDistributionPoints
        ).value
    except x509.ExtensionNotFound:
        return []

    urls = []
    for point in points:
        names = point.full_name
        if names and isinstance(names[0], x509.UniformResourceIdentifier):
            urls.append(names[0].value)
    return urls