mitmproxy.certs
1import contextlib 2import datetime 3import ipaddress 4import logging 5import os 6import sys 7import warnings 8from collections.abc import Iterable 9from dataclasses import dataclass 10from pathlib import Path 11from typing import cast 12from typing import NewType 13from typing import Optional 14from typing import Union 15 16import OpenSSL 17from cryptography import x509 18from cryptography.hazmat.primitives import hashes 19from cryptography.hazmat.primitives import serialization 20from cryptography.hazmat.primitives.asymmetric import dsa 21from cryptography.hazmat.primitives.asymmetric import ec 22from cryptography.hazmat.primitives.asymmetric import rsa 23from cryptography.hazmat.primitives.asymmetric.types import CertificatePublicKeyTypes 24from cryptography.hazmat.primitives.serialization import pkcs12 25from cryptography.x509 import ExtendedKeyUsageOID 26from cryptography.x509 import NameOID 27 28from mitmproxy.coretypes import serializable 29 30logger = logging.getLogger(__name__) 31 32# Default expiry must not be too long: https://github.com/mitmproxy/mitmproxy/issues/815 33CA_EXPIRY = datetime.timedelta(days=10 * 365) 34CERT_EXPIRY = datetime.timedelta(days=365) 35 36# Generated with "openssl dhparam". It's too slow to generate this on startup. 37DEFAULT_DHPARAM = b""" 38-----BEGIN DH PARAMETERS----- 39MIICCAKCAgEAyT6LzpwVFS3gryIo29J5icvgxCnCebcdSe/NHMkD8dKJf8suFCg3 40O2+dguLakSVif/t6dhImxInJk230HmfC8q93hdcg/j8rLGJYDKu3ik6H//BAHKIv 41j5O9yjU3rXCfmVJQic2Nne39sg3CreAepEts2TvYHhVv3TEAzEqCtOuTjgDv0ntJ 42Gwpj+BJBRQGG9NvprX1YGJ7WOFBP/hWU7d6tgvE6Xa7T/u9QIKpYHMIkcN/l3ZFB 43chZEqVlyrcngtSXCROTPcDOQ6Q8QzhaBJS+Z6rcsd7X+haiQqvoFcmaJ08Ks6LQC 44ZIL2EtYJw8V8z7C0igVEBIADZBI6OTbuuhDwRw//zU1uq52Oc48CIZlGxTYG/Evq 45o9EWAXUYVzWkDSTeBH1r4z/qLPE2cnhtMxbFxuvK53jGB0emy2y1Ei6IhKshJ5qX 46IB/aE7SSHyQ3MDHHkCmQJCsOd4Mo26YX61NZ+n501XjqpCBQ2+DfZCBh8Va2wDyv 47A2Ryg9SUz8j0AXViRNMJgJrr446yro/FuJZwnQcO3WQnXeqSBnURqKjmqkeFP+d8 486mk2tqJaY507lRNqtGlLnj7f5RNoBFJDCLBNurVgfvq9TCVWKDIFD4vZRjCrnl6I 49rD693XKIHUCWOjMh1if6omGXKHH40QuME2gNa50+YPn1iYDl88uDbbMCAQI= 50-----END DH PARAMETERS----- 51""" 52 53 54class Cert(serializable.Serializable): 55 """Representation of a (TLS) certificate.""" 56 57 _cert: x509.Certificate 58 59 def __init__(self, cert: x509.Certificate): 60 assert isinstance(cert, x509.Certificate) 61 self._cert = cert 62 63 def __eq__(self, other): 64 return self.fingerprint() == other.fingerprint() 65 66 def __repr__(self): 67 altnames = [str(x.value) for x in self.altnames] 68 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 69 70 def __hash__(self): 71 return self._cert.__hash__() 72 73 @classmethod 74 def from_state(cls, state): 75 return cls.from_pem(state) 76 77 def get_state(self): 78 return self.to_pem() 79 80 def set_state(self, state): 81 self._cert = x509.load_pem_x509_certificate(state) 82 83 @classmethod 84 def from_pem(cls, data: bytes) -> "Cert": 85 cert = x509.load_pem_x509_certificate(data) # type: ignore 86 return cls(cert) 87 88 def to_pem(self) -> bytes: 89 return self._cert.public_bytes(serialization.Encoding.PEM) 90 91 @classmethod 92 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 93 return Cert(x509.to_cryptography()) 94 95 def to_pyopenssl(self) -> OpenSSL.crypto.X509: 96 return OpenSSL.crypto.X509.from_cryptography(self._cert) 97 98 def public_key(self) -> CertificatePublicKeyTypes: 99 return self._cert.public_key() 100 101 def fingerprint(self) -> bytes: 102 return self._cert.fingerprint(hashes.SHA256()) 103 104 @property 105 def issuer(self) -> list[tuple[str, str]]: 106 return _name_to_keyval(self._cert.issuer) 107 108 @property 109 def notbefore(self) -> datetime.datetime: 110 try: 111 # type definitions haven't caught up with new API yet. 112 return self._cert.not_valid_before_utc # type: ignore 113 except AttributeError: # pragma: no cover 114 # cryptography < 42.0 115 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 116 117 @property 118 def notafter(self) -> datetime.datetime: 119 try: 120 return self._cert.not_valid_after_utc # type: ignore 121 except AttributeError: # pragma: no cover 122 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 123 124 def has_expired(self) -> bool: 125 if sys.version_info < (3, 11): # pragma: no cover 126 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 127 return datetime.datetime.now(datetime.UTC) > self.notafter 128 129 @property 130 def subject(self) -> list[tuple[str, str]]: 131 return _name_to_keyval(self._cert.subject) 132 133 @property 134 def serial(self) -> int: 135 return self._cert.serial_number 136 137 @property 138 def is_ca(self) -> bool: 139 constraints: x509.BasicConstraints 140 try: 141 constraints = self._cert.extensions.get_extension_for_class( 142 x509.BasicConstraints 143 ).value 144 return constraints.ca 145 except x509.ExtensionNotFound: 146 return False 147 148 @property 149 def keyinfo(self) -> tuple[str, int]: 150 public_key = self._cert.public_key() 151 if isinstance(public_key, rsa.RSAPublicKey): 152 return "RSA", public_key.key_size 153 if isinstance(public_key, dsa.DSAPublicKey): 154 return "DSA", public_key.key_size 155 if isinstance(public_key, ec.EllipticCurvePublicKey): 156 return f"EC ({public_key.curve.name})", public_key.key_size 157 return ( 158 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 159 getattr(public_key, "key_size", -1), 160 ) # pragma: no cover 161 162 @property 163 def cn(self) -> str | None: 164 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 165 if attrs: 166 return cast(str, attrs[0].value) 167 return None 168 169 @property 170 def organization(self) -> str | None: 171 attrs = self._cert.subject.get_attributes_for_oid( 172 x509.NameOID.ORGANIZATION_NAME 173 ) 174 if attrs: 175 return cast(str, attrs[0].value) 176 return None 177 178 @property 179 def altnames(self) -> x509.GeneralNames: 180 """ 181 Get all SubjectAlternativeName DNS altnames. 182 """ 183 try: 184 sans = self._cert.extensions.get_extension_for_class( 185 x509.SubjectAlternativeName 186 ).value 187 except x509.ExtensionNotFound: 188 return x509.GeneralNames([]) 189 else: 190 return x509.GeneralNames(sans) 191 192 193def _name_to_keyval(name: x509.Name) -> list[tuple[str, str]]: 194 parts = [] 195 for attr in name: 196 k = attr.rfc4514_string().partition("=")[0] 197 v = cast(str, attr.value) 198 parts.append((k, v)) 199 return parts 200 201 202def create_ca( 203 organization: str, 204 cn: str, 205 key_size: int, 206) -> tuple[rsa.RSAPrivateKeyWithSerialization, x509.Certificate]: 207 now = datetime.datetime.now() 208 209 private_key = rsa.generate_private_key( 210 public_exponent=65537, 211 key_size=key_size, 212 ) # type: ignore 213 name = x509.Name( 214 [ 215 x509.NameAttribute(NameOID.COMMON_NAME, cn), 216 x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization), 217 ] 218 ) 219 builder = x509.CertificateBuilder() 220 builder = builder.serial_number(x509.random_serial_number()) 221 builder = builder.subject_name(name) 222 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 223 builder = builder.not_valid_after(now + CA_EXPIRY) 224 builder = builder.issuer_name(name) 225 builder = builder.public_key(private_key.public_key()) 226 builder = builder.add_extension( 227 x509.BasicConstraints(ca=True, path_length=None), critical=True 228 ) 229 builder = builder.add_extension( 230 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 231 ) 232 builder = builder.add_extension( 233 x509.KeyUsage( 234 digital_signature=False, 235 content_commitment=False, 236 key_encipherment=False, 237 data_encipherment=False, 238 key_agreement=False, 239 key_cert_sign=True, 240 crl_sign=True, 241 encipher_only=False, 242 decipher_only=False, 243 ), 244 critical=True, 245 ) 246 builder = builder.add_extension( 247 x509.SubjectKeyIdentifier.from_public_key(private_key.public_key()), 248 critical=False, 249 ) 250 cert = builder.sign(private_key=private_key, algorithm=hashes.SHA256()) # type: ignore 251 return private_key, cert 252 253 254def _fix_legacy_sans(sans: Iterable[x509.GeneralName] | list[str]) -> x509.GeneralNames: 255 """ 256 SANs used to be a list of strings in mitmproxy 10.1 and below, but now they're a list of GeneralNames. 257 This function converts the old format to the new one. 258 """ 259 if isinstance(sans, x509.GeneralNames): 260 return sans 261 elif ( 262 isinstance(sans, list) and len(sans) > 0 and isinstance(sans[0], str) 263 ): # pragma: no cover 264 warnings.warn( 265 "Passing SANs as a list of strings is deprecated.", 266 DeprecationWarning, 267 stacklevel=2, 268 ) 269 270 ss: list[x509.GeneralName] = [] 271 for x in cast(list[str], sans): 272 try: 273 ip = ipaddress.ip_address(x) 274 except ValueError: 275 x = x.encode("idna").decode() 276 ss.append(x509.DNSName(x)) 277 else: 278 ss.append(x509.IPAddress(ip)) 279 return x509.GeneralNames(ss) 280 else: 281 return x509.GeneralNames(sans) 282 283 284def dummy_cert( 285 privkey: rsa.RSAPrivateKey, 286 cacert: x509.Certificate, 287 commonname: str | None, 288 sans: Iterable[x509.GeneralName], 289 organization: str | None = None, 290) -> Cert: 291 """ 292 Generates a dummy certificate. 293 294 privkey: CA private key 295 cacert: CA certificate 296 commonname: Common name for the generated certificate. 297 sans: A list of Subject Alternate Names. 298 organization: Organization name for the generated certificate. 299 300 Returns cert if operation succeeded, None if not. 301 """ 302 builder = x509.CertificateBuilder() 303 builder = builder.issuer_name(cacert.subject) 304 builder = builder.add_extension( 305 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False 306 ) 307 builder = builder.public_key(cacert.public_key()) 308 309 now = datetime.datetime.now() 310 builder = builder.not_valid_before(now - datetime.timedelta(days=2)) 311 builder = builder.not_valid_after(now + CERT_EXPIRY) 312 313 subject = [] 314 is_valid_commonname = commonname is not None and len(commonname) < 64 315 if is_valid_commonname: 316 assert commonname is not None 317 subject.append(x509.NameAttribute(NameOID.COMMON_NAME, commonname)) 318 if organization is not None: 319 assert organization is not None 320 subject.append(x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization)) 321 builder = builder.subject_name(x509.Name(subject)) 322 builder = builder.serial_number(x509.random_serial_number()) 323 324 # RFC 5280 §4.2.1.6: subjectAltName is critical if subject is empty. 325 builder = builder.add_extension( 326 x509.SubjectAlternativeName(_fix_legacy_sans(sans)), 327 critical=not is_valid_commonname, 328 ) 329 330 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.1 331 builder = builder.add_extension( 332 x509.AuthorityKeyIdentifier.from_issuer_public_key(cacert.public_key()), 333 critical=False, 334 ) 335 # If CA and leaf cert have the same Subject Key Identifier, SChannel breaks in funny ways, 336 # see https://github.com/mitmproxy/mitmproxy/issues/6494. 337 # https://datatracker.ietf.org/doc/html/rfc5280#section-4.2.1.2 states 338 # that SKI is optional for the leaf cert, so we skip that. 339 340 cert = builder.sign(private_key=privkey, algorithm=hashes.SHA256()) # type: ignore 341 return Cert(cert) 342 343 344@dataclass(frozen=True) 345class CertStoreEntry: 346 cert: Cert 347 privatekey: rsa.RSAPrivateKey 348 chain_file: Path | None 349 chain_certs: list[Cert] 350 351 352TCustomCertId = str # manually provided certs (e.g. mitmproxy's --certs) 353TGeneratedCertId = tuple[Optional[str], x509.GeneralNames] # (common_name, sans) 354TCertId = Union[TCustomCertId, TGeneratedCertId] 355 356DHParams = NewType("DHParams", bytes) 357 358 359class CertStore: 360 """ 361 Implements an in-memory certificate store. 362 """ 363 364 STORE_CAP = 100 365 certs: dict[TCertId, CertStoreEntry] 366 expire_queue: list[CertStoreEntry] 367 368 def __init__( 369 self, 370 default_privatekey: rsa.RSAPrivateKey, 371 default_ca: Cert, 372 default_chain_file: Path | None, 373 dhparams: DHParams, 374 ): 375 self.default_privatekey = default_privatekey 376 self.default_ca = default_ca 377 self.default_chain_file = default_chain_file 378 self.default_chain_certs = ( 379 x509.load_pem_x509_certificates(self.default_chain_file.read_bytes()) 380 if self.default_chain_file 381 else [default_ca] 382 ) 383 self.dhparams = dhparams 384 self.certs = {} 385 self.expire_queue = [] 386 387 def expire(self, entry: CertStoreEntry) -> None: 388 self.expire_queue.append(entry) 389 if len(self.expire_queue) > self.STORE_CAP: 390 d = self.expire_queue.pop(0) 391 self.certs = {k: v for k, v in self.certs.items() if v != d} 392 393 @staticmethod 394 def load_dhparam(path: Path) -> DHParams: 395 # mitmproxy<=0.10 doesn't generate a dhparam file. 396 # Create it now if necessary. 397 if not path.exists(): 398 path.write_bytes(DEFAULT_DHPARAM) 399 400 # we could use cryptography for this, but it's unclear how to convert cryptography's object to pyOpenSSL's 401 # expected format. 402 bio = OpenSSL.SSL._lib.BIO_new_file( # type: ignore 403 str(path).encode(sys.getfilesystemencoding()), b"r" 404 ) 405 if bio != OpenSSL.SSL._ffi.NULL: # type: ignore 406 bio = OpenSSL.SSL._ffi.gc(bio, OpenSSL.SSL._lib.BIO_free) # type: ignore 407 dh = OpenSSL.SSL._lib.PEM_read_bio_DHparams( # type: ignore 408 bio, 409 OpenSSL.SSL._ffi.NULL, # type: ignore 410 OpenSSL.SSL._ffi.NULL, # type: ignore 411 OpenSSL.SSL._ffi.NULL, # type: ignore 412 ) 413 dh = OpenSSL.SSL._ffi.gc(dh, OpenSSL.SSL._lib.DH_free) # type: ignore 414 return dh 415 raise RuntimeError("Error loading DH Params.") # pragma: no cover 416 417 @classmethod 418 def from_store( 419 cls, 420 path: Path | str, 421 basename: str, 422 key_size: int, 423 passphrase: bytes | None = None, 424 ) -> "CertStore": 425 path = Path(path) 426 ca_file = path / f"{basename}-ca.pem" 427 dhparam_file = path / f"{basename}-dhparam.pem" 428 if not ca_file.exists(): 429 cls.create_store(path, basename, key_size) 430 return cls.from_files(ca_file, dhparam_file, passphrase) 431 432 @classmethod 433 def from_files( 434 cls, ca_file: Path, dhparam_file: Path, passphrase: bytes | None = None 435 ) -> "CertStore": 436 raw = ca_file.read_bytes() 437 key = load_pem_private_key(raw, passphrase) 438 dh = cls.load_dhparam(dhparam_file) 439 certs = x509.load_pem_x509_certificates(raw) 440 ca = Cert(certs[0]) 441 if len(certs) > 1: 442 chain_file: Path | None = ca_file 443 else: 444 chain_file = None 445 return cls(key, ca, chain_file, dh) 446 447 @staticmethod 448 @contextlib.contextmanager 449 def umask_secret(): 450 """ 451 Context to temporarily set umask to its original value bitor 0o77. 452 Useful when writing private keys to disk so that only the owner 453 will be able to read them. 454 """ 455 original_umask = os.umask(0) 456 os.umask(original_umask | 0o77) 457 try: 458 yield 459 finally: 460 os.umask(original_umask) 461 462 @staticmethod 463 def create_store( 464 path: Path, basename: str, key_size: int, organization=None, cn=None 465 ) -> None: 466 path.mkdir(parents=True, exist_ok=True) 467 468 organization = organization or basename 469 cn = cn or basename 470 471 key: rsa.RSAPrivateKeyWithSerialization 472 ca: x509.Certificate 473 key, ca = create_ca(organization=organization, cn=cn, key_size=key_size) 474 475 # Dump the CA plus private key. 476 with CertStore.umask_secret(): 477 # PEM format 478 (path / f"{basename}-ca.pem").write_bytes( 479 key.private_bytes( 480 encoding=serialization.Encoding.PEM, 481 format=serialization.PrivateFormat.TraditionalOpenSSL, 482 encryption_algorithm=serialization.NoEncryption(), 483 ) 484 + ca.public_bytes(serialization.Encoding.PEM) 485 ) 486 487 # PKCS12 format for Windows devices 488 (path / f"{basename}-ca.p12").write_bytes( 489 pkcs12.serialize_key_and_certificates( # type: ignore 490 name=basename.encode(), 491 key=key, 492 cert=ca, 493 cas=None, 494 encryption_algorithm=serialization.NoEncryption(), 495 ) 496 ) 497 498 # Dump the certificate in PEM format 499 pem_cert = ca.public_bytes(serialization.Encoding.PEM) 500 (path / f"{basename}-ca-cert.pem").write_bytes(pem_cert) 501 # Create a .cer file with the same contents for Android 502 (path / f"{basename}-ca-cert.cer").write_bytes(pem_cert) 503 504 # Dump the certificate in PKCS12 format for Windows devices 505 (path / f"{basename}-ca-cert.p12").write_bytes( 506 pkcs12.serialize_key_and_certificates( 507 name=basename.encode(), 508 key=None, # type: ignore 509 cert=ca, 510 cas=None, 511 encryption_algorithm=serialization.NoEncryption(), 512 ) 513 ) 514 515 (path / f"{basename}-dhparam.pem").write_bytes(DEFAULT_DHPARAM) 516 517 def add_cert_file( 518 self, spec: str, path: Path, passphrase: bytes | None = None 519 ) -> None: 520 raw = path.read_bytes() 521 cert = Cert.from_pem(raw) 522 try: 523 private_key = load_pem_private_key(raw, password=passphrase) 524 except ValueError as e: 525 private_key = self.default_privatekey 526 if cert.public_key() != private_key.public_key(): 527 raise ValueError( 528 f'Unable to find private key in "{path.absolute()}": {e}' 529 ) from e 530 else: 531 if cert.public_key() != private_key.public_key(): 532 raise ValueError( 533 f'Private and public keys in "{path.absolute()}" do not match:\n' 534 f"{cert.public_key()=}\n" 535 f"{private_key.public_key()=}" 536 ) 537 538 try: 539 chain = [Cert(x) for x in x509.load_pem_x509_certificates(raw)] 540 except ValueError as e: 541 logger.warning(f"Failed to read certificate chain: {e}") 542 chain = [cert] 543 544 if cert.is_ca: 545 logger.warning( 546 f'"{path.absolute()}" is a certificate authority and not a leaf certificate. ' 547 f"This indicates a misconfiguration, see https://docs.mitmproxy.org/stable/concepts-certificates/." 548 ) 549 550 self.add_cert(CertStoreEntry(cert, private_key, path, chain), spec) 551 552 def add_cert(self, entry: CertStoreEntry, *names: str) -> None: 553 """ 554 Adds a cert to the certstore. We register the CN in the cert plus 555 any SANs, and also the list of names provided as an argument. 556 """ 557 if entry.cert.cn: 558 self.certs[entry.cert.cn] = entry 559 for i in entry.cert.altnames: 560 self.certs[str(i.value)] = entry 561 for i in names: 562 self.certs[i] = entry 563 564 @staticmethod 565 def asterisk_forms(dn: str | x509.GeneralName) -> list[str]: 566 """ 567 Return all asterisk forms for a domain. For example, for www.example.com this will return 568 [b"www.example.com", b"*.example.com", b"*.com"]. The single wildcard "*" is omitted. 569 """ 570 if isinstance(dn, str): 571 parts = dn.split(".") 572 ret = [dn] 573 for i in range(1, len(parts)): 574 ret.append("*." + ".".join(parts[i:])) 575 return ret 576 elif isinstance(dn, x509.DNSName): 577 return CertStore.asterisk_forms(dn.value) 578 else: 579 return [str(dn.value)] 580 581 def get_cert( 582 self, 583 commonname: str | None, 584 sans: Iterable[x509.GeneralName], 585 organization: str | None = None, 586 ) -> CertStoreEntry: 587 """ 588 commonname: Common name for the generated certificate. Must be a 589 valid, plain-ASCII, IDNA-encoded domain name. 590 591 sans: A list of Subject Alternate Names. 592 593 organization: Organization name for the generated certificate. 594 """ 595 sans = _fix_legacy_sans(sans) 596 597 potential_keys: list[TCertId] = [] 598 if commonname: 599 potential_keys.extend(self.asterisk_forms(commonname)) 600 for s in sans: 601 potential_keys.extend(self.asterisk_forms(s)) 602 potential_keys.append("*") 603 potential_keys.append((commonname, sans)) 604 605 name = next(filter(lambda key: key in self.certs, potential_keys), None) 606 if name: 607 entry = self.certs[name] 608 else: 609 entry = CertStoreEntry( 610 cert=dummy_cert( 611 self.default_privatekey, 612 self.default_ca._cert, 613 commonname, 614 sans, 615 organization, 616 ), 617 privatekey=self.default_privatekey, 618 chain_file=self.default_chain_file, 619 chain_certs=self.default_chain_certs, 620 ) 621 self.certs[(commonname, sans)] = entry 622 self.expire(entry) 623 624 return entry 625 626 627def load_pem_private_key(data: bytes, password: bytes | None) -> rsa.RSAPrivateKey: 628 """ 629 like cryptography's load_pem_private_key, but silently falls back to not using a password 630 if the private key is unencrypted. 631 """ 632 try: 633 return serialization.load_pem_private_key(data, password) # type: ignore 634 except TypeError: 635 if password is not None: 636 return load_pem_private_key(data, None) 637 raise
class
Cert(mitmproxy.coretypes.serializable.Serializable):
55class Cert(serializable.Serializable): 56 """Representation of a (TLS) certificate.""" 57 58 _cert: x509.Certificate 59 60 def __init__(self, cert: x509.Certificate): 61 assert isinstance(cert, x509.Certificate) 62 self._cert = cert 63 64 def __eq__(self, other): 65 return self.fingerprint() == other.fingerprint() 66 67 def __repr__(self): 68 altnames = [str(x.value) for x in self.altnames] 69 return f"<Cert(cn={self.cn!r}, altnames={altnames!r})>" 70 71 def __hash__(self): 72 return self._cert.__hash__() 73 74 @classmethod 75 def from_state(cls, state): 76 return cls.from_pem(state) 77 78 def get_state(self): 79 return self.to_pem() 80 81 def set_state(self, state): 82 self._cert = x509.load_pem_x509_certificate(state) 83 84 @classmethod 85 def from_pem(cls, data: bytes) -> "Cert": 86 cert = x509.load_pem_x509_certificate(data) # type: ignore 87 return cls(cert) 88 89 def to_pem(self) -> bytes: 90 return self._cert.public_bytes(serialization.Encoding.PEM) 91 92 @classmethod 93 def from_pyopenssl(self, x509: OpenSSL.crypto.X509) -> "Cert": 94 return Cert(x509.to_cryptography()) 95 96 def to_pyopenssl(self) -> OpenSSL.crypto.X509: 97 return OpenSSL.crypto.X509.from_cryptography(self._cert) 98 99 def public_key(self) -> CertificatePublicKeyTypes: 100 return self._cert.public_key() 101 102 def fingerprint(self) -> bytes: 103 return self._cert.fingerprint(hashes.SHA256()) 104 105 @property 106 def issuer(self) -> list[tuple[str, str]]: 107 return _name_to_keyval(self._cert.issuer) 108 109 @property 110 def notbefore(self) -> datetime.datetime: 111 try: 112 # type definitions haven't caught up with new API yet. 113 return self._cert.not_valid_before_utc # type: ignore 114 except AttributeError: # pragma: no cover 115 # cryptography < 42.0 116 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) 117 118 @property 119 def notafter(self) -> datetime.datetime: 120 try: 121 return self._cert.not_valid_after_utc # type: ignore 122 except AttributeError: # pragma: no cover 123 return self._cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) 124 125 def has_expired(self) -> bool: 126 if sys.version_info < (3, 11): # pragma: no cover 127 return datetime.datetime.now(datetime.timezone.utc) > self.notafter 128 return datetime.datetime.now(datetime.UTC) > self.notafter 129 130 @property 131 def subject(self) -> list[tuple[str, str]]: 132 return _name_to_keyval(self._cert.subject) 133 134 @property 135 def serial(self) -> int: 136 return self._cert.serial_number 137 138 @property 139 def is_ca(self) -> bool: 140 constraints: x509.BasicConstraints 141 try: 142 constraints = self._cert.extensions.get_extension_for_class( 143 x509.BasicConstraints 144 ).value 145 return constraints.ca 146 except x509.ExtensionNotFound: 147 return False 148 149 @property 150 def keyinfo(self) -> tuple[str, int]: 151 public_key = self._cert.public_key() 152 if isinstance(public_key, rsa.RSAPublicKey): 153 return "RSA", public_key.key_size 154 if isinstance(public_key, dsa.DSAPublicKey): 155 return "DSA", public_key.key_size 156 if isinstance(public_key, ec.EllipticCurvePublicKey): 157 return f"EC ({public_key.curve.name})", public_key.key_size 158 return ( 159 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 160 getattr(public_key, "key_size", -1), 161 ) # pragma: no cover 162 163 @property 164 def cn(self) -> str | None: 165 attrs = self._cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) 166 if attrs: 167 return cast(str, attrs[0].value) 168 return None 169 170 @property 171 def organization(self) -> str | None: 172 attrs = self._cert.subject.get_attributes_for_oid( 173 x509.NameOID.ORGANIZATION_NAME 174 ) 175 if attrs: 176 return cast(str, attrs[0].value) 177 return None 178 179 @property 180 def altnames(self) -> x509.GeneralNames: 181 """ 182 Get all SubjectAlternativeName DNS altnames. 183 """ 184 try: 185 sans = self._cert.extensions.get_extension_for_class( 186 x509.SubjectAlternativeName 187 ).value 188 except x509.ExtensionNotFound: 189 return x509.GeneralNames([]) 190 else: 191 return x509.GeneralNames(sans)
Representation of a (TLS) certificate.
def
public_key( self) -> Union[cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey, cryptography.hazmat.primitives.asymmetric.x25519.X25519PublicKey, cryptography.hazmat.primitives.asymmetric.x448.X448PublicKey]:
notbefore: datetime.datetime
109 @property 110 def notbefore(self) -> datetime.datetime: 111 try: 112 # type definitions haven't caught up with new API yet. 113 return self._cert.not_valid_before_utc # type: ignore 114 except AttributeError: # pragma: no cover 115 # cryptography < 42.0 116 return self._cert.not_valid_before.replace(tzinfo=datetime.timezone.utc)
keyinfo: tuple[str, int]
149 @property 150 def keyinfo(self) -> tuple[str, int]: 151 public_key = self._cert.public_key() 152 if isinstance(public_key, rsa.RSAPublicKey): 153 return "RSA", public_key.key_size 154 if isinstance(public_key, dsa.DSAPublicKey): 155 return "DSA", public_key.key_size 156 if isinstance(public_key, ec.EllipticCurvePublicKey): 157 return f"EC ({public_key.curve.name})", public_key.key_size 158 return ( 159 public_key.__class__.__name__.replace("PublicKey", "").replace("_", ""), 160 getattr(public_key, "key_size", -1), 161 ) # pragma: no cover
altnames: cryptography.x509.extensions.GeneralNames
179 @property 180 def altnames(self) -> x509.GeneralNames: 181 """ 182 Get all SubjectAlternativeName DNS altnames. 183 """ 184 try: 185 sans = self._cert.extensions.get_extension_for_class( 186 x509.SubjectAlternativeName 187 ).value 188 except x509.ExtensionNotFound: 189 return x509.GeneralNames([]) 190 else: 191 return x509.GeneralNames(sans)
Get all SubjectAlternativeName DNS altnames.