mitmproxy.http
import binascii
import json
import os
import time
import urllib.parse
import warnings
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Mapping
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import fields
from email.utils import formatdate
from email.utils import mktime_tz
from email.utils import parsedate_tz
from typing import Any
from typing import cast

from mitmproxy import flow
from mitmproxy.coretypes import multidict
from mitmproxy.coretypes import serializable
from mitmproxy.net import encoding
from mitmproxy.net.http import cookies
from mitmproxy.net.http import multipart
from mitmproxy.net.http import status_codes
from mitmproxy.net.http import url
from mitmproxy.net.http.headers import assemble_content_type
from mitmproxy.net.http.headers import infer_content_encoding
from mitmproxy.net.http.headers import parse_content_type
from mitmproxy.utils import human
from mitmproxy.utils import strutils
from mitmproxy.utils import typecheck
from mitmproxy.utils.strutils import always_bytes
from mitmproxy.utils.strutils import always_str
from mitmproxy.websocket import WebSocketData


# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x: bytes) -> str:
    """Decode header bytes to `str` as UTF-8, using `surrogateescape` for invalid bytes."""
    return x.decode("utf-8", "surrogateescape")


def _always_bytes(x: str | bytes) -> bytes:
    """Convert a header name or value to bytes (UTF-8 with `surrogateescape`)."""
    return strutils.always_bytes(x, "utf-8", "surrogateescape")


# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Header class which allows both convenient access to individual headers as well as
    direct access to the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.

        *Raises:*
         - `TypeError`, if any name or value in `fields` is not bytes.
        """
        super().__init__(fields)

        for key, value in self.fields:
            if not isinstance(key, bytes) or not isinstance(value, bytes):
                raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        self.update(
            {
                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
                for name, value in headers.items()
            }
        )

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Headers can be folded
        return ", ".join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Headers are case-insensitive
        return key.lower()

    def __bytes__(self) -> bytes:
        """Serialize all fields as `name: value` lines, each terminated by CRLF."""
        if self.fields:
            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
        else:
            return b""

    def __delitem__(self, key: str | bytes) -> None:
        key = _always_bytes(key)
        super().__delitem__(key)

    def __iter__(self) -> Iterator[str]:
        for x in super().__iter__():
            yield _native(x)

    def get_all(self, name: str | bytes) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        name = _always_bytes(name)
        return [_native(x) for x in super().get_all(name)]

    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        name = _always_bytes(name)
        values = [_always_bytes(x) for x in values]
        return super().set_all(name, values)

    def insert(self, index: int, key: str | bytes, value: str | bytes):
        """Insert a header at `index`; `key` and `value` are converted to bytes first."""
        key = _always_bytes(key)
        value = _always_bytes(value)
        super().insert(index, key, value)

    def items(self, multi=False):
        """
        With `multi=True`, yield every raw field as a `(name, value)` pair of strings.
        Otherwise, defer to the parent class implementation.
        """
        if multi:
            return ((_native(k), _native(v)) for k, v in self.fields)
        else:
            return super().items()


@dataclass
class MessageData(serializable.Serializable):
    """Data fields shared by `RequestData` and `ResponseData`."""

    http_version: bytes
    headers: Headers
    content: bytes | None
    trailers: Headers | None
    timestamp_start: float
    timestamp_end: float | None

    # noinspection PyUnreachableCode
    if __debug__:

        def __post_init__(self):
            # Only defined when assertions are enabled: check every field against its annotation.
            for field in fields(self):
                val = getattr(self, field.name)
                typecheck.check_option_type(field.name, val, field.type)

    def set_state(self, state):
        for k, v in state.items():
            if k in ("headers", "trailers") and v is not None:
                v = Headers.from_state(v)
            setattr(self, k, v)

    def get_state(self):
        state = vars(self).copy()
        state["headers"] = state["headers"].get_state()
        if state["trailers"] is not None:
            state["trailers"] = state["trailers"].get_state()
        return state

    @classmethod
    def from_state(cls, state):
        # Work on a shallow copy so that the caller's dict is not modified.
        state = dict(state)
        state["headers"] = Headers.from_state(state["headers"])
        if state["trailers"] is not None:
            state["trailers"] = Headers.from_state(state["trailers"])
        return cls(**state)


@dataclass
class RequestData(MessageData):
    host: str
    port: int
    method: bytes
    scheme: bytes
    authority: bytes
    path: bytes


@dataclass
class ResponseData(MessageData):
    status_code: int
    reason: bytes


class Message(serializable.Serializable):
    """Base class for `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        return cls(**state)

    def get_state(self):
        return self.data.get_state()

    def set_state(self, state):
        self.data.set_state(state)

    data: MessageData
    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        return self.data.http_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: str | bytes) -> None:
        self.data.http_version = strutils.always_bytes(
            http_version, "utf-8", "surrogateescape"
        )

    @property
    def is_http10(self) -> bool:
        return self.data.http_version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        return self.data.http_version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        return self.data.http_version == b"HTTP/2.0"

    @property
    def is_http3(self) -> bool:
        return self.data.http_version == b"HTTP/3"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Headers | None:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Headers | None) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> bytes | None:
        """
        The raw (potentially compressed) HTTP message body.

        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
        `raw_content` may be `None` if the content is missing, for example due to body streaming
        (see `Message.stream`). In contrast, `b""` signals a present but empty message body.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: bytes | None) -> None:
        self.data.content = content

    @property
    def content(self) -> bytes | None:
        """
        The uncompressed HTTP message body as bytes.

        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: bytes | None) -> None:
        self.set_content(value)

    @property
    def text(self) -> str | None:
        """
        The uncompressed and decoded HTTP message body as text.

        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: str | None) -> None:
        self.set_text(value)

    def set_content(self, value: bytes | None) -> None:
        """
        Set the message body from uncompressed bytes, encoding it according to the
        current content-encoding header and updating content-length.

        *Raises:*
         - `TypeError`, if `value` is neither `None` nor bytes.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        ce = self.headers.get("content-encoding")
        try:
            self.raw_content = encoding.encode(value, ce or "identity")
        except ValueError:
            # So we have an invalid content-encoding?
            # Let's remove it!
            del self.headers["content-encoding"]
            self.raw_content = value

        if "transfer-encoding" in self.headers:
            # https://httpwg.org/specs/rfc7230.html#header.content-length
            # don't set content-length if a transfer-encoding is provided
            pass
        else:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> bytes | None:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.
        """
        if self.raw_content is None:
            return None
        ce = self.headers.get("content-encoding")
        if ce:
            try:
                content = encoding.decode(self.raw_content, ce)
                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
                if isinstance(content, str):
                    raise ValueError(f"Invalid Content-Encoding: {ce}")
                return content
            except ValueError:
                if strict:
                    raise
                return self.raw_content
        else:
            return self.raw_content

    def set_text(self, text: str | None) -> None:
        """
        Set the message body from a string, encoded with the charset inferred from
        the content-type header.
        """
        if text is None:
            self.content = None
            return
        enc = infer_content_encoding(self.headers.get("content-type", ""))

        try:
            self.content = cast(bytes, encoding.encode(text, enc))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            ct = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            ct[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*ct)
            enc = "utf8"
            self.content = text.encode(enc, "surrogateescape")

    def get_text(self, strict: bool = True) -> str | None:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.
        """
        content = self.get_content(strict)
        if content is None:
            return None
        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
        try:
            return cast(str, encoding.decode(content, enc))
        except ValueError:
            if strict:
                raise
            return content.decode("utf8", "surrogateescape")

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> float | None:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: float | None):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes body based on the current Content-Encoding header, then
        removes the header.

        If the message body is missing or empty, no action is taken.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        if not self.raw_content:
            # The body is missing (for example, because of body streaming or because it's a response
            # to a HEAD request), so we can't correctly update content-length.
            return
        decoded = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = decoded

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        self.content = self.raw_content
        # `set_content` removes the header again if the encoding was rejected.
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Returns the JSON encoded content of the response, if any.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        Will raise if the content can not be decoded and then parsed as JSON.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
            has been streamed.
        """
        content = self.get_content(strict=False)
        if content is None:
            raise TypeError("Message content is not available.")
        else:
            return json.loads(content, **kwargs)


class Request(Message):
    """
    An HTTP request.
    """

    data: RequestData

    def __init__(
        self,
        host: str,
        port: int,
        method: bytes,
        scheme: bytes,
        authority: bytes,
        path: bytes,
        http_version: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(host, bytes):
            host = host.decode("idna", "strict")
        if isinstance(method, str):
            method = method.encode("ascii", "strict")
        if isinstance(scheme, str):
            scheme = scheme.encode("ascii", "strict")
        if isinstance(authority, str):
            authority = authority.encode("ascii", "strict")
        if isinstance(path, str):
            path = path.encode("ascii", "strict")
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = RequestData(
            host=host,
            port=port,
            method=method,
            scheme=scheme,
            authority=authority,
            path=path,
            http_version=http_version,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.host and self.port:
            hostport = f"{self.host}:{self.port}"
        else:
            hostport = ""
        path = self.path or ""
        return f"Request({self.method} {hostport}{path})"

    @classmethod
    def make(
        cls,
        method: str,
        url: str,
        content: bytes | str = "",
        headers: (
            Headers | Mapping[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Request":
        """
        Simplified API for creating request objects.

        *Raises:*
         - `TypeError`, if `headers` is neither a mapping nor an iterable,
           or if `content` is neither str nor bytes.
        """
        # Headers can be list or dict, we differentiate here.
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                "Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                )
            )

        req = cls(
            "",
            0,
            method.encode("utf-8", "surrogateescape"),
            b"",
            b"",
            b"",
            b"HTTP/1.1",
            headers,
            b"",
            None,
            time.time(),
            time.time(),
        )

        req.url = url
        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            req.content = content
        elif isinstance(content, str):
            req.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return req

    @property
    def first_line_format(self) -> str:
        """
        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

        origin-form and asterisk-form are subsumed as "relative".
        """
        if self.method == "CONNECT":
            return "authority"
        elif self.authority:
            return "absolute"
        else:
            return "relative"

    @property
    def method(self) -> str:
        """
        HTTP request method, e.g. "GET".
        """
        return self.data.method.decode("utf-8", "surrogateescape").upper()

    @method.setter
    def method(self, val: str | bytes) -> None:
        self.data.method = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def scheme(self) -> str:
        """
        HTTP request scheme, which should be "http" or "https".
        """
        return self.data.scheme.decode("utf-8", "surrogateescape")

    @scheme.setter
    def scheme(self, val: str | bytes) -> None:
        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def authority(self) -> str:
        """
        HTTP request authority.

        For HTTP/1, this is the authority portion of the request target
        (in either absolute-form or authority-form).
        For origin-form and asterisk-form requests, this property is set to an empty string.

        For HTTP/2, this is the :authority pseudo header.

        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
        """
        try:
            return self.data.authority.decode("idna")
        except UnicodeError:
            return self.data.authority.decode("utf8", "surrogateescape")

    @authority.setter
    def authority(self, val: str | bytes) -> None:
        if isinstance(val, str):
            try:
                val = val.encode("idna", "strict")
            except UnicodeError:
                val = val.encode("utf8", "surrogateescape")  # type: ignore
        self.data.authority = val

    @property
    def host(self) -> str:
        """
        Target server for this request. This may be parsed from the raw request
        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
        or inferred from the proxy mode (e.g. an IP in transparent mode).

        Setting the host attribute also updates the host header and authority information, if present.

        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
        """
        return self.data.host

    @host.setter
    def host(self, val: str | bytes) -> None:
        self.data.host = always_str(val, "idna", "strict")
        self._update_host_and_authority()

    @property
    def host_header(self) -> str | None:
        """
        The request's host/authority header.

        This property maps to either ``request.headers["Host"]`` or
        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.

        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
        """
        if self.is_http2 or self.is_http3:
            return self.authority or self.data.headers.get("Host", None)
        else:
            return self.data.headers.get("Host", None)

    @host_header.setter
    def host_header(self, val: None | str | bytes) -> None:
        if val is None:
            if self.is_http2 or self.is_http3:
                self.data.authority = b""
            self.headers.pop("Host", None)
        else:
            if self.is_http2 or self.is_http3:
                self.authority = val  # type: ignore
            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
                self.headers["Host"] = val

    @property
    def port(self) -> int:
        """
        Target port.
        """
        return self.data.port

    @port.setter
    def port(self, port: int) -> None:
        if not isinstance(port, int):
            raise ValueError(f"Port must be an integer, not {port!r}.")

        self.data.port = port
        self._update_host_and_authority()

    def _update_host_and_authority(self) -> None:
        """Rewrite the Host header and the authority from scheme, host and port, if they are already set."""
        val = url.hostport(self.scheme, self.host, self.port)

        # Update host header
        if "Host" in self.data.headers:
            self.data.headers["Host"] = val
        # Update authority
        if self.data.authority:
            self.authority = val

    @property
    def path(self) -> str:
        """
        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
        Usually starts with a slash, except for OPTIONS requests, which may just be "*".

        This attribute includes both path and query parts of the target URI
        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
        """
        return self.data.path.decode("utf-8", "surrogateescape")

    @path.setter
    def path(self, val: str | bytes) -> None:
        self.data.path = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def url(self) -> str:
        """
        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

        Setting this property updates these attributes as well.
        """
        if self.first_line_format == "authority":
            return f"{self.host}:{self.port}"
        path = self.path if self.path != "*" else ""
        return url.unparse(self.scheme, self.host, self.port, path)

    @url.setter
    def url(self, val: str | bytes) -> None:
        val = always_str(val, "utf-8", "surrogateescape")
        self.scheme, self.host, self.port, self.path = url.parse(val)  # type: ignore

    @property
    def pretty_host(self) -> str:
        """
        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
        This is useful in transparent mode where `Request.host` is only an IP address.

        *Warning:* When working in adversarial environments, this may not reflect the actual destination
        as the Host header could be spoofed.
        """
        authority = self.host_header
        if authority:
            return url.parse_authority(authority, check=False)[0]
        else:
            return self.host

    @property
    def pretty_url(self) -> str:
        """
        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
        """
        if self.first_line_format == "authority":
            return self.authority

        host_header = self.host_header
        if not host_header:
            return self.url

        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
        pretty_port = pretty_port or url.default_port(self.scheme) or 443
        path = self.path if self.path != "*" else ""

        return url.unparse(self.scheme, pretty_host, pretty_port, path)

    def _get_query(self):
        query = urllib.parse.urlparse(self.url).query
        return tuple(url.decode(query))

    def _set_query(self, query_data):
        query = url.encode(query_data)
        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    @property
    def query(self) -> multidict.MultiDictView[str, str]:
        """
        The request query as a mutable mapping view on the request's path.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.path`, and vice versa.
        """
        return multidict.MultiDictView(self._get_query, self._set_query)

    @query.setter
    def query(self, value):
        self._set_query(value)

    def _get_cookies(self):
        h = self.headers.get_all("Cookie")
        return tuple(cookies.parse_cookie_headers(h))

    def _set_cookies(self, value):
        self.headers["cookie"] = cookies.format_cookie_header(value)

    @property
    def cookies(self) -> multidict.MultiDictView[str, str]:
        """
        The request cookies.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.headers`, and vice versa.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    @property
    def path_components(self) -> tuple[str, ...]:
        """
        The URL's path components as a tuple of strings.
        Components are unquoted.
        """
        path = urllib.parse.urlparse(self.url).path
        # This needs to be a tuple so that it's immutable.
        # Otherwise, this would fail silently:
        #   request.path_components.append("foo")
        return tuple(url.unquote(i) for i in path.split("/") if i)

    @path_components.setter
    def path_components(self, components: Iterable[str]):
        components = map(lambda x: url.quote(x, safe=""), components)
        path = "/" + "/".join(components)
        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    def anticache(self) -> None:
        """
        Modifies this request to remove headers that might produce a cached response.
        """
        delheaders = (
            "if-modified-since",
            "if-none-match",
        )
        for i in delheaders:
            self.headers.pop(i, None)

    def anticomp(self) -> None:
        """
        Modify the Accept-Encoding header to only accept uncompressed responses.
        """
        self.headers["accept-encoding"] = "identity"

    def constrain_encoding(self) -> None:
        """
        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
        """
        accept_encoding = self.headers.get("accept-encoding")
        if accept_encoding:
            # A tuple (rather than a set) keeps the order of the resulting header value
            # stable across interpreter runs.
            supported = ("gzip", "identity", "deflate", "br", "zstd")
            self.headers["accept-encoding"] = ", ".join(
                e for e in supported if e in accept_encoding
            )

    def _get_urlencoded_form(self):
        is_valid_content_type = (
            "application/x-www-form-urlencoded"
            in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type:
            return tuple(url.decode(self.get_text(strict=False)))
        return ()

    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
        """
        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
        This will overwrite the existing content if there is one.
        """
        self.headers["content-type"] = "application/x-www-form-urlencoded"
        self.content = url.encode(form_data, self.get_text(strict=False)).encode()

    @property
    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
        """
        The URL-encoded form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_urlencoded_form, self._set_urlencoded_form
        )

    @urlencoded_form.setter
    def urlencoded_form(self, value):
        self._set_urlencoded_form(value)

    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
        is_valid_content_type = (
            "multipart/form-data" in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type and self.content is not None:
            try:
                return multipart.decode_multipart(
                    self.headers.get("content-type"), self.content
                )
            except ValueError:
                # Deliberate best-effort: an unparsable body yields an empty form.
                pass
        return []

    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
        ct = self.headers.get("content-type", "")
        is_valid_content_type = ct.lower().startswith("multipart/form-data")
        if not is_valid_content_type:
            # Generate a random boundary here.
            #
            # See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
            # on generating the boundary.
            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
            self.headers["content-type"] = ct = (
                f"multipart/form-data; boundary={boundary}"
            )
        self.content = multipart.encode_multipart(ct, value)

    @property
    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
        """
        The multipart form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_multipart_form, self._set_multipart_form
        )

    @multipart_form.setter
    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
        self._set_multipart_form(value)


class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: bytes | str = b"",
        headers: (
            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *Raises:*
         - `TypeError`, if `headers` is neither a mapping nor an iterable,
           or if `content` is neither str nor bytes.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                "Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                )
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: str | bytes) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        cookie_headers = []
        for k, v in value:
            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
            cookie_headers.append(header)
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.
        """
        # NOTE(review): the implementation of this method lies beyond the reviewed excerpt
        # and must be carried over unchanged from the original file.
1183 """ 1184 if not now: 1185 now = time.time() 1186 delta = now - self.timestamp_start 1187 refresh_headers = [ 1188 "date", 1189 "expires", 1190 "last-modified", 1191 ] 1192 for i in refresh_headers: 1193 if i in self.headers: 1194 d = parsedate_tz(self.headers[i]) 1195 if d: 1196 new = mktime_tz(d) + delta 1197 try: 1198 self.headers[i] = formatdate(new, usegmt=True) 1199 except OSError: # pragma: no cover 1200 pass # value out of bounds on Windows only (which is why we exclude it from coverage). 1201 c = [] 1202 for set_cookie_header in self.headers.get_all("set-cookie"): 1203 try: 1204 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1205 except ValueError: 1206 refreshed = set_cookie_header 1207 c.append(refreshed) 1208 if c: 1209 self.headers.set_all("set-cookie", c) 1210 1211 1212class HTTPFlow(flow.Flow): 1213 """ 1214 An HTTPFlow is a collection of objects representing a single HTTP 1215 transaction. 1216 """ 1217 1218 request: Request 1219 """The client's HTTP request.""" 1220 response: Response | None = None 1221 """The server's HTTP response.""" 1222 error: flow.Error | None = None 1223 """ 1224 A connection or protocol error affecting this flow. 1225 1226 Note that it's possible for a Flow to have both a response and an error 1227 object. This might happen, for instance, when a response was received 1228 from the server, but there was an error sending it back to the client. 1229 """ 1230 1231 websocket: WebSocketData | None = None 1232 """ 1233 If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. 
1234 """ 1235 1236 def get_state(self) -> serializable.State: 1237 return { 1238 **super().get_state(), 1239 "request": self.request.get_state(), 1240 "response": self.response.get_state() if self.response else None, 1241 "websocket": self.websocket.get_state() if self.websocket else None, 1242 } 1243 1244 def set_state(self, state: serializable.State) -> None: 1245 self.request = Request.from_state(state.pop("request")) 1246 self.response = Response.from_state(r) if (r := state.pop("response")) else None 1247 self.websocket = ( 1248 WebSocketData.from_state(w) if (w := state.pop("websocket")) else None 1249 ) 1250 super().set_state(state) 1251 1252 def __repr__(self): 1253 s = "<HTTPFlow" 1254 for a in ( 1255 "request", 1256 "response", 1257 "websocket", 1258 "error", 1259 "client_conn", 1260 "server_conn", 1261 ): 1262 if getattr(self, a, False): 1263 s += f"\r\n {a} = {{flow.{a}}}" 1264 s += ">" 1265 return s.format(flow=self) 1266 1267 @property 1268 def timestamp_start(self) -> float: 1269 """*Read-only:* An alias for `Request.timestamp_start`.""" 1270 return self.request.timestamp_start 1271 1272 @property 1273 def mode(self) -> str: # pragma: no cover 1274 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1275 return getattr(self, "_mode", "regular") 1276 1277 @mode.setter 1278 def mode(self, val: str) -> None: # pragma: no cover 1279 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1280 self._mode = val 1281 1282 def copy(self): 1283 f = super().copy() 1284 if self.request: 1285 f.request = self.request.copy() 1286 if self.response: 1287 f.response = self.response.copy() 1288 return f 1289 1290 1291__all__ = [ 1292 "HTTPFlow", 1293 "Message", 1294 "Request", 1295 "Response", 1296 "Headers", 1297]
class HTTPFlow(flow.Flow):
    """
    A single HTTP transaction: the request and, where present, the
    response, error and WebSocket data that belong to it.
    """

    request: Request
    """The client's HTTP request."""
    response: Response | None = None
    """The server's HTTP response."""
    error: flow.Error | None = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: WebSocketData | None = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    def get_state(self) -> serializable.State:
        """
        Return the parent state extended with the `request`, `response` and
        `websocket` states. Missing response/websocket are stored as `None`.
        """
        state = dict(super().get_state())
        state["request"] = self.request.get_state()
        state["response"] = self.response.get_state() if self.response else None
        state["websocket"] = self.websocket.get_state() if self.websocket else None
        return state

    def set_state(self, state: serializable.State) -> None:
        """
        Restore request, response and websocket from `state`.

        The three keys are popped from `state`; whatever remains is handed
        to the parent class.
        """
        request_state = state.pop("request")
        self.request = Request.from_state(request_state)

        response_state = state.pop("response")
        if response_state:
            self.response = Response.from_state(response_state)
        else:
            self.response = None

        websocket_state = state.pop("websocket")
        if websocket_state:
            self.websocket = WebSocketData.from_state(websocket_state)
        else:
            self.websocket = None

        super().set_state(state)

    def __repr__(self):
        candidates = (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        )
        # Only attributes that exist and are truthy get a line in the output.
        present = [name for name in candidates if getattr(self, name, False)]
        template = (
            "<HTTPFlow"
            + "".join(f"\r\n {name} = {{flow.{name}}}" for name in present)
            + ">"
        )
        return template.format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        request = self.request
        return request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        # Falls back to "regular" when no mode has been assigned.
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        """
        Copy the flow via the parent class, then give the copy its own
        copies of the request and (if set) the response.
        """
        duplicate = super().copy()
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
An HTTPFlow is a collection of objects representing a single HTTP transaction.
A connection or protocol error affecting this flow.
Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.
If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
@property
def timestamp_start(self) -> float:
    """*Read-only:* The `timestamp_start` of this flow's request (alias for `Request.timestamp_start`)."""
    request = self.request
    return request.timestamp_start
Read-only: An alias for `Request.timestamp_start`.
class Message(serializable.Serializable):
    """Shared base class of `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        """Build an instance by passing the entries of `state` as keyword arguments."""
        instance = cls(**state)
        return instance

    def get_state(self):
        """Return the state of the underlying `data` object."""
        data = self.data
        return data.get_state()

    def set_state(self, state):
        """Forward `state` to the underlying `data` object."""
        data = self.data
        data.set_state(state)

    data: MessageData
    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        raw_version = self.data.http_version
        return raw_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: str | bytes) -> None:
        as_bytes = strutils.always_bytes(http_version, "utf-8", "surrogateescape")
        self.data.http_version = as_bytes

    @property
    def is_http10(self) -> bool:
        """Whether the stored version is exactly `HTTP/1.0`."""
        version = self.data.http_version
        return version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        """Whether the stored version is exactly `HTTP/1.1`."""
        version = self.data.http_version
        return version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        """Whether the stored version is exactly `HTTP/2.0`."""
        version = self.data.http_version
        return version == b"HTTP/2.0"

    @property
    def is_http3(self) -> bool:
        """Whether the stored version is exactly `HTTP/3`."""
        version = self.data.http_version
        return version == b"HTTP/3"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        data = self.data
        return data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        data = self.data
        data.headers = h

    @property
    def trailers(self) -> Headers | None:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        data = self.data
        return data.trailers

    @trailers.setter
    def trailers(self, h: Headers | None) -> None:
        data = self.data
        data.trailers = h

    @property
    def raw_content(self) -> bytes | None:
        """
        The raw (potentially compressed) HTTP message body.

        Unlike `Message.content` and `Message.text`, reading this property never raises.
        The value is `None` when the content is missing, for example due to body streaming
        (see `Message.stream`), whereas `b""` means the body is present but empty.

        *See also:* `Message.content`, `Message.text`
        """
        data = self.data
        return data.content

    @raw_content.setter
    def raw_content(self, content: bytes | None) -> None:
        data = self.data
        data.content = content

    @property
    def content(self) -> bytes | None:
        """
        The uncompressed HTTP message body as bytes.

        Reading this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        body = self.get_content()
        return body

    @content.setter
    def content(self, value: bytes | None) -> None:
        self.set_content(value)

    @property
    def text(self) -> str | None:
        """
        The uncompressed and decoded HTTP message body as text.

        Reading this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        decoded = self.get_text()
        return decoded

    @text.setter
    def text(self, value: str | None) -> None:
        self.set_text(value)

    def set_content(self, value: bytes | None) -> None:
        """
        Store `value` as the message body, applying the current content-encoding.

        `None` clears the raw content and leaves the headers untouched.
        Non-bytes values raise `TypeError`. If encoding fails, the
        content-encoding header is removed and `value` is stored as-is.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        content_encoding = self.headers.get("content-encoding")
        try:
            self.raw_content = encoding.encode(value, content_encoding or "identity")
        except ValueError:
            # The content-encoding is invalid: drop the header and store the body unencoded.
            del self.headers["content-encoding"]
            self.raw_content = value

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # content-length is only set when no transfer-encoding is provided.
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> bytes | None:
        """
        Like `Message.content`, except that with `strict=False` a decoding failure
        does not raise; the raw (still encoded) body is returned instead.
        """
        raw = self.raw_content
        if raw is None:
            return None
        content_encoding = self.headers.get("content-encoding")
        if not content_encoding:
            return raw
        try:
            decoded = encoding.decode(raw, content_encoding)
            # A client may illegally specify a byte -> str encoding here (e.g. utf8)
            if isinstance(decoded, str):
                raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
            return decoded
        except ValueError:
            if not strict:
                return raw
            raise

    def set_text(self, text: str | None) -> None:
        """
        Encode `text` with the charset inferred from the content-type header and
        store it as the body. `None` clears the content.

        If that fails with `ValueError`, the charset in the content-type header is
        rewritten to UTF-8 and the text is stored as surrogate-escaped UTF-8.
        """
        if text is None:
            self.content = None
            return
        charset = infer_content_encoding(self.headers.get("content-type", ""))

        try:
            self.content = cast(bytes, encoding.encode(text, charset))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            parsed = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            parsed[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*parsed)
            self.content = text.encode("utf8", "surrogateescape")

    def get_text(self, strict: bool = True) -> str | None:
        """
        Like `Message.text`, except that with `strict=False` a failure does not
        raise; the body is returned as surrogate-escaped UTF-8 instead.
        """
        body = self.get_content(strict)
        if body is None:
            return None
        charset = infer_content_encoding(self.headers.get("content-type", ""), body)
        try:
            return cast(str, encoding.decode(body, charset))
        except ValueError:
            if not strict:
                return body.decode("utf8", "surrogateescape")
            raise

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        data = self.data
        return data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        data = self.data
        data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> float | None:
        """
        *Timestamp:* Last byte received.
        """
        data = self.data
        return data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: float | None):
        data = self.data
        data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decode the body according to the current Content-Encoding header and
        remove that header afterwards.

        Nothing happens when the message body is missing or empty.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        if not self.raw_content:
            # The body is missing (for example, because of body streaming or because it's a response
            # to a HEAD request), so we can't correctly update content-length.
            return
        plain = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = plain

    def encode(self, encoding: str) -> None:
        """
        Encode the body with `encoding`, which is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encoding header is overwritten; the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        # Re-assigning the body applies the new header; `set_content` removes
        # the header again if the encoding turns out to be invalid.
        self.content = self.raw_content
        header_kept = "content-encoding" in self.headers
        if not header_kept:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Parse the message body as JSON and return the result.
        `**kwargs` are passed through to `json.loads()`.

        The body is fetched with `get_content(strict=False)`.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
           has been streamed.
        """
        body = self.get_content(strict=False)
        if body is None:
            raise TypeError("Message content is not available.")
        return json.loads(body, **kwargs)
This attribute controls if the message body should be streamed.
If `False`, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If `True`, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.
This attribute must be set in the `requestheaders` or `responseheaders` hook. Setting it in `request` or `response` is already too late: mitmproxy has buffered the message body by then.
@property
def http_version(self) -> str:
    """
    HTTP version string, for example `HTTP/1.1`.

    The stored bytes are decoded as UTF-8 with `surrogateescape`.
    """
    raw_version = self.data.http_version
    return raw_version.decode("utf-8", "surrogateescape")
HTTP version string, for example `HTTP/1.1`.
@property
def headers(self) -> Headers:
    """
    The HTTP headers, as stored on the underlying `data` object.
    """
    data = self.data
    return data.headers
The HTTP headers.
@property
def trailers(self) -> Headers | None:
    """
    The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer),
    as stored on the underlying `data` object.
    """
    data = self.data
    return data.trailers
The HTTP trailers.
314 @property 315 def raw_content(self) -> bytes | None: 316 """ 317 The raw (potentially compressed) HTTP message body. 318 319 In contrast to `Message.content` and `Message.text`, accessing this property never raises. 320 `raw_content` may be `None` if the content is missing, for example due to body streaming 321 (see `Message.stream`). In contrast, `b""` signals a present but empty message body. 322 323 *See also:* `Message.content`, `Message.text` 324 """ 325 return self.data.content
The raw (potentially compressed) HTTP message body.
In contrast to `Message.content` and `Message.text`, accessing this property never raises. `raw_content` may be `None` if the content is missing, for example due to body streaming (see `Message.stream`). In contrast, `b""` signals a present but empty message body.
See also: `Message.content`, `Message.text`
331 @property 332 def content(self) -> bytes | None: 333 """ 334 The uncompressed HTTP message body as bytes. 335 336 Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid. 337 338 *See also:* `Message.raw_content`, `Message.text` 339 """ 340 return self.get_content()
The uncompressed HTTP message body as bytes.
Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
See also: `Message.raw_content`, `Message.text`
346 @property 347 def text(self) -> str | None: 348 """ 349 The uncompressed and decoded HTTP message body as text. 350 351 Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid. 352 353 *See also:* `Message.raw_content`, `Message.content` 354 """ 355 return self.get_text()
The uncompressed and decoded HTTP message body as text.
Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
See also: `Message.raw_content`, `Message.content`
361 def set_content(self, value: bytes | None) -> None: 362 if value is None: 363 self.raw_content = None 364 return 365 if not isinstance(value, bytes): 366 raise TypeError( 367 f"Message content must be bytes, not {type(value).__name__}. " 368 "Please use .text if you want to assign a str." 369 ) 370 ce = self.headers.get("content-encoding") 371 try: 372 self.raw_content = encoding.encode(value, ce or "identity") 373 except ValueError: 374 # So we have an invalid content-encoding? 375 # Let's remove it! 376 del self.headers["content-encoding"] 377 self.raw_content = value 378 379 if "transfer-encoding" in self.headers: 380 # https://httpwg.org/specs/rfc7230.html#header.content-length 381 # don't set content-length if a transfer-encoding is provided 382 pass 383 else: 384 self.headers["content-length"] = str(len(self.raw_content))
386 def get_content(self, strict: bool = True) -> bytes | None: 387 """ 388 Similar to `Message.content`, but does not raise if `strict` is `False`. 389 Instead, the compressed message body is returned as-is. 390 """ 391 if self.raw_content is None: 392 return None 393 ce = self.headers.get("content-encoding") 394 if ce: 395 try: 396 content = encoding.decode(self.raw_content, ce) 397 # A client may illegally specify a byte -> str encoding here (e.g. utf8) 398 if isinstance(content, str): 399 raise ValueError(f"Invalid Content-Encoding: {ce}") 400 return content 401 except ValueError: 402 if strict: 403 raise 404 return self.raw_content 405 else: 406 return self.raw_content
Similar to `Message.content`, but does not raise if `strict` is `False`. Instead, the compressed message body is returned as-is.
408 def set_text(self, text: str | None) -> None: 409 if text is None: 410 self.content = None 411 return 412 enc = infer_content_encoding(self.headers.get("content-type", "")) 413 414 try: 415 self.content = cast(bytes, encoding.encode(text, enc)) 416 except ValueError: 417 # Fall back to UTF-8 and update the content-type header. 418 ct = parse_content_type(self.headers.get("content-type", "")) or ( 419 "text", 420 "plain", 421 {}, 422 ) 423 ct[2]["charset"] = "utf-8" 424 self.headers["content-type"] = assemble_content_type(*ct) 425 enc = "utf8" 426 self.content = text.encode(enc, "surrogateescape")
428 def get_text(self, strict: bool = True) -> str | None: 429 """ 430 Similar to `Message.text`, but does not raise if `strict` is `False`. 431 Instead, the message body is returned as surrogate-escaped UTF-8. 432 """ 433 content = self.get_content(strict) 434 if content is None: 435 return None 436 enc = infer_content_encoding(self.headers.get("content-type", ""), content) 437 try: 438 return cast(str, encoding.decode(content, enc)) 439 except ValueError: 440 if strict: 441 raise 442 return content.decode("utf8", "surrogateescape")
Similar to `Message.text`, but does not raise if `strict` is `False`. Instead, the message body is returned as surrogate-escaped UTF-8.
@property
def timestamp_start(self) -> float:
    """
    *Timestamp:* Headers received.
    """
    data = self.data
    return data.timestamp_start
Timestamp: Headers received.
455 @property 456 def timestamp_end(self) -> float | None: 457 """ 458 *Timestamp:* Last byte received. 459 """ 460 return self.data.timestamp_end
Timestamp: Last byte received.
def decode(self, strict: bool = True) -> None:
    """
    Decode the body according to the current Content-Encoding header and
    remove that header afterwards.

    Nothing happens when the message body is missing or empty.

    *Raises:*
     - `ValueError`, when the content-encoding is invalid and strict is True.
    """
    if not self.raw_content:
        # The body is missing (for example, because of body streaming or because it's a response
        # to a HEAD request), so we can't correctly update content-length.
        return
    plain = self.get_content(strict)
    self.headers.pop("content-encoding", None)
    self.content = plain
Decodes body based on the current Content-Encoding header, then removes the header.
If the message body is missing or empty, no action is taken.
Raises:
- `ValueError`, when the content-encoding is invalid and `strict` is `True`.
def encode(self, encoding: str) -> None:
    """
    Encode the body with `encoding`, which is "gzip", "deflate", "identity", "br", or "zstd".
    Any existing content-encoding header is overwritten; the content is not decoded beforehand.

    *Raises:*
     - `ValueError`, when the specified content-encoding is invalid.
    """
    self.headers["content-encoding"] = encoding
    # Re-assigning the body applies the new header. If the header is gone
    # afterwards, the content setter rejected the encoding.
    self.content = self.raw_content
    header_kept = "content-encoding" in self.headers
    if not header_kept:
        raise ValueError(f"Invalid content encoding {repr(encoding)}")
Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.
Raises:
- `ValueError`, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
    """
    Parse the message body as JSON and return the result.
    `**kwargs` are passed through to `json.loads()`.

    The body is fetched with `get_content(strict=False)`.

    *Raises:*
     - `json.decoder.JSONDecodeError` if content is not valid JSON.
     - `TypeError` if the content is not available, for example because the response
       has been streamed.
    """
    body = self.get_content(strict=False)
    if body is None:
        raise TypeError("Message content is not available.")
    return json.loads(body, **kwargs)
Returns the JSON encoded content of the response, if any. `**kwargs` are optional arguments that will be passed to `json.loads()`.
Will raise if the content can not be decoded and then parsed as JSON.
Raises:
- `json.decoder.JSONDecodeError` if content is not valid JSON.
- `TypeError` if the content is not available, for example because the response has been streamed.
517class Request(Message): 518 """ 519 An HTTP request. 520 """ 521 522 data: RequestData 523 524 def __init__( 525 self, 526 host: str, 527 port: int, 528 method: bytes, 529 scheme: bytes, 530 authority: bytes, 531 path: bytes, 532 http_version: bytes, 533 headers: Headers | tuple[tuple[bytes, bytes], ...], 534 content: bytes | None, 535 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 536 timestamp_start: float, 537 timestamp_end: float | None, 538 ): 539 # auto-convert invalid types to retain compatibility with older code. 540 if isinstance(host, bytes): 541 host = host.decode("idna", "strict") 542 if isinstance(method, str): 543 method = method.encode("ascii", "strict") 544 if isinstance(scheme, str): 545 scheme = scheme.encode("ascii", "strict") 546 if isinstance(authority, str): 547 authority = authority.encode("ascii", "strict") 548 if isinstance(path, str): 549 path = path.encode("ascii", "strict") 550 if isinstance(http_version, str): 551 http_version = http_version.encode("ascii", "strict") 552 553 if isinstance(content, str): 554 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 555 if not isinstance(headers, Headers): 556 headers = Headers(headers) 557 if trailers is not None and not isinstance(trailers, Headers): 558 trailers = Headers(trailers) 559 560 self.data = RequestData( 561 host=host, 562 port=port, 563 method=method, 564 scheme=scheme, 565 authority=authority, 566 path=path, 567 http_version=http_version, 568 headers=headers, 569 content=content, 570 trailers=trailers, 571 timestamp_start=timestamp_start, 572 timestamp_end=timestamp_end, 573 ) 574 575 def __repr__(self) -> str: 576 if self.host and self.port: 577 hostport = f"{self.host}:{self.port}" 578 else: 579 hostport = "" 580 path = self.path or "" 581 return f"Request({self.method} {hostport}{path})" 582 583 @classmethod 584 def make( 585 cls, 586 method: str, 587 url: str, 588 content: bytes | str = "", 589 headers: ( 590 Headers | dict[str | bytes, 
str | bytes] | Iterable[tuple[bytes, bytes]] 591 ) = (), 592 ) -> "Request": 593 """ 594 Simplified API for creating request objects. 595 """ 596 # Headers can be list or dict, we differentiate here. 597 if isinstance(headers, Headers): 598 pass 599 elif isinstance(headers, dict): 600 headers = Headers( 601 ( 602 always_bytes(k, "utf-8", "surrogateescape"), 603 always_bytes(v, "utf-8", "surrogateescape"), 604 ) 605 for k, v in headers.items() 606 ) 607 elif isinstance(headers, Iterable): 608 headers = Headers(headers) # type: ignore 609 else: 610 raise TypeError( 611 "Expected headers to be an iterable or dict, but is {}.".format( 612 type(headers).__name__ 613 ) 614 ) 615 616 req = cls( 617 "", 618 0, 619 method.encode("utf-8", "surrogateescape"), 620 b"", 621 b"", 622 b"", 623 b"HTTP/1.1", 624 headers, 625 b"", 626 None, 627 time.time(), 628 time.time(), 629 ) 630 631 req.url = url 632 # Assign this manually to update the content-length header. 633 if isinstance(content, bytes): 634 req.content = content 635 elif isinstance(content, str): 636 req.text = content 637 else: 638 raise TypeError( 639 f"Expected content to be str or bytes, but is {type(content).__name__}." 640 ) 641 642 return req 643 644 @property 645 def first_line_format(self) -> str: 646 """ 647 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 648 649 origin-form and asterisk-form are subsumed as "relative". 650 """ 651 if self.method == "CONNECT": 652 return "authority" 653 elif self.authority: 654 return "absolute" 655 else: 656 return "relative" 657 658 @property 659 def method(self) -> str: 660 """ 661 HTTP request method, e.g. "GET". 
662 """ 663 return self.data.method.decode("utf-8", "surrogateescape").upper() 664 665 @method.setter 666 def method(self, val: str | bytes) -> None: 667 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 668 669 @property 670 def scheme(self) -> str: 671 """ 672 HTTP request scheme, which should be "http" or "https". 673 """ 674 return self.data.scheme.decode("utf-8", "surrogateescape") 675 676 @scheme.setter 677 def scheme(self, val: str | bytes) -> None: 678 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 679 680 @property 681 def authority(self) -> str: 682 """ 683 HTTP request authority. 684 685 For HTTP/1, this is the authority portion of the request target 686 (in either absolute-form or authority-form). 687 For origin-form and asterisk-form requests, this property is set to an empty string. 688 689 For HTTP/2, this is the :authority pseudo header. 690 691 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 692 """ 693 try: 694 return self.data.authority.decode("idna") 695 except UnicodeError: 696 return self.data.authority.decode("utf8", "surrogateescape") 697 698 @authority.setter 699 def authority(self, val: str | bytes) -> None: 700 if isinstance(val, str): 701 try: 702 val = val.encode("idna", "strict") 703 except UnicodeError: 704 val = val.encode("utf8", "surrogateescape") # type: ignore 705 self.data.authority = val 706 707 @property 708 def host(self) -> str: 709 """ 710 Target server for this request. This may be parsed from the raw request 711 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 712 or inferred from the proxy mode (e.g. an IP in transparent mode). 713 714 Setting the host attribute also updates the host header and authority information, if present. 
715 716 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 717 """ 718 return self.data.host 719 720 @host.setter 721 def host(self, val: str | bytes) -> None: 722 self.data.host = always_str(val, "idna", "strict") 723 self._update_host_and_authority() 724 725 @property 726 def host_header(self) -> str | None: 727 """ 728 The request's host/authority header. 729 730 This property maps to either ``request.headers["Host"]`` or 731 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 732 733 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 734 """ 735 if self.is_http2 or self.is_http3: 736 return self.authority or self.data.headers.get("Host", None) 737 else: 738 return self.data.headers.get("Host", None) 739 740 @host_header.setter 741 def host_header(self, val: None | str | bytes) -> None: 742 if val is None: 743 if self.is_http2 or self.is_http3: 744 self.data.authority = b"" 745 self.headers.pop("Host", None) 746 else: 747 if self.is_http2 or self.is_http3: 748 self.authority = val # type: ignore 749 if not (self.is_http2 or self.is_http3) or "Host" in self.headers: 750 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 751 self.headers["Host"] = val 752 753 @property 754 def port(self) -> int: 755 """ 756 Target port. 757 """ 758 return self.data.port 759 760 @port.setter 761 def port(self, port: int) -> None: 762 if not isinstance(port, int): 763 raise ValueError(f"Port must be an integer, not {port!r}.") 764 765 self.data.port = port 766 self._update_host_and_authority() 767 768 def _update_host_and_authority(self) -> None: 769 val = url.hostport(self.scheme, self.host, self.port) 770 771 # Update host header 772 if "Host" in self.data.headers: 773 self.data.headers["Host"] = val 774 # Update authority 775 if self.data.authority: 776 self.authority = val 777 778 @property 779 def path(self) -> str: 780 """ 781 HTTP request path, e.g. 
"/index.html" or "/index.html?a=b". 782 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 783 784 This attribute includes both path and query parts of the target URI 785 (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)). 786 """ 787 return self.data.path.decode("utf-8", "surrogateescape") 788 789 @path.setter 790 def path(self, val: str | bytes) -> None: 791 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 792 793 @property 794 def url(self) -> str: 795 """ 796 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 797 798 Settings this property updates these attributes as well. 799 """ 800 if self.first_line_format == "authority": 801 return f"{self.host}:{self.port}" 802 path = self.path if self.path != "*" else "" 803 return url.unparse(self.scheme, self.host, self.port, path) 804 805 @url.setter 806 def url(self, val: str | bytes) -> None: 807 val = always_str(val, "utf-8", "surrogateescape") 808 self.scheme, self.host, self.port, self.path = url.parse(val) # type: ignore 809 810 @property 811 def pretty_host(self) -> str: 812 """ 813 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 814 This is useful in transparent mode where `Request.host` is only an IP address. 815 816 *Warning:* When working in adversarial environments, this may not reflect the actual destination 817 as the Host header could be spoofed. 818 """ 819 authority = self.host_header 820 if authority: 821 return url.parse_authority(authority, check=False)[0] 822 else: 823 return self.host 824 825 @property 826 def pretty_url(self) -> str: 827 """ 828 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 
829 """ 830 if self.first_line_format == "authority": 831 return self.authority 832 833 host_header = self.host_header 834 if not host_header: 835 return self.url 836 837 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 838 pretty_port = pretty_port or url.default_port(self.scheme) or 443 839 path = self.path if self.path != "*" else "" 840 841 return url.unparse(self.scheme, pretty_host, pretty_port, path) 842 843 def _get_query(self): 844 query = urllib.parse.urlparse(self.url).query 845 return tuple(url.decode(query)) 846 847 def _set_query(self, query_data): 848 query = url.encode(query_data) 849 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 850 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 851 852 @property 853 def query(self) -> multidict.MultiDictView[str, str]: 854 """ 855 The request query as a mutable mapping view on the request's path. 856 For the most part, this behaves like a dictionary. 857 Modifications to the MultiDictView update `Request.path`, and vice versa. 858 """ 859 return multidict.MultiDictView(self._get_query, self._set_query) 860 861 @query.setter 862 def query(self, value): 863 self._set_query(value) 864 865 def _get_cookies(self): 866 h = self.headers.get_all("Cookie") 867 return tuple(cookies.parse_cookie_headers(h)) 868 869 def _set_cookies(self, value): 870 self.headers["cookie"] = cookies.format_cookie_header(value) 871 872 @property 873 def cookies(self) -> multidict.MultiDictView[str, str]: 874 """ 875 The request cookies. 876 For the most part, this behaves like a dictionary. 877 Modifications to the MultiDictView update `Request.headers`, and vice versa. 878 """ 879 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 880 881 @cookies.setter 882 def cookies(self, value): 883 self._set_cookies(value) 884 885 @property 886 def path_components(self) -> tuple[str, ...]: 887 """ 888 The URL's path components as a tuple of strings. 
889 Components are unquoted. 890 """ 891 path = urllib.parse.urlparse(self.url).path 892 # This needs to be a tuple so that it's immutable. 893 # Otherwise, this would fail silently: 894 # request.path_components.append("foo") 895 return tuple(url.unquote(i) for i in path.split("/") if i) 896 897 @path_components.setter 898 def path_components(self, components: Iterable[str]): 899 components = map(lambda x: url.quote(x, safe=""), components) 900 path = "/" + "/".join(components) 901 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 902 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 903 904 def anticache(self) -> None: 905 """ 906 Modifies this request to remove headers that might produce a cached response. 907 """ 908 delheaders = ( 909 "if-modified-since", 910 "if-none-match", 911 ) 912 for i in delheaders: 913 self.headers.pop(i, None) 914 915 def anticomp(self) -> None: 916 """ 917 Modify the Accept-Encoding header to only accept uncompressed responses. 918 """ 919 self.headers["accept-encoding"] = "identity" 920 921 def constrain_encoding(self) -> None: 922 """ 923 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 924 """ 925 accept_encoding = self.headers.get("accept-encoding") 926 if accept_encoding: 927 self.headers["accept-encoding"] = ", ".join( 928 e 929 for e in {"gzip", "identity", "deflate", "br", "zstd"} 930 if e in accept_encoding 931 ) 932 933 def _get_urlencoded_form(self): 934 is_valid_content_type = ( 935 "application/x-www-form-urlencoded" 936 in self.headers.get("content-type", "").lower() 937 ) 938 if is_valid_content_type: 939 return tuple(url.decode(self.get_text(strict=False))) 940 return () 941 942 def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None: 943 """ 944 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 945 This will overwrite the existing content if there is one. 
946 """ 947 self.headers["content-type"] = "application/x-www-form-urlencoded" 948 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 949 950 @property 951 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 952 """ 953 The URL-encoded form data. 954 955 If the content-type indicates non-form data or the form could not be parsed, this is set to 956 an empty `MultiDictView`. 957 958 Modifications to the MultiDictView update `Request.content`, and vice versa. 959 """ 960 return multidict.MultiDictView( 961 self._get_urlencoded_form, self._set_urlencoded_form 962 ) 963 964 @urlencoded_form.setter 965 def urlencoded_form(self, value): 966 self._set_urlencoded_form(value) 967 968 def _get_multipart_form(self) -> list[tuple[bytes, bytes]]: 969 is_valid_content_type = ( 970 "multipart/form-data" in self.headers.get("content-type", "").lower() 971 ) 972 if is_valid_content_type and self.content is not None: 973 try: 974 return multipart.decode_multipart( 975 self.headers.get("content-type"), self.content 976 ) 977 except ValueError: 978 pass 979 return [] 980 981 def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 982 ct = self.headers.get("content-type", "") 983 is_valid_content_type = ct.lower().startswith("multipart/form-data") 984 if not is_valid_content_type: 985 """ 986 Generate a random boundary here. 987 988 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 989 on generating the boundary. 990 """ 991 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 992 self.headers["content-type"] = ct = ( 993 f"multipart/form-data; boundary={boundary}" 994 ) 995 self.content = multipart.encode_multipart(ct, value) 996 997 @property 998 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 999 """ 1000 The multipart form data. 1001 1002 If the content-type indicates non-form data or the form could not be parsed, this is set to 1003 an empty `MultiDictView`. 
1004 1005 Modifications to the MultiDictView update `Request.content`, and vice versa. 1006 """ 1007 return multidict.MultiDictView( 1008 self._get_multipart_form, self._set_multipart_form 1009 ) 1010 1011 @multipart_form.setter 1012 def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 1013 self._set_multipart_form(value)
An HTTP request.
def __init__(
    self,
    host: str,
    port: int,
    method: bytes,
    scheme: bytes,
    authority: bytes,
    path: bytes,
    http_version: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Create a request from its raw parts and store them as a `RequestData` on ``self.data``.

    For compatibility with older callers, a bytes *host* is decoded as IDNA and
    str values for *method*, *scheme*, *authority*, *path* and *http_version* are
    encoded as ASCII. *headers* and (non-``None``) *trailers* are wrapped in `Headers`.

    *Raises:*
     - `ValueError` if *content* is a str.
    """

    def _ascii_bytes(value):
        # Only str is converted; anything else is passed through untouched.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # Normalize wrongly-typed arguments instead of rejecting them outright.
    if isinstance(host, bytes):
        host = host.decode("idna", "strict")
    method = _ascii_bytes(method)
    scheme = _ascii_bytes(scheme)
    authority = _ascii_bytes(authority)
    path = _ascii_bytes(path)
    http_version = _ascii_bytes(http_version)

    # Content is the one field that is never auto-converted.
    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    if not isinstance(headers, Headers):
        headers = Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = RequestData(
        host=host,
        port=port,
        method=method,
        scheme=scheme,
        authority=authority,
        path=path,
        http_version=http_version,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    method: str,
    url: str,
    content: bytes | str = "",
    headers: (
        Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Request":
    """
    Simplified API for creating request objects.

    *Args:*
     - *method:* request method as a string.
     - *url:* full URL; assigned through the `url` property of the new request.
     - *content:* request body, either bytes or str.
     - *headers:* a `Headers` instance, a dict, or an iterable of ``(name, value)`` byte tuples.

    *Raises:*
     - `TypeError` if *headers* or *content* have an unsupported type.
    """
    # A Headers instance is used as-is; dicts and plain iterables are converted.
    if not isinstance(headers, Headers):
        if isinstance(headers, dict):
            headers = Headers(
                (
                    always_bytes(name, "utf-8", "surrogateescape"),
                    always_bytes(value, "utf-8", "surrogateescape"),
                )
                for name, value in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                "Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                )
            )

    raw_method = method.encode("utf-8", "surrogateescape")
    req = cls(
        "",  # host, filled in by the url assignment below
        0,  # port, filled in by the url assignment below
        raw_method,
        b"",  # scheme
        b"",  # authority
        b"",  # path
        b"HTTP/1.1",
        headers,
        b"",  # content, assigned below
        None,  # trailers
        time.time(),
        time.time(),
    )

    req.url = url

    # Go through the content/text setters rather than the constructor so that
    # the content-length header is updated.
    if isinstance(content, bytes):
        req.content = content
        return req
    if isinstance(content, str):
        req.text = content
        return req
    raise TypeError(
        f"Expected content to be str or bytes, but is {type(content).__name__}."
    )
Simplified API for creating request objects.
@property
def first_line_format(self) -> str:
    """
    *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

    Returns "authority" for CONNECT requests, "absolute" if an authority is set,
    and "relative" otherwise (origin-form and asterisk-form are both reported as "relative").
    """
    if self.method == "CONNECT":
        return "authority"
    return "absolute" if self.authority else "relative"
Read-only: HTTP request form as defined in RFC 7230.
origin-form and asterisk-form are subsumed as "relative".
@property
def method(self) -> str:
    """
    HTTP request method, e.g. "GET".

    The raw bytes are decoded as UTF-8 (with surrogateescape) and upper-cased.
    """
    raw_method = self.data.method
    decoded = raw_method.decode("utf-8", "surrogateescape")
    return decoded.upper()
HTTP request method, e.g. "GET".
@property
def scheme(self) -> str:
    """
    HTTP request scheme, which should be "http" or "https".

    The raw bytes are decoded as UTF-8 (with surrogateescape); no validation is performed here.
    """
    raw_scheme = self.data.scheme
    return raw_scheme.decode("utf-8", "surrogateescape")
HTTP request scheme, which should be "http" or "https".
@property
def host(self) -> str:
    """
    Target server for this request, as stored on the underlying request data.

    Assigning to this attribute also refreshes the Host header and the
    authority, but only if they are already present.

    *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
    """
    request_data = self.data
    return request_data.host
Target server for this request. This may be parsed from the raw request
(e.g. from a GET http://example.com/ HTTP/1.1
request line)
or inferred from the proxy mode (e.g. an IP in transparent mode).
Setting the host attribute also updates the host header and authority information, if present.
See also: Request.authority
, Request.host_header
, Request.pretty_host
725 @property 726 def host_header(self) -> str | None: 727 """ 728 The request's host/authority header. 729 730 This property maps to either ``request.headers["Host"]`` or 731 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 732 733 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 734 """ 735 if self.is_http2 or self.is_http3: 736 return self.authority or self.data.headers.get("Host", None) 737 else: 738 return self.data.headers.get("Host", None)
The request's host/authority header.
This property maps to either request.headers["Host"]
or
request.authority
, depending on whether it's HTTP/1.x or HTTP/2 / HTTP/3.
See also: Request.authority
, Request.host
, Request.pretty_host
@property
def path(self) -> str:
    """
    HTTP request path, e.g. "/index.html" or "/index.html?a=b".
    Usually starts with a slash, except for OPTIONS requests, which may just be "*".

    The value covers both the path and the query part of the target URI
    (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
    The raw bytes are decoded as UTF-8 with surrogateescape.
    """
    raw_path = self.data.path
    return raw_path.decode("utf-8", "surrogateescape")
HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".
This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).
@property
def url(self) -> str:
    """
    The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

    For authority-form requests only ``host:port`` is returned.
    An asterisk-only path ("*") is treated as an empty path.

    Setting this property updates these attributes as well.
    """
    if self.first_line_format == "authority":
        return f"{self.host}:{self.port}"

    request_path = self.path
    if request_path == "*":
        request_path = ""
    return url.unparse(self.scheme, self.host, self.port, request_path)
The full URL string, constructed from Request.scheme
, Request.host
, Request.port
and Request.path
.
Setting this property updates these attributes as well.
@property
def pretty_host(self) -> str:
    """
    *Read-only:* Like `Request.host`, but preferring the host part of `Request.host_header` when one is set.
    This is useful in transparent mode where `Request.host` is only an IP address.

    *Warning:* When working in adversarial environments, this may not reflect the actual destination
    as the Host header could be spoofed.
    """
    host_header = self.host_header
    if not host_header:
        # No header to consult: fall back to the connection-level host.
        return self.host
    host_and_port = url.parse_authority(host_header, check=False)
    return host_and_port[0]
Read-only: Like Request.host
, but using Request.host_header
header as an additional (preferred) data source.
This is useful in transparent mode where Request.host
is only an IP address.
Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.
@property
def pretty_url(self) -> str:
    """
    *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.

    Authority-form requests return `Request.authority`. If there is no host
    header, this is identical to `Request.url`.
    """
    if self.first_line_format == "authority":
        return self.authority

    host_header = self.host_header
    if host_header:
        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
        if not pretty_port:
            # No explicit port in the header: use the scheme default, else 443.
            pretty_port = url.default_port(self.scheme) or 443
        request_path = self.path
        if request_path == "*":
            request_path = ""
        return url.unparse(self.scheme, pretty_host, pretty_port, request_path)

    return self.url
Read-only: Like Request.url
, but using Request.pretty_host
instead of Request.host
.
@property
def query(self) -> multidict.MultiDictView[str, str]:
    """
    The request query as a mutable mapping view on the request's path.
    For the most part, this behaves like a dictionary.

    The view reads through ``_get_query`` and writes through ``_set_query``,
    so modifications to the MultiDictView update `Request.path`, and vice versa.
    """
    read_query, write_query = self._get_query, self._set_query
    return multidict.MultiDictView(read_query, write_query)
The request query as a mutable mapping view on the request's path.
For the most part, this behaves like a dictionary.
Modifications to the MultiDictView update Request.path
, and vice versa.
@property
def path_components(self) -> tuple[str, ...]:
    """
    The URL's path components as a tuple of strings.
    Empty segments are dropped and each component is unquoted.
    """
    url_path = urllib.parse.urlparse(self.url).path
    components = []
    for segment in url_path.split("/"):
        if segment:
            components.append(url.unquote(segment))
    # Hand out an immutable tuple: with a list, a caller's
    #   request.path_components.append("foo")
    # would appear to work but silently change nothing.
    return tuple(components)
The URL's path components as a tuple of strings. Components are unquoted.
def anticache(self) -> None:
    """
    Modifies this request to remove headers that might produce a cached response.

    Drops the ``if-modified-since`` and ``if-none-match`` headers; headers that are absent are ignored.
    """
    for conditional_header in ("if-modified-since", "if-none-match"):
        self.headers.pop(conditional_header, None)
Modifies this request to remove headers that might produce a cached response.
def anticomp(self) -> None:
    """
    Modify the Accept-Encoding header to only accept uncompressed responses.

    The header is set to ``identity``, replacing any previous value.
    """
    request_headers = self.headers
    request_headers["accept-encoding"] = "identity"
Modify the Accept-Encoding header to only accept uncompressed responses.
def constrain_encoding(self) -> None:
    """
    Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

    If the request has a non-empty ``accept-encoding`` header, it is rewritten to
    contain only those supported encodings whose name occurs in the current header
    value. The resulting order is fixed (gzip, identity, deflate, br, zstd).
    A missing or empty header is left untouched.
    """
    accept_encoding = self.headers.get("accept-encoding")
    if not accept_encoding:
        return

    # Deliberately a tuple rather than a set: iterating a set of strings yields
    # an order that changes between interpreter runs (hash randomization), which
    # would make the rewritten header non-deterministic.
    supported_encodings = ("gzip", "identity", "deflate", "br", "zstd")
    self.headers["accept-encoding"] = ", ".join(
        encoding_name
        for encoding_name in supported_encodings
        # Substring match against the raw header value, as before.
        if encoding_name in accept_encoding
    )
Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
@property
def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
    """
    The URL-encoded form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    The view reads through ``_get_urlencoded_form`` and writes through
    ``_set_urlencoded_form``, so modifications to the MultiDictView update
    `Request.content`, and vice versa.
    """
    read_form = self._get_urlencoded_form
    write_form = self._set_urlencoded_form
    return multidict.MultiDictView(read_form, write_form)
The URL-encoded form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to
an empty MultiDictView
.
Modifications to the MultiDictView update Request.content
, and vice versa.
@property
def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
    """
    The multipart form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    The view reads through ``_get_multipart_form`` and writes through
    ``_set_multipart_form``, so modifications to the MultiDictView update
    `Request.content`, and vice versa.
    """
    read_form = self._get_multipart_form
    write_form = self._set_multipart_form
    return multidict.MultiDictView(read_form, write_form)
The multipart form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to
an empty MultiDictView
.
Modifications to the MultiDictView update Request.content
, and vice versa.
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        """
        Create a response from its raw parts and store them as a `ResponseData` on ``self.data``.

        For compatibility with older callers, str values for *http_version* and
        *reason* are encoded as ASCII. *headers* and (non-``None``) *trailers* are
        wrapped in `Headers`.

        *Raises:*
         - `ValueError` if *content* is a str.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: bytes | str = b"",
        headers: (
            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *Args:*
         - *status_code:* HTTP status code; the reason phrase is looked up from it.
         - *content:* response body, either bytes or str.
         - *headers:* a `Headers` instance, a mapping of names to values,
           or an iterable of ``(name, value)`` byte tuples.

        *Raises:*
         - `TypeError` if *headers* or *content* have an unsupported type.
        """
        if isinstance(headers, Headers):
            pass
        # Check against Mapping (as annotated), not just dict: any other mapping
        # would otherwise be iterated key-by-key in the Iterable branch below.
        elif isinstance(headers, Mapping):
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                "Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                )
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: str | bytes) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        # One Set-Cookie header is emitted per cookie.
        cookie_headers = []
        for k, v in value:
            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
            cookie_headers.append(header)
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        *Args:*
         - *now:* timestamp to refresh to; defaults to the current time when ``None``.
        """
        # Compare with None explicitly so that a caller-supplied ``now=0`` is honored.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = [
            "date",
            "expires",
            "last-modified",
        ]
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                # Unparseable dates are left as they are.
                if d:
                    new = mktime_tz(d) + delta
                    try:
                        self.headers[i] = formatdate(new, usegmt=True)
                    except OSError:  # pragma: no cover
                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Keep cookies we cannot refresh unchanged.
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
An HTTP response.
def __init__(
    self,
    http_version: bytes,
    status_code: int,
    reason: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Create a response from its raw parts and store them as a `ResponseData` on ``self.data``.

    For compatibility with older callers, str values for *http_version* and
    *reason* are encoded as ASCII. *headers* and (non-``None``) *trailers* are
    wrapped in `Headers`.

    *Raises:*
     - `ValueError` if *content* is a str.
    """

    def _ascii_bytes(value):
        # Only str is converted; anything else is passed through untouched.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # Normalize wrongly-typed arguments instead of rejecting them outright.
    http_version = _ascii_bytes(http_version)
    reason = _ascii_bytes(reason)

    # Content is the one field that is never auto-converted.
    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    if not isinstance(headers, Headers):
        headers = Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = ResponseData(
        http_version=http_version,
        status_code=status_code,
        reason=reason,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    status_code: int = 200,
    content: bytes | str = b"",
    headers: (
        Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Response":
    """
    Simplified API for creating response objects.

    *Args:*
     - *status_code:* HTTP status code; the reason phrase is looked up from it.
     - *content:* response body, either bytes or str.
     - *headers:* a `Headers` instance, a mapping of names to values,
       or an iterable of ``(name, value)`` byte tuples.

    *Raises:*
     - `TypeError` if *headers* or *content* have an unsupported type.
    """
    if isinstance(headers, Headers):
        pass
    # Check against Mapping (as annotated), not just dict: any other mapping
    # would otherwise be iterated key-by-key in the Iterable branch below.
    elif isinstance(headers, Mapping):
        headers = Headers(
            (
                always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                always_bytes(v, "utf-8", "surrogateescape"),
            )
            for k, v in headers.items()
        )
    elif isinstance(headers, Iterable):
        headers = Headers(headers)  # type: ignore
    else:
        raise TypeError(
            "Expected headers to be an iterable or dict, but is {}.".format(
                type(headers).__name__
            )
        )

    resp = cls(
        b"HTTP/1.1",
        status_code,
        status_codes.RESPONSES.get(status_code, "").encode(),
        headers,
        None,
        None,
        time.time(),
        time.time(),
    )

    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        resp.content = content
    elif isinstance(content, str):
        resp.text = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return resp
Simplified API for creating response objects.
@property
def status_code(self) -> int:
    """
    HTTP Status Code, e.g. ``200``, as stored on the underlying response data.
    """
    response_data = self.data
    return response_data.status_code
HTTP Status Code, e.g. 200
.
@property
def reason(self) -> str:
    """
    HTTP reason phrase, for example "Not Found".

    HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
    """
    raw_reason = self.data.reason
    # Reason phrases are decoded as ISO-8859-1, see
    # http://stackoverflow.com/a/16674906/934719
    return raw_reason.decode("ISO-8859-1")
HTTP reason phrase, for example "Not Found".
HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
def refresh(self, now=None):
    """
    This fairly complex and heuristic function refreshes a server
    response for replay.

     - It adjusts date, expires, and last-modified headers.
     - It adjusts cookie expiration.

    All adjustments shift the original values by ``now - self.timestamp_start``.

    *Args:*
     - *now:* timestamp to refresh to; defaults to the current time when ``None``.
    """
    # Compare with None explicitly: a truthiness test would discard a
    # caller-supplied ``now=0`` and silently substitute the current time.
    if now is None:
        now = time.time()
    delta = now - self.timestamp_start

    for header_name in ("date", "expires", "last-modified"):
        if header_name not in self.headers:
            continue
        parsed_date = parsedate_tz(self.headers[header_name])
        if not parsed_date:
            # Unparseable dates are left as they are.
            continue
        shifted = mktime_tz(parsed_date) + delta
        try:
            self.headers[header_name] = formatdate(shifted, usegmt=True)
        except OSError:  # pragma: no cover
            pass  # value out of bounds on Windows only (which is why we exclude it from coverage).

    refreshed_cookies = []
    for set_cookie_header in self.headers.get_all("set-cookie"):
        try:
            refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
        except ValueError:
            # Keep cookies we cannot refresh unchanged.
            refreshed = set_cookie_header
        refreshed_cookies.append(refreshed)
    # Only rewrite the headers if there was at least one Set-Cookie header.
    if refreshed_cookies:
        self.headers.set_all("set-cookie", refreshed_cookies)
This fairly complex and heuristic function refreshes a server response for replay.
- It adjusts date, expires, and last-modified headers.
- It adjusts cookie expiration.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Container for HTTP headers that offers convenient per-header access while
    still exposing the underlying raw data. Provides a full dictionary interface.

    Headers can be created from keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    For the most part they behave like a regular dict:
    >>> h["Host"]
    "example.com"

    Lookups are case insensitive:
    >>> h["host"]
    "example.com"

    They can also be built from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Repeated headers are folded into a single value as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Assigning a header replaces all existing headers of the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` produces an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    The raw header fields are available for full control:
    >>> h.fields

    Caveats:
     - For the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *headers:* Additional headers given as keyword arguments. They overwrite
           existing values from `fields`. For convenience, underscores in header
           names are transformed to dashes - this behaviour does not extend to
           other methods.

        If the keyword arguments contain multiple keys that have equal ``.lower()``
        representations, the behavior is undefined.
        """
        super().__init__(fields)

        if not all(
            isinstance(name, bytes) and isinstance(value, bytes)
            for name, value in self.fields
        ):
            raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        extra = {}
        for name, value in headers.items():
            header_name = _always_bytes(name).replace(b"_", b"-")
            extra[header_name] = _always_bytes(value)
        self.update(extra)

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Multiple values for one header are folded into a comma-separated string.
        separator = ", "
        return separator.join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Header names compare case-insensitively, so normalize to lower case.
        return key.lower()

    def __bytes__(self) -> bytes:
        if not self.fields:
            return b""
        lines = [b": ".join(field) for field in self.fields]
        return b"\r\n".join(lines) + b"\r\n"

    def __delitem__(self, key: str | bytes) -> None:
        super().__delitem__(_always_bytes(key))

    def __iter__(self) -> Iterator[str]:
        yield from map(_native, super().__iter__())

    def get_all(self, name: str | bytes) -> list[str]:
        """
        Like `Headers.get`, but returns every header with the given name separately
        instead of folding them into a single value.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        raw_values = super().get_all(_always_bytes(name))
        return [_native(value) for value in raw_values]

    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        return super().set_all(
            _always_bytes(name),
            [_always_bytes(value) for value in values],
        )

    def insert(self, index: int, key: str | bytes, value: str | bytes):
        super().insert(index, _always_bytes(key), _always_bytes(value))

    def items(self, multi=False):
        if not multi:
            return super().items()
        return ((_native(k), _native(v)) for k, v in self.fields)
Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.
Create headers with keyword arguments:
>>> h = Headers(host="example.com", content_type="application/xml")
Headers mostly behave like a normal dict:
>>> h["Host"]
"example.com"
Headers are case insensitive:
>>> h["host"]
"example.com"
Headers can also be created from a list of raw (header_name, header_value) byte tuples:
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
Multiple headers are folded into a single header as per RFC 7230:
>>> h["Accept"]
"text/html, application/xml"
Setting a header removes all existing headers with the same name:
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
bytes(h) returns an HTTP/1 header block:
>>> print(bytes(h))
Host: example.com
Accept: application/text
For full control, the raw header fields can be accessed:
>>> h.fields
Caveats:
- For use with the "Set-Cookie" and "Cookie" headers, either use Response.cookies or see Headers.get_all.
def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
    """
    *Args:*
     - *fields:* (optional) list of ``(name, value)`` header byte tuples,
       e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
     - *headers:* Additional headers given as keyword arguments. They overwrite
       existing values from `fields`. For convenience, underscores in header
       names are transformed to dashes - this behaviour does not extend to
       other methods.

    If the keyword arguments contain multiple keys that have equal ``.lower()``
    representations, the behavior is undefined.
    """
    super().__init__(fields)

    if not all(
        isinstance(name, bytes) and isinstance(value, bytes)
        for name, value in self.fields
    ):
        raise TypeError("Header fields must be bytes.")

    # content_type -> content-type
    extra = {}
    for name, value in headers.items():
        header_name = _always_bytes(name).replace(b"_", b"-")
        extra[header_name] = _always_bytes(value)
    self.update(extra)
Args:
- fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
- **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.
If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.
def get_all(self, name: str | bytes) -> list[str]:
    """
    Like `Headers.get`, but returns every header with the given name separately
    instead of folding them into a single value.
    This is useful for Set-Cookie and Cookie headers, which do not support folding.

    *See also:*
     - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
     - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
     - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
    """
    raw_values = super().get_all(_always_bytes(name))
    return [_native(value) for value in raw_values]
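Like Headers.get, but does not fold multiple headers into a single one.
This is useful for Set-Cookie and Cookie headers, which do not support folding.
See also:
- https://tools.ietf.org/html/rfc7230#section-3.2.2
- https://datatracker.ietf.org/doc/html/rfc6265#section-5.4
- https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5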
def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
    """
    Explicitly set multiple headers for the given key.
    See `Headers.get_all`.
    """
    # Both the name and every value are normalized to bytes before storing.
    return super().set_all(
        _always_bytes(name),
        [_always_bytes(value) for value in values],
    )
Explicitly set multiple headers for the given key.
See Headers.get_all.
def insert(self, index: int, key: str | bytes, value: str | bytes):
    """
    Insert an additional value for the given key at the specified position.

    Key and value are converted to bytes before being handed to the base class.
    """
    super().insert(index, _always_bytes(key), _always_bytes(value))
Insert an additional value for the given key at the specified position.
def items(self, multi=False):
    """
    Get all (key, value) tuples.

    With ``multi=True`` a generator over every raw field, decoded to `str`,
    is returned. Otherwise the call is delegated to the base class.
    """
    if not multi:
        return super().items()
    return ((_native(k), _native(v)) for k, v in self.fields)
Get all (key, value) tuples.
If multi is True, all (key, value) pairs will be returned.
If False, only one tuple per key is returned.