# mitmproxy.http
import binascii
import json
import os
import time
import urllib.parse
import warnings
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Mapping
from collections.abc import Sequence
from dataclasses import dataclass
from dataclasses import fields
from email.utils import formatdate
from email.utils import mktime_tz
from email.utils import parsedate_tz
from typing import Any
from typing import cast

from mitmproxy import flow
from mitmproxy.coretypes import multidict
from mitmproxy.coretypes import serializable
from mitmproxy.net import encoding
from mitmproxy.net.http import cookies
from mitmproxy.net.http import multipart
from mitmproxy.net.http import status_codes
from mitmproxy.net.http import url
from mitmproxy.net.http.headers import assemble_content_type
from mitmproxy.net.http.headers import infer_content_encoding
from mitmproxy.net.http.headers import parse_content_type
from mitmproxy.utils import human
from mitmproxy.utils import strutils
from mitmproxy.utils import typecheck
from mitmproxy.utils.strutils import always_bytes
from mitmproxy.utils.strutils import always_str
from mitmproxy.websocket import WebSocketData


# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
def _native(x: bytes) -> str:
    """Decode header bytes to `str` as UTF-8, using surrogateescape for undecodable bytes."""
    return x.decode("utf-8", "surrogateescape")


def _always_bytes(x: str | bytes) -> bytes:
    """Convert a header name or value to bytes (UTF-8 with surrogateescape)."""
    return strutils.always_bytes(x, "utf-8", "surrogateescape")


# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Header class which allows both convenient access to individual headers as well as
    direct access to the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.
        """
        super().__init__(fields)

        for key, value in self.fields:
            if not isinstance(key, bytes) or not isinstance(value, bytes):
                raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        self.update(
            {
                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
                for name, value in headers.items()
            }
        )

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Headers can be folded
        return ", ".join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Headers are case-insensitive
        return key.lower()

    def __bytes__(self) -> bytes:
        """Serialize all fields as ``name: value`` lines, each terminated by CRLF."""
        if self.fields:
            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
        else:
            return b""

    def __delitem__(self, key: str | bytes) -> None:
        key = _always_bytes(key)
        super().__delitem__(key)

    def __iter__(self) -> Iterator[str]:
        for x in super().__iter__():
            yield _native(x)

    def get_all(self, name: str | bytes) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        name = _always_bytes(name)
        return [_native(x) for x in super().get_all(name)]

    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        name = _always_bytes(name)
        values = [_always_bytes(x) for x in values]
        return super().set_all(name, values)

    def insert(self, index: int, key: str | bytes, value: str | bytes):
        key = _always_bytes(key)
        value = _always_bytes(value)
        super().insert(index, key, value)

    def items(self, multi=False):
        if multi:
            # One (name, value) pair per raw field, i.e. repeated headers are not folded.
            return ((_native(k), _native(v)) for k, v in self.fields)
        else:
            return super().items()


@dataclass
class MessageData(serializable.Serializable):
    """Data fields shared by `RequestData` and `ResponseData`."""

    http_version: bytes
    headers: Headers
    content: bytes | None
    trailers: Headers | None
    timestamp_start: float
    timestamp_end: float | None

    # noinspection PyUnreachableCode
    if __debug__:

        def __post_init__(self):
            # Only defined when __debug__ is set: check every field value against its annotation.
            for field in fields(self):
                val = getattr(self, field.name)
                typecheck.check_option_type(field.name, val, field.type)

    def set_state(self, state):
        for k, v in state.items():
            # Header state is stored in serialized form and needs to be turned back into `Headers`.
            if k in ("headers", "trailers") and v is not None:
                v = Headers.from_state(v)
            setattr(self, k, v)

    def get_state(self):
        state = vars(self).copy()
        state["headers"] = state["headers"].get_state()
        if state["trailers"] is not None:
            state["trailers"] = state["trailers"].get_state()
        return state

    @classmethod
    def from_state(cls, state):
        # Work on a shallow copy so that the caller's state dict is left untouched.
        state = dict(state)
        state["headers"] = Headers.from_state(state["headers"])
        if state["trailers"] is not None:
            state["trailers"] = Headers.from_state(state["trailers"])
        return cls(**state)


@dataclass
class RequestData(MessageData):
    """`MessageData` plus the request-specific fields."""

    host: str
    port: int
    method: bytes
    scheme: bytes
    authority: bytes
    path: bytes


@dataclass
class ResponseData(MessageData):
    """`MessageData` plus the response-specific fields."""

    status_code: int
    reason: bytes


class Message(serializable.Serializable):
    """Base class for `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        return cls(**state)

    def get_state(self):
        return self.data.get_state()

    def set_state(self, state):
        self.data.set_state(state)

    data: MessageData
    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        return self.data.http_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: str | bytes) -> None:
        self.data.http_version = strutils.always_bytes(
            http_version, "utf-8", "surrogateescape"
        )

    @property
    def is_http10(self) -> bool:
        return self.data.http_version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        return self.data.http_version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        return self.data.http_version == b"HTTP/2.0"

    @property
    def is_http3(self) -> bool:
        return self.data.http_version == b"HTTP/3"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Headers | None:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Headers | None) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> bytes | None:
        """
        The raw (potentially compressed) HTTP message body.

        In contrast to `Message.content` and `Message.text`, accessing this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: bytes | None) -> None:
        self.data.content = content

    @property
    def content(self) -> bytes | None:
        """
        The uncompressed HTTP message body as bytes.

        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: bytes | None) -> None:
        self.set_content(value)

    @property
    def text(self) -> str | None:
        """
        The uncompressed and decoded HTTP message body as text.

        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: str | None) -> None:
        self.set_text(value)

    def set_content(self, value: bytes | None) -> None:
        """
        Set the message body from uncompressed bytes.

        The value is encoded according to the current content-encoding header. If that
        encoding is invalid, the header is removed and the value is stored as-is.
        The content-length header is updated unless a transfer-encoding header is present.

        *Raises:*
         - `TypeError`, when `value` is neither `None` nor bytes.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        ce = self.headers.get("content-encoding")
        try:
            self.raw_content = encoding.encode(value, ce or "identity")
        except ValueError:
            # So we have an invalid content-encoding?
            # Let's remove it!
            del self.headers["content-encoding"]
            self.raw_content = value

        if "transfer-encoding" in self.headers:
            # https://httpwg.org/specs/rfc7230.html#header.content-length
            # don't set content-length if a transfer-encoding is provided
            pass
        else:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> bytes | None:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.
        """
        if self.raw_content is None:
            return None
        ce = self.headers.get("content-encoding")
        if ce:
            try:
                content = encoding.decode(self.raw_content, ce)
                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
                if isinstance(content, str):
                    raise ValueError(f"Invalid Content-Encoding: {ce}")
                return content
            except ValueError:
                if strict:
                    raise
                return self.raw_content
        else:
            return self.raw_content

    def set_text(self, text: str | None) -> None:
        """
        Set the message body from a string.

        The charset is inferred from the content-type header. If the text cannot be encoded
        with it, UTF-8 is used instead and the content-type header is updated accordingly.
        """
        if text is None:
            self.content = None
            return
        enc = infer_content_encoding(self.headers.get("content-type", ""))

        try:
            self.content = cast(bytes, encoding.encode(text, enc))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            ct = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            ct[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*ct)
            enc = "utf8"
            self.content = text.encode(enc, "surrogateescape")

    def get_text(self, strict: bool = True) -> str | None:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.
        """
        content = self.get_content(strict)
        if content is None:
            return None
        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
        try:
            return cast(str, encoding.decode(content, enc))
        except ValueError:
            if strict:
                raise
            return content.decode("utf8", "surrogateescape")

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> float | None:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: float | None):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes body based on the current Content-Encoding header, then
        removes the header. If there is no Content-Encoding header, no
        action is taken.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        decoded = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = decoded

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        self.content = self.raw_content
        # `set_content` removes the header again if the encoding could not be applied.
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Returns the JSON encoded content of the response, if any.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        Will raise if the content can not be decoded and then parsed as JSON.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
           has been streamed.
        """
        content = self.get_content(strict=False)
        if content is None:
            raise TypeError("Message content is not available.")
        else:
            return json.loads(content, **kwargs)


class Request(Message):
    """
    An HTTP request.
    """

    data: RequestData

    def __init__(
        self,
        host: str,
        port: int,
        method: bytes,
        scheme: bytes,
        authority: bytes,
        path: bytes,
        http_version: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(host, bytes):
            host = host.decode("idna", "strict")
        if isinstance(method, str):
            method = method.encode("ascii", "strict")
        if isinstance(scheme, str):
            scheme = scheme.encode("ascii", "strict")
        if isinstance(authority, str):
            authority = authority.encode("ascii", "strict")
        if isinstance(path, str):
            path = path.encode("ascii", "strict")
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = RequestData(
            host=host,
            port=port,
            method=method,
            scheme=scheme,
            authority=authority,
            path=path,
            http_version=http_version,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.host and self.port:
            hostport = f"{self.host}:{self.port}"
        else:
            hostport = ""
        path = self.path or ""
        return f"Request({self.method} {hostport}{path})"

    @classmethod
    def make(
        cls,
        method: str,
        url: str,
        content: bytes | str = "",
        headers: (
            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Request":
        """
        Simplified API for creating request objects.
        """
        # Headers can be list or dict, we differentiate here.
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Any mapping is accepted here (not only dict), consistent with `Response.make`.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        req = cls(
            "",
            0,
            method.encode("utf-8", "surrogateescape"),
            b"",
            b"",
            b"",
            b"HTTP/1.1",
            headers,
            b"",
            None,
            time.time(),
            time.time(),
        )

        req.url = url
        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            req.content = content
        elif isinstance(content, str):
            req.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return req

    @property
    def first_line_format(self) -> str:
        """
        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

        origin-form and asterisk-form are subsumed as "relative".
        """
        if self.method == "CONNECT":
            return "authority"
        elif self.authority:
            return "absolute"
        else:
            return "relative"

    @property
    def method(self) -> str:
        """
        HTTP request method, e.g. "GET".
        """
        return self.data.method.decode("utf-8", "surrogateescape").upper()

    @method.setter
    def method(self, val: str | bytes) -> None:
        self.data.method = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def scheme(self) -> str:
        """
        HTTP request scheme, which should be "http" or "https".
        """
        return self.data.scheme.decode("utf-8", "surrogateescape")

    @scheme.setter
    def scheme(self, val: str | bytes) -> None:
        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def authority(self) -> str:
        """
        HTTP request authority.

        For HTTP/1, this is the authority portion of the request target
        (in either absolute-form or authority-form).
        For origin-form and asterisk-form requests, this property is set to an empty string.

        For HTTP/2, this is the :authority pseudo header.

        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
        """
        try:
            return self.data.authority.decode("idna")
        except UnicodeError:
            return self.data.authority.decode("utf8", "surrogateescape")

    @authority.setter
    def authority(self, val: str | bytes) -> None:
        if isinstance(val, str):
            try:
                val = val.encode("idna", "strict")
            except UnicodeError:
                val = val.encode("utf8", "surrogateescape")  # type: ignore
        self.data.authority = val

    @property
    def host(self) -> str:
        """
        Target server for this request. This may be parsed from the raw request
        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
        or inferred from the proxy mode (e.g. an IP in transparent mode).

        Setting the host attribute also updates the host header and authority information, if present.

        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
        """
        return self.data.host

    @host.setter
    def host(self, val: str | bytes) -> None:
        self.data.host = always_str(val, "idna", "strict")
        self._update_host_and_authority()

    @property
    def host_header(self) -> str | None:
        """
        The request's host/authority header.

        This property maps to either ``request.headers["Host"]`` or
        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.

        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
        """
        if self.is_http2 or self.is_http3:
            return self.authority or self.data.headers.get("Host", None)
        else:
            return self.data.headers.get("Host", None)

    @host_header.setter
    def host_header(self, val: None | str | bytes) -> None:
        if val is None:
            if self.is_http2 or self.is_http3:
                self.data.authority = b""
            self.headers.pop("Host", None)
        else:
            if self.is_http2 or self.is_http3:
                self.authority = val  # type: ignore
            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
                self.headers["Host"] = val

    @property
    def port(self) -> int:
        """
        Target port.
        """
        return self.data.port

    @port.setter
    def port(self, port: int) -> None:
        if not isinstance(port, int):
            raise ValueError(f"Port must be an integer, not {port!r}.")

        self.data.port = port
        self._update_host_and_authority()

    def _update_host_and_authority(self) -> None:
        """Propagate the current scheme/host/port to the Host header and authority, if they are set."""
        val = url.hostport(self.scheme, self.host, self.port)

        # Update host header
        if "Host" in self.data.headers:
            self.data.headers["Host"] = val
        # Update authority
        if self.data.authority:
            self.authority = val

    @property
    def path(self) -> str:
        """
        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
        Usually starts with a slash, except for OPTIONS requests, which may just be "*".

        This attribute includes both path and query parts of the target URI
        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
        """
        return self.data.path.decode("utf-8", "surrogateescape")

    @path.setter
    def path(self, val: str | bytes) -> None:
        self.data.path = always_bytes(val, "utf-8", "surrogateescape")

    @property
    def url(self) -> str:
        """
        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

        Setting this property updates these attributes as well.
        """
        if self.first_line_format == "authority":
            return f"{self.host}:{self.port}"
        return url.unparse(self.scheme, self.host, self.port, self.path)

    @url.setter
    def url(self, val: str | bytes) -> None:
        val = always_str(val, "utf-8", "surrogateescape")
        self.scheme, self.host, self.port, self.path = url.parse(val)

    @property
    def pretty_host(self) -> str:
        """
        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
        This is useful in transparent mode where `Request.host` is only an IP address.

        *Warning:* When working in adversarial environments, this may not reflect the actual destination
        as the Host header could be spoofed.
        """
        authority = self.host_header
        if authority:
            return url.parse_authority(authority, check=False)[0]
        else:
            return self.host

    @property
    def pretty_url(self) -> str:
        """
        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
        """
        if self.first_line_format == "authority":
            return self.authority

        host_header = self.host_header
        if not host_header:
            return self.url

        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
        pretty_port = pretty_port or url.default_port(self.scheme) or 443

        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)

    def _get_query(self):
        query = urllib.parse.urlparse(self.url).query
        return tuple(url.decode(query))

    def _set_query(self, query_data):
        query = url.encode(query_data)
        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    @property
    def query(self) -> multidict.MultiDictView[str, str]:
        """
        The request query as a mutable mapping view on the request's path.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.path`, and vice versa.
        """
        return multidict.MultiDictView(self._get_query, self._set_query)

    @query.setter
    def query(self, value):
        self._set_query(value)

    def _get_cookies(self):
        h = self.headers.get_all("Cookie")
        return tuple(cookies.parse_cookie_headers(h))

    def _set_cookies(self, value):
        self.headers["cookie"] = cookies.format_cookie_header(value)

    @property
    def cookies(self) -> multidict.MultiDictView[str, str]:
        """
        The request cookies.
        For the most part, this behaves like a dictionary.
        Modifications to the MultiDictView update `Request.headers`, and vice versa.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    @property
    def path_components(self) -> tuple[str, ...]:
        """
        The URL's path components as a tuple of strings.
        Components are unquoted.
        """
        path = urllib.parse.urlparse(self.url).path
        # This needs to be a tuple so that it's immutable.
        # Otherwise, this would fail silently:
        #   request.path_components.append("foo")
        return tuple(url.unquote(i) for i in path.split("/") if i)

    @path_components.setter
    def path_components(self, components: Iterable[str]):
        # Quote every component completely (safe=""), so that "/" inside a component is escaped.
        quoted = (url.quote(x, safe="") for x in components)
        path = "/" + "/".join(quoted)
        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])

    def anticache(self) -> None:
        """
        Modifies this request to remove headers that might produce a cached response.
        """
        delheaders = (
            "if-modified-since",
            "if-none-match",
        )
        for i in delheaders:
            self.headers.pop(i, None)

    def anticomp(self) -> None:
        """
        Modify the Accept-Encoding header to only accept uncompressed responses.
        """
        self.headers["accept-encoding"] = "identity"

    def constrain_encoding(self) -> None:
        """
        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
        """
        accept_encoding = self.headers.get("accept-encoding")
        if accept_encoding:
            # Iterate a tuple rather than a set so that the resulting header value has a
            # stable order that does not depend on string hash randomization.
            self.headers["accept-encoding"] = ", ".join(
                e
                for e in ("gzip", "identity", "deflate", "br", "zstd")
                if e in accept_encoding
            )

    def _get_urlencoded_form(self):
        is_valid_content_type = (
            "application/x-www-form-urlencoded"
            in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type:
            return tuple(url.decode(self.get_text(strict=False)))
        return ()

    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
        """
        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
        This will overwrite the existing content if there is one.
        """
        self.headers["content-type"] = "application/x-www-form-urlencoded"
        self.content = url.encode(form_data, self.get_text(strict=False)).encode()

    @property
    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
        """
        The URL-encoded form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_urlencoded_form, self._set_urlencoded_form
        )

    @urlencoded_form.setter
    def urlencoded_form(self, value):
        self._set_urlencoded_form(value)

    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
        is_valid_content_type = (
            "multipart/form-data" in self.headers.get("content-type", "").lower()
        )
        if is_valid_content_type and self.content is not None:
            try:
                return multipart.decode_multipart(
                    self.headers.get("content-type"), self.content
                )
            except ValueError:
                # Deliberate best effort: an unparseable body is reported as an empty form.
                pass
        return []

    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
        ct = self.headers.get("content-type", "")
        is_valid_content_type = ct.lower().startswith("multipart/form-data")
        if not is_valid_content_type:
            # Generate a random boundary here.
            #
            # See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
            # on generating the boundary.
            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
            self.headers["content-type"] = ct = (
                f"multipart/form-data; boundary={boundary}"
            )
        self.content = multipart.encode_multipart(ct, value)

    @property
    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
        """
        The multipart form data.

        If the content-type indicates non-form data or the form could not be parsed, this is set to
        an empty `MultiDictView`.

        Modifications to the MultiDictView update `Request.content`, and vice versa.
        """
        return multidict.MultiDictView(
            self._get_multipart_form, self._set_multipart_form
        )

    @multipart_form.setter
    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
        self._set_multipart_form(value)


class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: bytes | str = b"",
        headers: (
            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # The signature promises support for any Mapping, so do not restrict this to dict:
            # a non-dict mapping would otherwise be iterated as a sequence of keys below.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: str | bytes) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        # Emit one Set-Cookie header per cookie; see `Headers.get_all` on why these are not folded.
        cookie_headers = []
        for k, v in value:
            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
            cookie_headers.append(header)
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    # NOTE(review): the visible source is cut off at this point, in the middle of the
    # docstring of `Response.refresh`. The visible fragment is preserved verbatim below
    # (commented out) instead of being completed by guesswork; restore it together with
    # the remainder of the method.
    #
    # def refresh(self, now=None):
    #     """
    #     This fairly complex and heuristic function refreshes a server
    #     response for replay.
    #
    #     - It adjusts date, expires, and last-modified headers.
    #     - It adjusts cookie expiration.
1174 """ 1175 if not now: 1176 now = time.time() 1177 delta = now - self.timestamp_start 1178 refresh_headers = [ 1179 "date", 1180 "expires", 1181 "last-modified", 1182 ] 1183 for i in refresh_headers: 1184 if i in self.headers: 1185 d = parsedate_tz(self.headers[i]) 1186 if d: 1187 new = mktime_tz(d) + delta 1188 try: 1189 self.headers[i] = formatdate(new, usegmt=True) 1190 except OSError: # pragma: no cover 1191 pass # value out of bounds on Windows only (which is why we exclude it from coverage). 1192 c = [] 1193 for set_cookie_header in self.headers.get_all("set-cookie"): 1194 try: 1195 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1196 except ValueError: 1197 refreshed = set_cookie_header 1198 c.append(refreshed) 1199 if c: 1200 self.headers.set_all("set-cookie", c) 1201 1202 1203class HTTPFlow(flow.Flow): 1204 """ 1205 An HTTPFlow is a collection of objects representing a single HTTP 1206 transaction. 1207 """ 1208 1209 request: Request 1210 """The client's HTTP request.""" 1211 response: Response | None = None 1212 """The server's HTTP response.""" 1213 error: flow.Error | None = None 1214 """ 1215 A connection or protocol error affecting this flow. 1216 1217 Note that it's possible for a Flow to have both a response and an error 1218 object. This might happen, for instance, when a response was received 1219 from the server, but there was an error sending it back to the client. 1220 """ 1221 1222 websocket: WebSocketData | None = None 1223 """ 1224 If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. 
1225 """ 1226 1227 def get_state(self) -> serializable.State: 1228 return { 1229 **super().get_state(), 1230 "request": self.request.get_state(), 1231 "response": self.response.get_state() if self.response else None, 1232 "websocket": self.websocket.get_state() if self.websocket else None, 1233 } 1234 1235 def set_state(self, state: serializable.State) -> None: 1236 self.request = Request.from_state(state.pop("request")) 1237 self.response = Response.from_state(r) if (r := state.pop("response")) else None 1238 self.websocket = ( 1239 WebSocketData.from_state(w) if (w := state.pop("websocket")) else None 1240 ) 1241 super().set_state(state) 1242 1243 def __repr__(self): 1244 s = "<HTTPFlow" 1245 for a in ( 1246 "request", 1247 "response", 1248 "websocket", 1249 "error", 1250 "client_conn", 1251 "server_conn", 1252 ): 1253 if getattr(self, a, False): 1254 s += f"\r\n {a} = {{flow.{a}}}" 1255 s += ">" 1256 return s.format(flow=self) 1257 1258 @property 1259 def timestamp_start(self) -> float: 1260 """*Read-only:* An alias for `Request.timestamp_start`.""" 1261 return self.request.timestamp_start 1262 1263 @property 1264 def mode(self) -> str: # pragma: no cover 1265 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1266 return getattr(self, "_mode", "regular") 1267 1268 @mode.setter 1269 def mode(self, val: str) -> None: # pragma: no cover 1270 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1271 self._mode = val 1272 1273 def copy(self): 1274 f = super().copy() 1275 if self.request: 1276 f.request = self.request.copy() 1277 if self.response: 1278 f.response = self.response.copy() 1279 return f 1280 1281 1282__all__ = [ 1283 "HTTPFlow", 1284 "Message", 1285 "Request", 1286 "Response", 1287 "Headers", 1288]
class HTTPFlow(flow.Flow):
    """
    One HTTP transaction: the client's request together with the response,
    error and WebSocket data that may accompany it.
    """

    request: Request
    """The client's HTTP request."""
    response: Response | None = None
    """The server's HTTP response."""
    error: flow.Error | None = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: WebSocketData | None = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    def get_state(self) -> serializable.State:
        """
        Return the parent class's state extended with the serialized request,
        response and websocket. Response and websocket are stored as `None`
        when the attribute is falsy.
        """
        state = dict(super().get_state())
        state["request"] = self.request.get_state()
        if self.response:
            state["response"] = self.response.get_state()
        else:
            state["response"] = None
        if self.websocket:
            state["websocket"] = self.websocket.get_state()
        else:
            state["websocket"] = None
        return state

    def set_state(self, state: serializable.State) -> None:
        """
        Restore request, response and websocket from `state`.

        The three keys are popped from `state` before the remainder is handed
        to the parent class, so `state` is modified in place.
        """
        self.request = Request.from_state(state.pop("request"))

        response_state = state.pop("response")
        if response_state:
            self.response = Response.from_state(response_state)
        else:
            self.response = None

        websocket_state = state.pop("websocket")
        if websocket_state:
            self.websocket = WebSocketData.from_state(websocket_state)
        else:
            self.websocket = None

        super().set_state(state)

    def __repr__(self):
        candidates = (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        )
        # Only attributes that exist and are truthy get a line in the output.
        present = [name for name in candidates if getattr(self, name, False)]
        # Build a format template first; the attribute values are filled in
        # by a single str.format() call at the end.
        template = (
            "<HTTPFlow"
            + "".join(f"\r\n {name} = {{flow.{name}}}" for name in present)
            + ">"
        )
        return template.format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        request = self.request
        return request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        """Deprecated. Emits a `DeprecationWarning` and returns `_mode`, or "regular" if unset."""
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        """
        Copy the flow via the parent class, then replace request and response
        on the copy with their own `.copy()` results when they are truthy.
        """
        duplicate = super().copy()
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
An HTTPFlow is a collection of objects representing a single HTTP transaction.
A connection or protocol error affecting this flow.
Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.
If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
@property
def timestamp_start(self) -> float:
    """
    *Read-only:* An alias for `Request.timestamp_start`.

    The value is read from `self.request` on every access.
    """
    return self.request.timestamp_start
Read-only: An alias for `Request.timestamp_start`.
class Message(serializable.Serializable):
    """
    Base class for `Request` and `Response`.

    The message fields are kept on `self.data`; the properties below read from
    and write to that object, and state (de)serialization delegates to it.
    """

    @classmethod
    def from_state(cls, state):
        """Create a new instance by passing the entries of `state` as keyword arguments to the constructor."""
        return cls(**state)

    def get_state(self):
        """Return the state of `self.data`."""
        return self.data.get_state()

    def set_state(self, state):
        """Apply `state` to `self.data`."""
        self.data.set_state(state)

    data: MessageData
    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.

        Decoded from the bytes stored on `self.data` as UTF-8 with `surrogateescape`.
        """
        return self.data.http_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: str | bytes) -> None:
        self.data.http_version = strutils.always_bytes(
            http_version, "utf-8", "surrogateescape"
        )

    # The is_http* helpers compare the stored version bytes against one exact literal each.
    @property
    def is_http10(self) -> bool:
        return self.data.http_version == b"HTTP/1.0"

    @property
    def is_http11(self) -> bool:
        return self.data.http_version == b"HTTP/1.1"

    @property
    def is_http2(self) -> bool:
        return self.data.http_version == b"HTTP/2.0"

    @property
    def is_http3(self) -> bool:
        return self.data.http_version == b"HTTP/3"

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.

        This is the `Headers` object stored on `self.data`, not a copy.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Headers | None:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).

        May be `None`.
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Headers | None) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> bytes | None:
        """
        The raw (potentially compressed) HTTP message body.

        In contrast to `Message.content` and `Message.text`, accessing this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: bytes | None) -> None:
        self.data.content = content

    @property
    def content(self) -> bytes | None:
        """
        The uncompressed HTTP message body as bytes.

        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: bytes | None) -> None:
        self.set_content(value)

    @property
    def text(self) -> str | None:
        """
        The uncompressed and decoded HTTP message body as text.

        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: str | None) -> None:
        self.set_text(value)

    def set_content(self, value: bytes | None) -> None:
        """
        Store `value` as the message body.

        `None` clears the raw body and leaves the headers untouched.
        Otherwise `value` is encoded according to the current content-encoding
        header ("identity" if the header is missing or empty) and the
        content-length header is updated, unless a transfer-encoding header is present.

        *Raises:*
         - `TypeError`, when `value` is neither `None` nor `bytes`.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )
        ce = self.headers.get("content-encoding")
        try:
            self.raw_content = encoding.encode(value, ce or "identity")
        except ValueError:
            # So we have an invalid content-encoding?
            # Let's remove it!
            # (`Message.encode` relies on this removal to detect an invalid encoding.)
            del self.headers["content-encoding"]
            self.raw_content = value

        if "transfer-encoding" in self.headers:
            # https://httpwg.org/specs/rfc7230.html#header.content-length
            # don't set content-length if a transfer-encoding is provided
            pass
        else:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> bytes | None:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.

        Returns `None` when there is no raw body, and the raw body itself when
        the content-encoding header is missing or empty.
        """
        if self.raw_content is None:
            return None
        ce = self.headers.get("content-encoding")
        if ce:
            try:
                content = encoding.decode(self.raw_content, ce)
                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
                if isinstance(content, str):
                    # Raised inside the try block on purpose: the handler below
                    # then applies the same strict/non-strict policy.
                    raise ValueError(f"Invalid Content-Encoding: {ce}")
                return content
            except ValueError:
                if strict:
                    raise
                return self.raw_content
        else:
            return self.raw_content

    def set_text(self, text: str | None) -> None:
        """
        Encode `text` and store it as the message body.

        `None` clears the body. Otherwise the encoding is inferred from the
        content-type header. If encoding with it raises `ValueError`, the
        content-type header is rewritten with `charset=utf-8` and the text is
        stored as surrogate-escaped UTF-8.
        """
        if text is None:
            self.content = None
            return
        enc = infer_content_encoding(self.headers.get("content-type", ""))

        try:
            self.content = cast(bytes, encoding.encode(text, enc))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            ct = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            ct[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*ct)
            enc = "utf8"
            self.content = text.encode(enc, "surrogateescape")

    def get_text(self, strict: bool = True) -> str | None:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.

        Returns `None` when `get_content(strict)` returns `None`.
        """
        content = self.get_content(strict)
        if content is None:
            return None
        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
        try:
            return cast(str, encoding.decode(content, enc))
        except ValueError:
            if strict:
                raise
            return content.decode("utf8", "surrogateescape")

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> float | None:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: float | None) -> None:
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes body based on the current Content-Encoding header, then
        removes the header. If there is no Content-Encoding header, no
        action is taken.

        *Raises:*
         - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        # Decode first: get_content() needs the header that is removed on the next line.
        decoded = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = decoded

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
         - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        # Assigning to .content goes through set_content(), which deletes the
        # content-encoding header again if the encoding is rejected.
        self.content = self.raw_content
        if "content-encoding" not in self.headers:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Returns the JSON encoded content of the response, if any.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        Will raise if the content can not be decoded and then parsed as JSON.

        *Raises:*
         - `json.decoder.JSONDecodeError` if content is not valid JSON.
         - `TypeError` if the content is not available, for example because the response
            has been streamed.
        """
        # strict=False: an invalid content-encoding yields the raw body instead of raising here.
        content = self.get_content(strict=False)
        if content is None:
            raise TypeError("Message content is not available.")
        else:
            return json.loads(content, **kwargs)
This attribute controls if the message body should be streamed.
If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
This makes it possible to perform string replacements on the entire body.
If `True`, the message body will not be buffered on the proxy but immediately forwarded instead.
Alternatively, a transformation function can be specified, which will be called for each chunk of data.
Please note that packet boundaries generally should not be relied upon.
This attribute must be set in the `requestheaders` or `responseheaders` hook.
Setting it in `request` or `response` is already too late: mitmproxy has buffered the message body by then.
@property
def http_version(self) -> str:
    """
    HTTP version string, for example `HTTP/1.1`.

    Decoded from the value stored on `self.data` as UTF-8 with the
    `surrogateescape` error handler.
    """
    return self.data.http_version.decode("utf-8", "surrogateescape")
HTTP version string, for example `HTTP/1.1`.
@property
def headers(self) -> Headers:
    """
    The HTTP headers.

    This is the `Headers` object stored on `self.data`, not a copy.
    """
    return self.data.headers
The HTTP headers.
@property
def trailers(self) -> Headers | None:
    """
    The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).

    Read from `self.data.trailers`; may be `None`.
    """
    return self.data.trailers
The HTTP trailers.
@property
def raw_content(self) -> bytes | None:
    """
    The raw (potentially compressed) HTTP message body.

    In contrast to `Message.content` and `Message.text`, accessing this property never raises.
    It returns `self.data.content` exactly as stored.

    *See also:* `Message.content`, `Message.text`
    """
    return self.data.content
The raw (potentially compressed) HTTP message body.
In contrast to `Message.content` and `Message.text`, accessing this property never raises.
See also: `Message.content`, `Message.text`
@property
def content(self) -> bytes | None:
    """
    The uncompressed HTTP message body as bytes.

    Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
    Equivalent to calling `get_content()` with its default arguments.

    *See also:* `Message.raw_content`, `Message.text`
    """
    return self.get_content()
The uncompressed HTTP message body as bytes.
Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
See also: `Message.raw_content`, `Message.text`
@property
def text(self) -> str | None:
    """
    The uncompressed and decoded HTTP message body as text.

    Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
    Equivalent to calling `get_text()` with its default arguments.

    *See also:* `Message.raw_content`, `Message.content`
    """
    return self.get_text()
The uncompressed and decoded HTTP message body as text.
Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
See also: `Message.raw_content`, `Message.content`
359 def set_content(self, value: bytes | None) -> None: 360 if value is None: 361 self.raw_content = None 362 return 363 if not isinstance(value, bytes): 364 raise TypeError( 365 f"Message content must be bytes, not {type(value).__name__}. " 366 "Please use .text if you want to assign a str." 367 ) 368 ce = self.headers.get("content-encoding") 369 try: 370 self.raw_content = encoding.encode(value, ce or "identity") 371 except ValueError: 372 # So we have an invalid content-encoding? 373 # Let's remove it! 374 del self.headers["content-encoding"] 375 self.raw_content = value 376 377 if "transfer-encoding" in self.headers: 378 # https://httpwg.org/specs/rfc7230.html#header.content-length 379 # don't set content-length if a transfer-encoding is provided 380 pass 381 else: 382 self.headers["content-length"] = str(len(self.raw_content))
384 def get_content(self, strict: bool = True) -> bytes | None: 385 """ 386 Similar to `Message.content`, but does not raise if `strict` is `False`. 387 Instead, the compressed message body is returned as-is. 388 """ 389 if self.raw_content is None: 390 return None 391 ce = self.headers.get("content-encoding") 392 if ce: 393 try: 394 content = encoding.decode(self.raw_content, ce) 395 # A client may illegally specify a byte -> str encoding here (e.g. utf8) 396 if isinstance(content, str): 397 raise ValueError(f"Invalid Content-Encoding: {ce}") 398 return content 399 except ValueError: 400 if strict: 401 raise 402 return self.raw_content 403 else: 404 return self.raw_content
Similar to `Message.content`, but does not raise if `strict` is `False`.
Instead, the compressed message body is returned as-is.
406 def set_text(self, text: str | None) -> None: 407 if text is None: 408 self.content = None 409 return 410 enc = infer_content_encoding(self.headers.get("content-type", "")) 411 412 try: 413 self.content = cast(bytes, encoding.encode(text, enc)) 414 except ValueError: 415 # Fall back to UTF-8 and update the content-type header. 416 ct = parse_content_type(self.headers.get("content-type", "")) or ( 417 "text", 418 "plain", 419 {}, 420 ) 421 ct[2]["charset"] = "utf-8" 422 self.headers["content-type"] = assemble_content_type(*ct) 423 enc = "utf8" 424 self.content = text.encode(enc, "surrogateescape")
426 def get_text(self, strict: bool = True) -> str | None: 427 """ 428 Similar to `Message.text`, but does not raise if `strict` is `False`. 429 Instead, the message body is returned as surrogate-escaped UTF-8. 430 """ 431 content = self.get_content(strict) 432 if content is None: 433 return None 434 enc = infer_content_encoding(self.headers.get("content-type", ""), content) 435 try: 436 return cast(str, encoding.decode(content, enc)) 437 except ValueError: 438 if strict: 439 raise 440 return content.decode("utf8", "surrogateescape")
Similar to `Message.text`, but does not raise if `strict` is `False`.
Instead, the message body is returned as surrogate-escaped UTF-8.
@property
def timestamp_start(self) -> float:
    """
    *Timestamp:* Headers received.

    Read from `self.data.timestamp_start`.
    """
    return self.data.timestamp_start
Timestamp: Headers received.
@property
def timestamp_end(self) -> float | None:
    """
    *Timestamp:* Last byte received.

    Read from `self.data.timestamp_end`; may be `None`.
    """
    return self.data.timestamp_end
Timestamp: Last byte received.
def decode(self, strict: bool = True) -> None:
    """
    Replace the body with its decoded form and drop the Content-Encoding header.

    The body is obtained from `get_content(strict)`, the content-encoding
    header is removed if present, and the result is assigned back to
    `content`.

    *Raises:*
     - `ValueError`, when the content-encoding is invalid and strict is True.
    """
    # Decode first: get_content() needs the header that is removed below.
    plain_body = self.get_content(strict)
    self.headers.pop("content-encoding", None)
    self.content = plain_body
Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.
Raises:
- `ValueError`, when the content-encoding is invalid and `strict` is `True`.
def encode(self, encoding: str) -> None:
    """
    Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
    Any existing content-encodings are overwritten, the content is not decoded beforehand.

    *Raises:*
     - `ValueError`, when the specified content-encoding is invalid.
    """
    self.headers["content-encoding"] = encoding
    # Re-assigning the raw body through `content` re-encodes it under the new
    # header. If the header is gone afterwards, the encoding was rejected.
    self.content = self.raw_content
    if "content-encoding" in self.headers:
        return
    raise ValueError(f"Invalid content encoding {encoding!r}")
Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten; the content is not decoded beforehand.
Raises:
- `ValueError`, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
    """
    Parse the message body as JSON and return the result.

    The body is read with `get_content(strict=False)`.
    `**kwargs` are passed through to `json.loads()`.

    *Raises:*
     - `json.decoder.JSONDecodeError` if content is not valid JSON.
     - `TypeError` if the content is not available, for example because the response
        has been streamed.
    """
    body = self.get_content(strict=False)
    if body is not None:
        return json.loads(body, **kwargs)
    raise TypeError("Message content is not available.")
Returns the parsed JSON content of the message, if any.
`**kwargs` are optional arguments that will be passed to `json.loads()`.
Will raise if the content cannot be decoded and then parsed as JSON.
Raises:
- `json.decoder.JSONDecodeError` if content is not valid JSON.
- `TypeError` if the content is not available, for example because the response has been streamed.
510class Request(Message): 511 """ 512 An HTTP request. 513 """ 514 515 data: RequestData 516 517 def __init__( 518 self, 519 host: str, 520 port: int, 521 method: bytes, 522 scheme: bytes, 523 authority: bytes, 524 path: bytes, 525 http_version: bytes, 526 headers: Headers | tuple[tuple[bytes, bytes], ...], 527 content: bytes | None, 528 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 529 timestamp_start: float, 530 timestamp_end: float | None, 531 ): 532 # auto-convert invalid types to retain compatibility with older code. 533 if isinstance(host, bytes): 534 host = host.decode("idna", "strict") 535 if isinstance(method, str): 536 method = method.encode("ascii", "strict") 537 if isinstance(scheme, str): 538 scheme = scheme.encode("ascii", "strict") 539 if isinstance(authority, str): 540 authority = authority.encode("ascii", "strict") 541 if isinstance(path, str): 542 path = path.encode("ascii", "strict") 543 if isinstance(http_version, str): 544 http_version = http_version.encode("ascii", "strict") 545 546 if isinstance(content, str): 547 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 548 if not isinstance(headers, Headers): 549 headers = Headers(headers) 550 if trailers is not None and not isinstance(trailers, Headers): 551 trailers = Headers(trailers) 552 553 self.data = RequestData( 554 host=host, 555 port=port, 556 method=method, 557 scheme=scheme, 558 authority=authority, 559 path=path, 560 http_version=http_version, 561 headers=headers, 562 content=content, 563 trailers=trailers, 564 timestamp_start=timestamp_start, 565 timestamp_end=timestamp_end, 566 ) 567 568 def __repr__(self) -> str: 569 if self.host and self.port: 570 hostport = f"{self.host}:{self.port}" 571 else: 572 hostport = "" 573 path = self.path or "" 574 return f"Request({self.method} {hostport}{path})" 575 576 @classmethod 577 def make( 578 cls, 579 method: str, 580 url: str, 581 content: bytes | str = "", 582 headers: ( 583 Headers | dict[str | bytes, 
str | bytes] | Iterable[tuple[bytes, bytes]] 584 ) = (), 585 ) -> "Request": 586 """ 587 Simplified API for creating request objects. 588 """ 589 # Headers can be list or dict, we differentiate here. 590 if isinstance(headers, Headers): 591 pass 592 elif isinstance(headers, dict): 593 headers = Headers( 594 ( 595 always_bytes(k, "utf-8", "surrogateescape"), 596 always_bytes(v, "utf-8", "surrogateescape"), 597 ) 598 for k, v in headers.items() 599 ) 600 elif isinstance(headers, Iterable): 601 headers = Headers(headers) # type: ignore 602 else: 603 raise TypeError( 604 "Expected headers to be an iterable or dict, but is {}.".format( 605 type(headers).__name__ 606 ) 607 ) 608 609 req = cls( 610 "", 611 0, 612 method.encode("utf-8", "surrogateescape"), 613 b"", 614 b"", 615 b"", 616 b"HTTP/1.1", 617 headers, 618 b"", 619 None, 620 time.time(), 621 time.time(), 622 ) 623 624 req.url = url 625 # Assign this manually to update the content-length header. 626 if isinstance(content, bytes): 627 req.content = content 628 elif isinstance(content, str): 629 req.text = content 630 else: 631 raise TypeError( 632 f"Expected content to be str or bytes, but is {type(content).__name__}." 633 ) 634 635 return req 636 637 @property 638 def first_line_format(self) -> str: 639 """ 640 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 641 642 origin-form and asterisk-form are subsumed as "relative". 643 """ 644 if self.method == "CONNECT": 645 return "authority" 646 elif self.authority: 647 return "absolute" 648 else: 649 return "relative" 650 651 @property 652 def method(self) -> str: 653 """ 654 HTTP request method, e.g. "GET". 
655 """ 656 return self.data.method.decode("utf-8", "surrogateescape").upper() 657 658 @method.setter 659 def method(self, val: str | bytes) -> None: 660 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 661 662 @property 663 def scheme(self) -> str: 664 """ 665 HTTP request scheme, which should be "http" or "https". 666 """ 667 return self.data.scheme.decode("utf-8", "surrogateescape") 668 669 @scheme.setter 670 def scheme(self, val: str | bytes) -> None: 671 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 672 673 @property 674 def authority(self) -> str: 675 """ 676 HTTP request authority. 677 678 For HTTP/1, this is the authority portion of the request target 679 (in either absolute-form or authority-form). 680 For origin-form and asterisk-form requests, this property is set to an empty string. 681 682 For HTTP/2, this is the :authority pseudo header. 683 684 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 685 """ 686 try: 687 return self.data.authority.decode("idna") 688 except UnicodeError: 689 return self.data.authority.decode("utf8", "surrogateescape") 690 691 @authority.setter 692 def authority(self, val: str | bytes) -> None: 693 if isinstance(val, str): 694 try: 695 val = val.encode("idna", "strict") 696 except UnicodeError: 697 val = val.encode("utf8", "surrogateescape") # type: ignore 698 self.data.authority = val 699 700 @property 701 def host(self) -> str: 702 """ 703 Target server for this request. This may be parsed from the raw request 704 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 705 or inferred from the proxy mode (e.g. an IP in transparent mode). 706 707 Setting the host attribute also updates the host header and authority information, if present. 
708 709 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 710 """ 711 return self.data.host 712 713 @host.setter 714 def host(self, val: str | bytes) -> None: 715 self.data.host = always_str(val, "idna", "strict") 716 self._update_host_and_authority() 717 718 @property 719 def host_header(self) -> str | None: 720 """ 721 The request's host/authority header. 722 723 This property maps to either ``request.headers["Host"]`` or 724 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 725 726 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 727 """ 728 if self.is_http2 or self.is_http3: 729 return self.authority or self.data.headers.get("Host", None) 730 else: 731 return self.data.headers.get("Host", None) 732 733 @host_header.setter 734 def host_header(self, val: None | str | bytes) -> None: 735 if val is None: 736 if self.is_http2 or self.is_http3: 737 self.data.authority = b"" 738 self.headers.pop("Host", None) 739 else: 740 if self.is_http2 or self.is_http3: 741 self.authority = val # type: ignore 742 if not (self.is_http2 or self.is_http3) or "Host" in self.headers: 743 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 744 self.headers["Host"] = val 745 746 @property 747 def port(self) -> int: 748 """ 749 Target port. 750 """ 751 return self.data.port 752 753 @port.setter 754 def port(self, port: int) -> None: 755 if not isinstance(port, int): 756 raise ValueError(f"Port must be an integer, not {port!r}.") 757 758 self.data.port = port 759 self._update_host_and_authority() 760 761 def _update_host_and_authority(self) -> None: 762 val = url.hostport(self.scheme, self.host, self.port) 763 764 # Update host header 765 if "Host" in self.data.headers: 766 self.data.headers["Host"] = val 767 # Update authority 768 if self.data.authority: 769 self.authority = val 770 771 @property 772 def path(self) -> str: 773 """ 774 HTTP request path, e.g. 
"/index.html" or "/index.html?a=b". 775 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 776 777 This attribute includes both path and query parts of the target URI 778 (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)). 779 """ 780 return self.data.path.decode("utf-8", "surrogateescape") 781 782 @path.setter 783 def path(self, val: str | bytes) -> None: 784 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 785 786 @property 787 def url(self) -> str: 788 """ 789 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 790 791 Settings this property updates these attributes as well. 792 """ 793 if self.first_line_format == "authority": 794 return f"{self.host}:{self.port}" 795 return url.unparse(self.scheme, self.host, self.port, self.path) 796 797 @url.setter 798 def url(self, val: str | bytes) -> None: 799 val = always_str(val, "utf-8", "surrogateescape") 800 self.scheme, self.host, self.port, self.path = url.parse(val) 801 802 @property 803 def pretty_host(self) -> str: 804 """ 805 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 806 This is useful in transparent mode where `Request.host` is only an IP address. 807 808 *Warning:* When working in adversarial environments, this may not reflect the actual destination 809 as the Host header could be spoofed. 810 """ 811 authority = self.host_header 812 if authority: 813 return url.parse_authority(authority, check=False)[0] 814 else: 815 return self.host 816 817 @property 818 def pretty_url(self) -> str: 819 """ 820 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 
821 """ 822 if self.first_line_format == "authority": 823 return self.authority 824 825 host_header = self.host_header 826 if not host_header: 827 return self.url 828 829 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 830 pretty_port = pretty_port or url.default_port(self.scheme) or 443 831 832 return url.unparse(self.scheme, pretty_host, pretty_port, self.path) 833 834 def _get_query(self): 835 query = urllib.parse.urlparse(self.url).query 836 return tuple(url.decode(query)) 837 838 def _set_query(self, query_data): 839 query = url.encode(query_data) 840 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 841 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 842 843 @property 844 def query(self) -> multidict.MultiDictView[str, str]: 845 """ 846 The request query as a mutable mapping view on the request's path. 847 For the most part, this behaves like a dictionary. 848 Modifications to the MultiDictView update `Request.path`, and vice versa. 849 """ 850 return multidict.MultiDictView(self._get_query, self._set_query) 851 852 @query.setter 853 def query(self, value): 854 self._set_query(value) 855 856 def _get_cookies(self): 857 h = self.headers.get_all("Cookie") 858 return tuple(cookies.parse_cookie_headers(h)) 859 860 def _set_cookies(self, value): 861 self.headers["cookie"] = cookies.format_cookie_header(value) 862 863 @property 864 def cookies(self) -> multidict.MultiDictView[str, str]: 865 """ 866 The request cookies. 867 For the most part, this behaves like a dictionary. 868 Modifications to the MultiDictView update `Request.headers`, and vice versa. 869 """ 870 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 871 872 @cookies.setter 873 def cookies(self, value): 874 self._set_cookies(value) 875 876 @property 877 def path_components(self) -> tuple[str, ...]: 878 """ 879 The URL's path components as a tuple of strings. 880 Components are unquoted. 
881 """ 882 path = urllib.parse.urlparse(self.url).path 883 # This needs to be a tuple so that it's immutable. 884 # Otherwise, this would fail silently: 885 # request.path_components.append("foo") 886 return tuple(url.unquote(i) for i in path.split("/") if i) 887 888 @path_components.setter 889 def path_components(self, components: Iterable[str]): 890 components = map(lambda x: url.quote(x, safe=""), components) 891 path = "/" + "/".join(components) 892 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 893 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 894 895 def anticache(self) -> None: 896 """ 897 Modifies this request to remove headers that might produce a cached response. 898 """ 899 delheaders = ( 900 "if-modified-since", 901 "if-none-match", 902 ) 903 for i in delheaders: 904 self.headers.pop(i, None) 905 906 def anticomp(self) -> None: 907 """ 908 Modify the Accept-Encoding header to only accept uncompressed responses. 909 """ 910 self.headers["accept-encoding"] = "identity" 911 912 def constrain_encoding(self) -> None: 913 """ 914 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 915 """ 916 accept_encoding = self.headers.get("accept-encoding") 917 if accept_encoding: 918 self.headers["accept-encoding"] = ", ".join( 919 e 920 for e in {"gzip", "identity", "deflate", "br", "zstd"} 921 if e in accept_encoding 922 ) 923 924 def _get_urlencoded_form(self): 925 is_valid_content_type = ( 926 "application/x-www-form-urlencoded" 927 in self.headers.get("content-type", "").lower() 928 ) 929 if is_valid_content_type: 930 return tuple(url.decode(self.get_text(strict=False))) 931 return () 932 933 def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None: 934 """ 935 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 936 This will overwrite the existing content if there is one. 
937 """ 938 self.headers["content-type"] = "application/x-www-form-urlencoded" 939 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 940 941 @property 942 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 943 """ 944 The URL-encoded form data. 945 946 If the content-type indicates non-form data or the form could not be parsed, this is set to 947 an empty `MultiDictView`. 948 949 Modifications to the MultiDictView update `Request.content`, and vice versa. 950 """ 951 return multidict.MultiDictView( 952 self._get_urlencoded_form, self._set_urlencoded_form 953 ) 954 955 @urlencoded_form.setter 956 def urlencoded_form(self, value): 957 self._set_urlencoded_form(value) 958 959 def _get_multipart_form(self) -> list[tuple[bytes, bytes]]: 960 is_valid_content_type = ( 961 "multipart/form-data" in self.headers.get("content-type", "").lower() 962 ) 963 if is_valid_content_type and self.content is not None: 964 try: 965 return multipart.decode_multipart( 966 self.headers.get("content-type"), self.content 967 ) 968 except ValueError: 969 pass 970 return [] 971 972 def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 973 ct = self.headers.get("content-type", "") 974 is_valid_content_type = ct.lower().startswith("multipart/form-data") 975 if not is_valid_content_type: 976 """ 977 Generate a random boundary here. 978 979 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 980 on generating the boundary. 981 """ 982 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 983 self.headers["content-type"] = ct = ( 984 f"multipart/form-data; boundary={boundary}" 985 ) 986 self.content = multipart.encode_multipart(ct, value) 987 988 @property 989 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 990 """ 991 The multipart form data. 992 993 If the content-type indicates non-form data or the form could not be parsed, this is set to 994 an empty `MultiDictView`. 
995 996 Modifications to the MultiDictView update `Request.content`, and vice versa. 997 """ 998 return multidict.MultiDictView( 999 self._get_multipart_form, self._set_multipart_form 1000 ) 1001 1002 @multipart_form.setter 1003 def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 1004 self._set_multipart_form(value)
An HTTP request.
def __init__(
    self,
    host: str,
    port: int,
    method: bytes,
    scheme: bytes,
    authority: bytes,
    path: bytes,
    http_version: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Build a request from its raw parts and store them in a `RequestData` instance on ``self.data``.

    For compatibility with older code, a ``bytes`` host is decoded as IDNA, and ``str`` values
    for method, scheme, authority, path and http_version are encoded as ASCII.
    Header and trailer field tuples are wrapped in `Headers`.

    *Raises:*
     - `ValueError` if content is a ``str`` instead of bytes.
    """

    def _ascii_bytes(value):
        # Only str input is converted; everything else is passed through untouched.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # auto-convert invalid types to retain compatibility with older code.
    if isinstance(host, bytes):
        host = host.decode("idna", "strict")
    method = _ascii_bytes(method)
    scheme = _ascii_bytes(scheme)
    authority = _ascii_bytes(authority)
    path = _ascii_bytes(path)
    http_version = _ascii_bytes(http_version)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    if not isinstance(headers, Headers):
        headers = Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = RequestData(
        host=host,
        port=port,
        method=method,
        scheme=scheme,
        authority=authority,
        path=path,
        http_version=http_version,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    method: str,
    url: str,
    content: bytes | str = "",
    headers: (
        Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Request":
    """
    Simplified API for creating request objects.

    *Args:*
     - *method:* the HTTP method as a string.
     - *url:* assigned to `Request.url` after construction.
     - *content:* request body; bytes are assigned to ``content``, a str to ``text``.
     - *headers:* a `Headers` instance, a dict, or an iterable of ``(name, value)`` tuples.

    *Raises:*
     - `TypeError` if headers or content have an unsupported type.
    """
    # Headers can be list or dict, we differentiate here.
    if not isinstance(headers, Headers):
        if isinstance(headers, dict):
            headers = Headers(
                (
                    always_bytes(name, "utf-8", "surrogateescape"),
                    always_bytes(value, "utf-8", "surrogateescape"),
                )
                for name, value in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

    req = cls(
        "",
        0,
        method.encode("utf-8", "surrogateescape"),
        b"",
        b"",
        b"",
        b"HTTP/1.1",
        headers,
        b"",
        None,
        time.time(),
        time.time(),
    )
    req.url = url

    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        req.content = content
        return req
    if isinstance(content, str):
        req.text = content
        return req
    raise TypeError(
        f"Expected content to be str or bytes, but is {type(content).__name__}."
    )
Simplified API for creating request objects.
@property
def first_line_format(self) -> str:
    """
    *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

    Returns "authority" for CONNECT requests, "absolute" if an authority is set,
    and "relative" otherwise (origin-form and asterisk-form are subsumed as "relative").
    """
    if self.method == "CONNECT":
        return "authority"
    return "absolute" if self.authority else "relative"
Read-only: HTTP request form as defined in RFC 7230.
origin-form and asterisk-form are subsumed as "relative".
@property
def method(self) -> str:
    """
    HTTP request method, e.g. "GET".

    The raw method bytes are decoded as UTF-8 (with surrogateescape) and upper-cased.
    """
    raw_method = self.data.method
    decoded = raw_method.decode("utf-8", "surrogateescape")
    return decoded.upper()
HTTP request method, e.g. "GET".
@property
def scheme(self) -> str:
    """
    HTTP request scheme, which should be "http" or "https".

    The raw scheme bytes are decoded as UTF-8 with surrogateescape.
    """
    raw_scheme = self.data.scheme
    return raw_scheme.decode("utf-8", "surrogateescape")
HTTP request scheme, which should be "http" or "https".
@property
def host(self) -> str:
    """
    Target server for this request. This may be parsed from the raw request
    (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
    or inferred from the proxy mode (e.g. an IP in transparent mode).

    Setting the host attribute also updates the host header and authority information, if present.

    *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
    """
    request_data = self.data
    return request_data.host
Target server for this request. This may be parsed from the raw request (e.g. from a `GET http://example.com/ HTTP/1.1` request line) or inferred from the proxy mode (e.g. an IP in transparent mode).
Setting the host attribute also updates the host header and authority information, if present.
See also: `Request.authority`, `Request.host_header`, `Request.pretty_host`
718 @property 719 def host_header(self) -> str | None: 720 """ 721 The request's host/authority header. 722 723 This property maps to either ``request.headers["Host"]`` or 724 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 725 726 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 727 """ 728 if self.is_http2 or self.is_http3: 729 return self.authority or self.data.headers.get("Host", None) 730 else: 731 return self.data.headers.get("Host", None)
The request's host/authority header.
This property maps to either `request.headers["Host"]` or `request.authority`, depending on whether it's HTTP/1.x or HTTP/2.0.
See also: `Request.authority`, `Request.host`, `Request.pretty_host`
@property
def path(self) -> str:
    """
    HTTP request path, e.g. "/index.html" or "/index.html?a=b".
    Usually starts with a slash, except for OPTIONS requests, which may just be "*".

    This attribute includes both path and query parts of the target URI
    (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
    """
    raw_path = self.data.path
    return raw_path.decode("utf-8", "surrogateescape")
HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".
This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).
@property
def url(self) -> str:
    """
    The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

    For authority-form requests, this is just ``host:port``.

    Setting this property updates these attributes as well.
    """
    if self.first_line_format != "authority":
        return url.unparse(self.scheme, self.host, self.port, self.path)
    return f"{self.host}:{self.port}"
The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
Setting this property updates these attributes as well.
@property
def pretty_host(self) -> str:
    """
    *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
    This is useful in transparent mode where `Request.host` is only an IP address.

    *Warning:* When working in adversarial environments, this may not reflect the actual destination
    as the Host header could be spoofed.
    """
    authority = self.host_header
    if not authority:
        # No host header available, fall back to the connection target.
        return self.host
    parsed = url.parse_authority(authority, check=False)
    return parsed[0]
Read-only: Like `Request.host`, but using the `Request.host_header` header as an additional (preferred) data source. This is useful in transparent mode where `Request.host` is only an IP address.
Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.
@property
def pretty_url(self) -> str:
    """
    *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.

    For authority-form requests this is `Request.authority`; without a host header it is `Request.url`.
    """
    if self.first_line_format == "authority":
        return self.authority

    host_header = self.host_header
    if host_header:
        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
        if not pretty_port:
            # No explicit port in the header: use the scheme's default, or 443 if unknown.
            pretty_port = url.default_port(self.scheme) or 443
        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)

    return self.url
Read-only: Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
@property
def query(self) -> multidict.MultiDictView[str, str]:
    """
    The request query as a mutable mapping view on the request's path.
    For the most part, this behaves like a dictionary.
    Modifications to the MultiDictView update `Request.path`, and vice versa.
    """
    query_view = multidict.MultiDictView(self._get_query, self._set_query)
    return query_view
The request query as a mutable mapping view on the request's path.
For the most part, this behaves like a dictionary.
Modifications to the MultiDictView update `Request.path`, and vice versa.
@property
def path_components(self) -> tuple[str, ...]:
    """
    The URL's path components as a tuple of strings.
    Components are unquoted; empty components are skipped.
    """
    parsed_url = urllib.parse.urlparse(self.url)
    segments = [segment for segment in parsed_url.path.split("/") if segment]
    # This needs to be a tuple so that it's immutable.
    # Otherwise, this would fail silently:
    #   request.path_components.append("foo")
    return tuple(url.unquote(segment) for segment in segments)
The URL's path components as a tuple of strings. Components are unquoted.
def anticache(self) -> None:
    """
    Modifies this request to remove headers that might produce a cached response.

    Removes the ``if-modified-since`` and ``if-none-match`` headers, if present.
    """
    for header_name in ("if-modified-since", "if-none-match"):
        self.headers.pop(header_name, None)
Modifies this request to remove headers that might produce a cached response.
def anticomp(self) -> None:
    """
    Modify the Accept-Encoding header to only accept uncompressed responses.

    The header is set to ``identity``, replacing any previous value.
    """
    request_headers = self.headers
    request_headers["accept-encoding"] = "identity"
Modify the Accept-Encoding header to only accept uncompressed responses.
def constrain_encoding(self) -> None:
    """
    Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

    If an ``accept-encoding`` header is present and non-empty, it is replaced by the
    comma-separated subset of supported encodings that occur in the current value.
    A missing or empty header is left untouched.
    """
    accept_encoding = self.headers.get("accept-encoding")
    if not accept_encoding:
        return

    # An ordered tuple (rather than a set) keeps the emitted header deterministic:
    # set iteration order for strings changes between interpreter runs.
    supported_encodings = ("gzip", "identity", "deflate", "br", "zstd")
    # NOTE(review): this is a substring match on the raw header value, not a token match.
    self.headers["accept-encoding"] = ", ".join(
        encoding_name
        for encoding_name in supported_encodings
        if encoding_name in accept_encoding
    )
Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
@property
def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
    """
    The URL-encoded form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    Modifications to the MultiDictView update `Request.content`, and vice versa.
    """
    form_view = multidict.MultiDictView(
        self._get_urlencoded_form, self._set_urlencoded_form
    )
    return form_view
The URL-encoded form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to an empty `MultiDictView`.
Modifications to the MultiDictView update `Request.content`, and vice versa.
@property
def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
    """
    The multipart form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    Modifications to the MultiDictView update `Request.content`, and vice versa.
    """
    form_view = multidict.MultiDictView(
        self._get_multipart_form, self._set_multipart_form
    )
    return form_view
The multipart form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to an empty `MultiDictView`.
Modifications to the MultiDictView update `Request.content`, and vice versa.
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        """
        Build a response from its raw parts and store them in a `ResponseData` instance on ``self.data``.

        For compatibility with older code, ``str`` values for http_version and reason are
        encoded as ASCII. Header and trailer field tuples are wrapped in `Headers`.

        *Raises:*
         - `ValueError` if content is a ``str`` instead of bytes.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: bytes | str = b"",
        headers: (
            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *Args:*
         - *status_code:* the HTTP status code; the reason phrase is looked up from it.
         - *content:* response body; bytes are assigned to ``content``, a str to ``text``.
         - *headers:* a `Headers` instance, a mapping, or an iterable of ``(name, value)`` tuples.

        *Raises:*
         - `TypeError` if headers or content have an unsupported type.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Any mapping is accepted here (as the annotation promises), not only dict.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: str | bytes) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        # One Set-Cookie header per cookie.
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])]) for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        *Args:*
         - *now:* the reference timestamp in seconds since the epoch;
           defaults to the current time if ``None``.
        """
        # Compare against None so that an explicit timestamp of 0 is honoured.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = [
            "date",
            "expires",
            "last-modified",
        ]
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                if d:
                    new = mktime_tz(d) + delta
                    try:
                        self.headers[i] = formatdate(new, usegmt=True)
                    except OSError:  # pragma: no cover
                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Unparseable cookie headers are kept as they are.
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
An HTTP response.
def __init__(
    self,
    http_version: bytes,
    status_code: int,
    reason: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Build a response from its raw parts and store them in a `ResponseData` instance on ``self.data``.

    For compatibility with older code, ``str`` values for http_version and reason are
    encoded as ASCII. Header and trailer field tuples are wrapped in `Headers`.

    *Raises:*
     - `ValueError` if content is a ``str`` instead of bytes.
    """

    def _ascii_bytes(value):
        # Only str input is converted; everything else is passed through untouched.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # auto-convert invalid types to retain compatibility with older code.
    http_version = _ascii_bytes(http_version)
    reason = _ascii_bytes(reason)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    if not isinstance(headers, Headers):
        headers = Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = ResponseData(
        http_version=http_version,
        status_code=status_code,
        reason=reason,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    status_code: int = 200,
    content: bytes | str = b"",
    headers: (
        Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Response":
    """
    Simplified API for creating response objects.

    *Args:*
     - *status_code:* the HTTP status code; the reason phrase is looked up from it.
     - *content:* response body; bytes are assigned to ``content``, a str to ``text``.
     - *headers:* a `Headers` instance, a mapping, or an iterable of ``(name, value)`` tuples.

    *Raises:*
     - `TypeError` if headers or content have an unsupported type.
    """
    if isinstance(headers, Headers):
        pass
    elif isinstance(headers, Mapping):
        # Any mapping is accepted here (as the annotation promises), not only dict.
        headers = Headers(
            (
                always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                always_bytes(v, "utf-8", "surrogateescape"),
            )
            for k, v in headers.items()
        )
    elif isinstance(headers, Iterable):
        headers = Headers(headers)  # type: ignore
    else:
        raise TypeError(
            f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
        )

    resp = cls(
        b"HTTP/1.1",
        status_code,
        status_codes.RESPONSES.get(status_code, "").encode(),
        headers,
        None,
        None,
        time.time(),
        time.time(),
    )

    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        resp.content = content
    elif isinstance(content, str):
        resp.text = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return resp
Simplified API for creating response objects.
@property
def status_code(self) -> int:
    """
    HTTP Status Code, e.g. ``200``.
    """
    response_data = self.data
    return response_data.status_code
HTTP Status Code, e.g. `200`.
@property
def reason(self) -> str:
    """
    HTTP reason phrase, for example "Not Found".

    HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
    """
    raw_reason = self.data.reason
    # Encoding: http://stackoverflow.com/a/16674906/934719
    return raw_reason.decode("ISO-8859-1")
HTTP reason phrase, for example "Not Found".
HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
def refresh(self, now=None):
    """
    This fairly complex and heuristic function refreshes a server
    response for replay.

     - It adjusts date, expires, and last-modified headers.
     - It adjusts cookie expiration.

    *Args:*
     - *now:* the reference timestamp in seconds since the epoch;
       defaults to the current time if ``None``.
    """
    # Compare against None so that an explicit timestamp of 0 is honoured.
    if now is None:
        now = time.time()
    # All timestamps are shifted by the time elapsed since the response was recorded.
    delta = now - self.timestamp_start

    for header_name in ("date", "expires", "last-modified"):
        if header_name not in self.headers:
            continue
        parsed = parsedate_tz(self.headers[header_name])
        if not parsed:
            # Unparseable dates are left untouched.
            continue
        shifted = mktime_tz(parsed) + delta
        try:
            self.headers[header_name] = formatdate(shifted, usegmt=True)
        except OSError:  # pragma: no cover
            pass  # value out of bounds on Windows only (which is why we exclude it from coverage).

    refreshed_cookies = []
    for set_cookie_header in self.headers.get_all("set-cookie"):
        try:
            refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
        except ValueError:
            # Unparseable cookie headers are kept as they are.
            refreshed = set_cookie_header
        refreshed_cookies.append(refreshed)
    if refreshed_cookies:
        self.headers.set_all("set-cookie", refreshed_cookies)
This fairly complex and heuristic function refreshes a server response for replay.
- It adjusts date, expires, and last-modified headers.
- It adjusts cookie expiration.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Header class which allows both convenient access to individual headers as well as
    direct access to the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.
        """
        super().__init__(fields)

        # Reject anything that is not a pair of raw bytes.
        for raw_name, raw_value in self.fields:
            if not (isinstance(raw_name, bytes) and isinstance(raw_value, bytes)):
                raise TypeError("Header fields must be bytes.")

        # Keyword names cannot contain dashes, so map content_type -> content-type.
        extra = {}
        for name, value in headers.items():
            extra[_always_bytes(name).replace(b"_", b"-")] = _always_bytes(value)
        self.update(extra)

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Multiple values for one header name are folded into a single string.
        separator = ", "
        return separator.join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Header names compare case-insensitively, so keys are lower-cased.
        return key.lower()

    def __bytes__(self) -> bytes:
        # An empty header set serializes to nothing at all.
        if not self.fields:
            return b""
        lines = (b": ".join(pair) for pair in self.fields)
        return b"\r\n".join(lines) + b"\r\n"

    def __delitem__(self, key: str | bytes) -> None:
        super().__delitem__(_always_bytes(key))

    def __iter__(self) -> Iterator[str]:
        # Decode every key produced by the parent iterator to a native string.
        yield from map(_native, super().__iter__())

    def get_all(self, name: str | bytes) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        raw_values = super().get_all(_always_bytes(name))
        return [_native(raw) for raw in raw_values]

    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        encoded_name = _always_bytes(name)
        encoded_values = [_always_bytes(item) for item in values]
        return super().set_all(encoded_name, encoded_values)

    def insert(self, index: int, key: str | bytes, value: str | bytes):
        """
        Insert an additional value for the given key at the specified position.
        """
        super().insert(index, _always_bytes(key), _always_bytes(value))

    def items(self, multi=False):
        """
        Get all (key, value) tuples.

        If `multi` is True, all `(key, value)` pairs will be returned.
        If False, only one tuple per key is returned.
        """
        if not multi:
            return super().items()
        return ((_native(name), _native(value)) for name, value in self.fields)
Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.
Create headers with keyword arguments:
>>> h = Headers(host="example.com", content_type="application/xml")
Headers mostly behave like a normal dict:
>>> h["Host"]
"example.com"
Headers are case insensitive:
>>> h["host"]
"example.com"
Headers can also be created from a list of raw (header_name, header_value) byte tuples:
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
Multiple headers are folded into a single header as per RFC 7230:
>>> h["Accept"]
"text/html, application/xml"
Setting a header removes all existing headers with the same name:
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
`bytes(h)` returns an HTTP/1 header block:
>>> print(bytes(h))
Host: example.com
Accept: application/text
For full control, the raw header fields can be accessed:
>>> h.fields
Caveats:
- For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
    """
    *Args:*
     - *fields:* (optional) list of ``(name, value)`` header byte tuples,
       e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
     - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
       For convenience, underscores in header names will be transformed to dashes -
       this behaviour does not extend to other methods.

    If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
    the behavior is undefined.
    """
    super().__init__(fields)

    # Reject anything that is not a pair of raw bytes.
    for raw_name, raw_value in self.fields:
        if not (isinstance(raw_name, bytes) and isinstance(raw_value, bytes)):
            raise TypeError("Header fields must be bytes.")

    # Keyword names cannot contain dashes, so map content_type -> content-type.
    extra = {}
    for name, value in headers.items():
        extra[_always_bytes(name).replace(b"_", b"-")] = _always_bytes(value)
    self.update(extra)
Args:
- fields: (optional) list of `(name, value)` header byte tuples, e.g. `[(b"Host", b"example.com")]`. All names and values must be bytes.
- `**headers`: Additional headers to set. Will overwrite existing values from `fields`. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.
If `**headers` contains multiple keys that have equal `.lower()` representations, the behavior is undefined.
def get_all(self, name: str | bytes) -> list[str]:
    """
    Like `Headers.get`, but does not fold multiple headers into a single one.
    This is useful for Set-Cookie and Cookie headers, which do not support folding.

    *See also:*
     - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
     - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
     - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
    """
    # Look up with a bytes key, then decode each stored value for the caller.
    raw_values = super().get_all(_always_bytes(name))
    return [_native(raw) for raw in raw_values]
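Like `Headers.get`, but does not fold multiple headers into a single one.
This is useful for Set-Cookie and Cookie headers, which do not support folding.
See also:
- <https://tools.ietf.org/html/rfc7230#section-3.2.2>
- <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
- <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>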
def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
    """
    Explicitly set multiple headers for the given key.
    See `Headers.get_all`.
    """
    # Encode the name first and then every value before delegating to the parent.
    encoded_name = _always_bytes(name)
    encoded_values = [_always_bytes(item) for item in values]
    return super().set_all(encoded_name, encoded_values)
Explicitly set multiple headers for the given key.
See `Headers.get_all`.
def insert(self, index: int, key: str | bytes, value: str | bytes):
    """
    Insert an additional value for the given key at the specified position.
    """
    # Key and value are both encoded to bytes before delegating to the parent.
    super().insert(index, _always_bytes(key), _always_bytes(value))
Insert an additional value for the given key at the specified position.
def items(self, multi=False):
    """
    Get all (key, value) tuples.

    If `multi` is True, all `(key, value)` pairs will be returned.
    If False, only one tuple per key is returned.
    """
    if not multi:
        return super().items()
    # Every raw field is decoded to native strings, duplicates included.
    return ((_native(name), _native(value)) for name, value in self.fields)
Get all (key, value) tuples.
If `multi` is True, all `(key, value)` pairs will be returned.
If False, only one tuple per key is returned.