mitmproxy.http
1import binascii 2import json 3import os 4import time 5import urllib.parse 6import warnings 7from collections.abc import Callable 8from collections.abc import Iterable 9from collections.abc import Iterator 10from collections.abc import Mapping 11from collections.abc import Sequence 12from dataclasses import dataclass 13from dataclasses import fields 14from email.utils import formatdate 15from email.utils import mktime_tz 16from email.utils import parsedate_tz 17from typing import Any 18from typing import cast 19 20from mitmproxy import flow 21from mitmproxy.coretypes import multidict 22from mitmproxy.coretypes import serializable 23from mitmproxy.net import encoding 24from mitmproxy.net.http import cookies 25from mitmproxy.net.http import multipart 26from mitmproxy.net.http import status_codes 27from mitmproxy.net.http import url 28from mitmproxy.net.http.headers import assemble_content_type 29from mitmproxy.net.http.headers import infer_content_encoding 30from mitmproxy.net.http.headers import parse_content_type 31from mitmproxy.utils import human 32from mitmproxy.utils import strutils 33from mitmproxy.utils import typecheck 34from mitmproxy.utils.strutils import always_bytes 35from mitmproxy.utils.strutils import always_str 36from mitmproxy.websocket import WebSocketData 37 38 39# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. 40def _native(x: bytes) -> str: 41 return x.decode("utf-8", "surrogateescape") 42 43 44def _always_bytes(x: str | bytes) -> bytes: 45 return strutils.always_bytes(x, "utf-8", "surrogateescape") 46 47 48# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types. 49class Headers(multidict.MultiDict): # type: ignore 50 """ 51 Header class which allows both convenient access to individual headers as well as 52 direct access to the underlying raw data. Provides a full dictionary interface. 
53 54 Create headers with keyword arguments: 55 >>> h = Headers(host="example.com", content_type="application/xml") 56 57 Headers mostly behave like a normal dict: 58 >>> h["Host"] 59 "example.com" 60 61 Headers are case insensitive: 62 >>> h["host"] 63 "example.com" 64 65 Headers can also be created from a list of raw (header_name, header_value) byte tuples: 66 >>> h = Headers([ 67 (b"Host",b"example.com"), 68 (b"Accept",b"text/html"), 69 (b"accept",b"application/xml") 70 ]) 71 72 Multiple headers are folded into a single header as per RFC 7230: 73 >>> h["Accept"] 74 "text/html, application/xml" 75 76 Setting a header removes all existing headers with the same name: 77 >>> h["Accept"] = "application/text" 78 >>> h["Accept"] 79 "application/text" 80 81 `bytes(h)` returns an HTTP/1 header block: 82 >>> print(bytes(h)) 83 Host: example.com 84 Accept: application/text 85 86 For full control, the raw header fields can be accessed: 87 >>> h.fields 88 89 Caveats: 90 - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`. 91 """ 92 93 def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers): 94 """ 95 *Args:* 96 - *fields:* (optional) list of ``(name, value)`` header byte tuples, 97 e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes. 98 - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`. 99 For convenience, underscores in header names will be transformed to dashes - 100 this behaviour does not extend to other methods. 101 102 If ``**headers`` contains multiple keys that have equal ``.lower()`` representations, 103 the behavior is undefined. 
104 """ 105 super().__init__(fields) 106 107 for key, value in self.fields: 108 if not isinstance(key, bytes) or not isinstance(value, bytes): 109 raise TypeError("Header fields must be bytes.") 110 111 # content_type -> content-type 112 self.update( 113 { 114 _always_bytes(name).replace(b"_", b"-"): _always_bytes(value) 115 for name, value in headers.items() 116 } 117 ) 118 119 fields: tuple[tuple[bytes, bytes], ...] 120 121 @staticmethod 122 def _reduce_values(values) -> str: 123 # Headers can be folded 124 return ", ".join(values) 125 126 @staticmethod 127 def _kconv(key) -> str: 128 # Headers are case-insensitive 129 return key.lower() 130 131 def __bytes__(self) -> bytes: 132 if self.fields: 133 return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n" 134 else: 135 return b"" 136 137 def __delitem__(self, key: str | bytes) -> None: 138 key = _always_bytes(key) 139 super().__delitem__(key) 140 141 def __iter__(self) -> Iterator[str]: 142 for x in super().__iter__(): 143 yield _native(x) 144 145 def get_all(self, name: str | bytes) -> list[str]: 146 """ 147 Like `Headers.get`, but does not fold multiple headers into a single one. 148 This is useful for Set-Cookie and Cookie headers, which do not support folding. 149 150 *See also:* 151 - <https://tools.ietf.org/html/rfc7230#section-3.2.2> 152 - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4> 153 - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5> 154 """ 155 name = _always_bytes(name) 156 return [_native(x) for x in super().get_all(name)] 157 158 def set_all(self, name: str | bytes, values: Iterable[str | bytes]): 159 """ 160 Explicitly set multiple headers for the given key. 161 See `Headers.get_all`. 
162 """ 163 name = _always_bytes(name) 164 values = [_always_bytes(x) for x in values] 165 return super().set_all(name, values) 166 167 def insert(self, index: int, key: str | bytes, value: str | bytes): 168 key = _always_bytes(key) 169 value = _always_bytes(value) 170 super().insert(index, key, value) 171 172 def items(self, multi=False): 173 if multi: 174 return ((_native(k), _native(v)) for k, v in self.fields) 175 else: 176 return super().items() 177 178 179@dataclass 180class MessageData(serializable.Serializable): 181 http_version: bytes 182 headers: Headers 183 content: bytes | None 184 trailers: Headers | None 185 timestamp_start: float 186 timestamp_end: float | None 187 188 # noinspection PyUnreachableCode 189 if __debug__: 190 191 def __post_init__(self): 192 for field in fields(self): 193 val = getattr(self, field.name) 194 typecheck.check_option_type(field.name, val, field.type) 195 196 def set_state(self, state): 197 for k, v in state.items(): 198 if k in ("headers", "trailers") and v is not None: 199 v = Headers.from_state(v) 200 setattr(self, k, v) 201 202 def get_state(self): 203 state = vars(self).copy() 204 state["headers"] = state["headers"].get_state() 205 if state["trailers"] is not None: 206 state["trailers"] = state["trailers"].get_state() 207 return state 208 209 @classmethod 210 def from_state(cls, state): 211 state["headers"] = Headers.from_state(state["headers"]) 212 if state["trailers"] is not None: 213 state["trailers"] = Headers.from_state(state["trailers"]) 214 return cls(**state) 215 216 217@dataclass 218class RequestData(MessageData): 219 host: str 220 port: int 221 method: bytes 222 scheme: bytes 223 authority: bytes 224 path: bytes 225 226 227@dataclass 228class ResponseData(MessageData): 229 status_code: int 230 reason: bytes 231 232 233class Message(serializable.Serializable): 234 """Base class for `Request` and `Response`.""" 235 236 @classmethod 237 def from_state(cls, state): 238 return cls(**state) 239 240 def 
get_state(self): 241 return self.data.get_state() 242 243 def set_state(self, state): 244 self.data.set_state(state) 245 246 data: MessageData 247 stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False 248 """ 249 This attribute controls if the message body should be streamed. 250 251 If `False`, mitmproxy will buffer the entire body before forwarding it to the destination. 252 This makes it possible to perform string replacements on the entire body. 253 If `True`, the message body will not be buffered on the proxy 254 but immediately forwarded instead. 255 Alternatively, a transformation function can be specified, which will be called for each chunk of data. 256 Please note that packet boundaries generally should not be relied upon. 257 258 This attribute must be set in the `requestheaders` or `responseheaders` hook. 259 Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already. 260 """ 261 262 @property 263 def http_version(self) -> str: 264 """ 265 HTTP version string, for example `HTTP/1.1`. 266 """ 267 return self.data.http_version.decode("utf-8", "surrogateescape") 268 269 @http_version.setter 270 def http_version(self, http_version: str | bytes) -> None: 271 self.data.http_version = strutils.always_bytes( 272 http_version, "utf-8", "surrogateescape" 273 ) 274 275 @property 276 def is_http10(self) -> bool: 277 return self.data.http_version == b"HTTP/1.0" 278 279 @property 280 def is_http11(self) -> bool: 281 return self.data.http_version == b"HTTP/1.1" 282 283 @property 284 def is_http2(self) -> bool: 285 return self.data.http_version == b"HTTP/2.0" 286 287 @property 288 def is_http3(self) -> bool: 289 return self.data.http_version == b"HTTP/3" 290 291 @property 292 def headers(self) -> Headers: 293 """ 294 The HTTP headers. 
295 """ 296 return self.data.headers 297 298 @headers.setter 299 def headers(self, h: Headers) -> None: 300 self.data.headers = h 301 302 @property 303 def trailers(self) -> Headers | None: 304 """ 305 The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer). 306 """ 307 return self.data.trailers 308 309 @trailers.setter 310 def trailers(self, h: Headers | None) -> None: 311 self.data.trailers = h 312 313 @property 314 def raw_content(self) -> bytes | None: 315 """ 316 The raw (potentially compressed) HTTP message body. 317 318 In contrast to `Message.content` and `Message.text`, accessing this property never raises. 319 320 *See also:* `Message.content`, `Message.text` 321 """ 322 return self.data.content 323 324 @raw_content.setter 325 def raw_content(self, content: bytes | None) -> None: 326 self.data.content = content 327 328 @property 329 def content(self) -> bytes | None: 330 """ 331 The uncompressed HTTP message body as bytes. 332 333 Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid. 334 335 *See also:* `Message.raw_content`, `Message.text` 336 """ 337 return self.get_content() 338 339 @content.setter 340 def content(self, value: bytes | None) -> None: 341 self.set_content(value) 342 343 @property 344 def text(self) -> str | None: 345 """ 346 The uncompressed and decoded HTTP message body as text. 347 348 Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid. 349 350 *See also:* `Message.raw_content`, `Message.content` 351 """ 352 return self.get_text() 353 354 @text.setter 355 def text(self, value: str | None) -> None: 356 self.set_text(value) 357 358 def set_content(self, value: bytes | None) -> None: 359 if value is None: 360 self.raw_content = None 361 return 362 if not isinstance(value, bytes): 363 raise TypeError( 364 f"Message content must be bytes, not {type(value).__name__}. " 365 "Please use .text if you want to assign a str." 
366 ) 367 ce = self.headers.get("content-encoding") 368 try: 369 self.raw_content = encoding.encode(value, ce or "identity") 370 except ValueError: 371 # So we have an invalid content-encoding? 372 # Let's remove it! 373 del self.headers["content-encoding"] 374 self.raw_content = value 375 376 if "transfer-encoding" in self.headers: 377 # https://httpwg.org/specs/rfc7230.html#header.content-length 378 # don't set content-length if a transfer-encoding is provided 379 pass 380 else: 381 self.headers["content-length"] = str(len(self.raw_content)) 382 383 def get_content(self, strict: bool = True) -> bytes | None: 384 """ 385 Similar to `Message.content`, but does not raise if `strict` is `False`. 386 Instead, the compressed message body is returned as-is. 387 """ 388 if self.raw_content is None: 389 return None 390 ce = self.headers.get("content-encoding") 391 if ce: 392 try: 393 content = encoding.decode(self.raw_content, ce) 394 # A client may illegally specify a byte -> str encoding here (e.g. utf8) 395 if isinstance(content, str): 396 raise ValueError(f"Invalid Content-Encoding: {ce}") 397 return content 398 except ValueError: 399 if strict: 400 raise 401 return self.raw_content 402 else: 403 return self.raw_content 404 405 def set_text(self, text: str | None) -> None: 406 if text is None: 407 self.content = None 408 return 409 enc = infer_content_encoding(self.headers.get("content-type", "")) 410 411 try: 412 self.content = cast(bytes, encoding.encode(text, enc)) 413 except ValueError: 414 # Fall back to UTF-8 and update the content-type header. 415 ct = parse_content_type(self.headers.get("content-type", "")) or ( 416 "text", 417 "plain", 418 {}, 419 ) 420 ct[2]["charset"] = "utf-8" 421 self.headers["content-type"] = assemble_content_type(*ct) 422 enc = "utf8" 423 self.content = text.encode(enc, "surrogateescape") 424 425 def get_text(self, strict: bool = True) -> str | None: 426 """ 427 Similar to `Message.text`, but does not raise if `strict` is `False`. 
428 Instead, the message body is returned as surrogate-escaped UTF-8. 429 """ 430 content = self.get_content(strict) 431 if content is None: 432 return None 433 enc = infer_content_encoding(self.headers.get("content-type", ""), content) 434 try: 435 return cast(str, encoding.decode(content, enc)) 436 except ValueError: 437 if strict: 438 raise 439 return content.decode("utf8", "surrogateescape") 440 441 @property 442 def timestamp_start(self) -> float: 443 """ 444 *Timestamp:* Headers received. 445 """ 446 return self.data.timestamp_start 447 448 @timestamp_start.setter 449 def timestamp_start(self, timestamp_start: float) -> None: 450 self.data.timestamp_start = timestamp_start 451 452 @property 453 def timestamp_end(self) -> float | None: 454 """ 455 *Timestamp:* Last byte received. 456 """ 457 return self.data.timestamp_end 458 459 @timestamp_end.setter 460 def timestamp_end(self, timestamp_end: float | None): 461 self.data.timestamp_end = timestamp_end 462 463 def decode(self, strict: bool = True) -> None: 464 """ 465 Decodes body based on the current Content-Encoding header, then 466 removes the header. If there is no Content-Encoding header, no 467 action is taken. 468 469 *Raises:* 470 - `ValueError`, when the content-encoding is invalid and strict is True. 471 """ 472 decoded = self.get_content(strict) 473 self.headers.pop("content-encoding", None) 474 self.content = decoded 475 476 def encode(self, encoding: str) -> None: 477 """ 478 Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd". 479 Any existing content-encodings are overwritten, the content is not decoded beforehand. 480 481 *Raises:* 482 - `ValueError`, when the specified content-encoding is invalid. 
483 """ 484 self.headers["content-encoding"] = encoding 485 self.content = self.raw_content 486 if "content-encoding" not in self.headers: 487 raise ValueError(f"Invalid content encoding {repr(encoding)}") 488 489 def json(self, **kwargs: Any) -> Any: 490 """ 491 Returns the JSON encoded content of the response, if any. 492 `**kwargs` are optional arguments that will be 493 passed to `json.loads()`. 494 495 Will raise if the content can not be decoded and then parsed as JSON. 496 497 *Raises:* 498 - `json.decoder.JSONDecodeError` if content is not valid JSON. 499 - `TypeError` if the content is not available, for example because the response 500 has been streamed. 501 """ 502 content = self.get_content(strict=False) 503 if content is None: 504 raise TypeError("Message content is not available.") 505 else: 506 return json.loads(content, **kwargs) 507 508 509class Request(Message): 510 """ 511 An HTTP request. 512 """ 513 514 data: RequestData 515 516 def __init__( 517 self, 518 host: str, 519 port: int, 520 method: bytes, 521 scheme: bytes, 522 authority: bytes, 523 path: bytes, 524 http_version: bytes, 525 headers: Headers | tuple[tuple[bytes, bytes], ...], 526 content: bytes | None, 527 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 528 timestamp_start: float, 529 timestamp_end: float | None, 530 ): 531 # auto-convert invalid types to retain compatibility with older code. 
532 if isinstance(host, bytes): 533 host = host.decode("idna", "strict") 534 if isinstance(method, str): 535 method = method.encode("ascii", "strict") 536 if isinstance(scheme, str): 537 scheme = scheme.encode("ascii", "strict") 538 if isinstance(authority, str): 539 authority = authority.encode("ascii", "strict") 540 if isinstance(path, str): 541 path = path.encode("ascii", "strict") 542 if isinstance(http_version, str): 543 http_version = http_version.encode("ascii", "strict") 544 545 if isinstance(content, str): 546 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 547 if not isinstance(headers, Headers): 548 headers = Headers(headers) 549 if trailers is not None and not isinstance(trailers, Headers): 550 trailers = Headers(trailers) 551 552 self.data = RequestData( 553 host=host, 554 port=port, 555 method=method, 556 scheme=scheme, 557 authority=authority, 558 path=path, 559 http_version=http_version, 560 headers=headers, 561 content=content, 562 trailers=trailers, 563 timestamp_start=timestamp_start, 564 timestamp_end=timestamp_end, 565 ) 566 567 def __repr__(self) -> str: 568 if self.host and self.port: 569 hostport = f"{self.host}:{self.port}" 570 else: 571 hostport = "" 572 path = self.path or "" 573 return f"Request({self.method} {hostport}{path})" 574 575 @classmethod 576 def make( 577 cls, 578 method: str, 579 url: str, 580 content: bytes | str = "", 581 headers: ( 582 Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]] 583 ) = (), 584 ) -> "Request": 585 """ 586 Simplified API for creating request objects. 587 """ 588 # Headers can be list or dict, we differentiate here. 
589 if isinstance(headers, Headers): 590 pass 591 elif isinstance(headers, dict): 592 headers = Headers( 593 ( 594 always_bytes(k, "utf-8", "surrogateescape"), 595 always_bytes(v, "utf-8", "surrogateescape"), 596 ) 597 for k, v in headers.items() 598 ) 599 elif isinstance(headers, Iterable): 600 headers = Headers(headers) # type: ignore 601 else: 602 raise TypeError( 603 "Expected headers to be an iterable or dict, but is {}.".format( 604 type(headers).__name__ 605 ) 606 ) 607 608 req = cls( 609 "", 610 0, 611 method.encode("utf-8", "surrogateescape"), 612 b"", 613 b"", 614 b"", 615 b"HTTP/1.1", 616 headers, 617 b"", 618 None, 619 time.time(), 620 time.time(), 621 ) 622 623 req.url = url 624 # Assign this manually to update the content-length header. 625 if isinstance(content, bytes): 626 req.content = content 627 elif isinstance(content, str): 628 req.text = content 629 else: 630 raise TypeError( 631 f"Expected content to be str or bytes, but is {type(content).__name__}." 632 ) 633 634 return req 635 636 @property 637 def first_line_format(self) -> str: 638 """ 639 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 640 641 origin-form and asterisk-form are subsumed as "relative". 642 """ 643 if self.method == "CONNECT": 644 return "authority" 645 elif self.authority: 646 return "absolute" 647 else: 648 return "relative" 649 650 @property 651 def method(self) -> str: 652 """ 653 HTTP request method, e.g. "GET". 654 """ 655 return self.data.method.decode("utf-8", "surrogateescape").upper() 656 657 @method.setter 658 def method(self, val: str | bytes) -> None: 659 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 660 661 @property 662 def scheme(self) -> str: 663 """ 664 HTTP request scheme, which should be "http" or "https". 
665 """ 666 return self.data.scheme.decode("utf-8", "surrogateescape") 667 668 @scheme.setter 669 def scheme(self, val: str | bytes) -> None: 670 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 671 672 @property 673 def authority(self) -> str: 674 """ 675 HTTP request authority. 676 677 For HTTP/1, this is the authority portion of the request target 678 (in either absolute-form or authority-form). 679 For origin-form and asterisk-form requests, this property is set to an empty string. 680 681 For HTTP/2, this is the :authority pseudo header. 682 683 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 684 """ 685 try: 686 return self.data.authority.decode("idna") 687 except UnicodeError: 688 return self.data.authority.decode("utf8", "surrogateescape") 689 690 @authority.setter 691 def authority(self, val: str | bytes) -> None: 692 if isinstance(val, str): 693 try: 694 val = val.encode("idna", "strict") 695 except UnicodeError: 696 val = val.encode("utf8", "surrogateescape") # type: ignore 697 self.data.authority = val 698 699 @property 700 def host(self) -> str: 701 """ 702 Target server for this request. This may be parsed from the raw request 703 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 704 or inferred from the proxy mode (e.g. an IP in transparent mode). 705 706 Setting the host attribute also updates the host header and authority information, if present. 707 708 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 709 """ 710 return self.data.host 711 712 @host.setter 713 def host(self, val: str | bytes) -> None: 714 self.data.host = always_str(val, "idna", "strict") 715 self._update_host_and_authority() 716 717 @property 718 def host_header(self) -> str | None: 719 """ 720 The request's host/authority header. 721 722 This property maps to either ``request.headers["Host"]`` or 723 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 
724 725 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 726 """ 727 if self.is_http2 or self.is_http3: 728 return self.authority or self.data.headers.get("Host", None) 729 else: 730 return self.data.headers.get("Host", None) 731 732 @host_header.setter 733 def host_header(self, val: None | str | bytes) -> None: 734 if val is None: 735 if self.is_http2 or self.is_http3: 736 self.data.authority = b"" 737 self.headers.pop("Host", None) 738 else: 739 if self.is_http2 or self.is_http3: 740 self.authority = val # type: ignore 741 if not (self.is_http2 or self.is_http3) or "Host" in self.headers: 742 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 743 self.headers["Host"] = val 744 745 @property 746 def port(self) -> int: 747 """ 748 Target port. 749 """ 750 return self.data.port 751 752 @port.setter 753 def port(self, port: int) -> None: 754 if not isinstance(port, int): 755 raise ValueError(f"Port must be an integer, not {port!r}.") 756 757 self.data.port = port 758 self._update_host_and_authority() 759 760 def _update_host_and_authority(self) -> None: 761 val = url.hostport(self.scheme, self.host, self.port) 762 763 # Update host header 764 if "Host" in self.data.headers: 765 self.data.headers["Host"] = val 766 # Update authority 767 if self.data.authority: 768 self.authority = val 769 770 @property 771 def path(self) -> str: 772 """ 773 HTTP request path, e.g. "/index.html" or "/index.html?a=b". 774 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 775 776 This attribute includes both path and query parts of the target URI 777 (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)). 
778 """ 779 return self.data.path.decode("utf-8", "surrogateescape") 780 781 @path.setter 782 def path(self, val: str | bytes) -> None: 783 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 784 785 @property 786 def url(self) -> str: 787 """ 788 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 789 790 Settings this property updates these attributes as well. 791 """ 792 if self.first_line_format == "authority": 793 return f"{self.host}:{self.port}" 794 path = self.path if self.path != "*" else "" 795 return url.unparse(self.scheme, self.host, self.port, path) 796 797 @url.setter 798 def url(self, val: str | bytes) -> None: 799 val = always_str(val, "utf-8", "surrogateescape") 800 self.scheme, self.host, self.port, self.path = url.parse(val) # type: ignore 801 802 @property 803 def pretty_host(self) -> str: 804 """ 805 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 806 This is useful in transparent mode where `Request.host` is only an IP address. 807 808 *Warning:* When working in adversarial environments, this may not reflect the actual destination 809 as the Host header could be spoofed. 810 """ 811 authority = self.host_header 812 if authority: 813 return url.parse_authority(authority, check=False)[0] 814 else: 815 return self.host 816 817 @property 818 def pretty_url(self) -> str: 819 """ 820 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 
821 """ 822 if self.first_line_format == "authority": 823 return self.authority 824 825 host_header = self.host_header 826 if not host_header: 827 return self.url 828 829 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 830 pretty_port = pretty_port or url.default_port(self.scheme) or 443 831 path = self.path if self.path != "*" else "" 832 833 return url.unparse(self.scheme, pretty_host, pretty_port, path) 834 835 def _get_query(self): 836 query = urllib.parse.urlparse(self.url).query 837 return tuple(url.decode(query)) 838 839 def _set_query(self, query_data): 840 query = url.encode(query_data) 841 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 842 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 843 844 @property 845 def query(self) -> multidict.MultiDictView[str, str]: 846 """ 847 The request query as a mutable mapping view on the request's path. 848 For the most part, this behaves like a dictionary. 849 Modifications to the MultiDictView update `Request.path`, and vice versa. 850 """ 851 return multidict.MultiDictView(self._get_query, self._set_query) 852 853 @query.setter 854 def query(self, value): 855 self._set_query(value) 856 857 def _get_cookies(self): 858 h = self.headers.get_all("Cookie") 859 return tuple(cookies.parse_cookie_headers(h)) 860 861 def _set_cookies(self, value): 862 self.headers["cookie"] = cookies.format_cookie_header(value) 863 864 @property 865 def cookies(self) -> multidict.MultiDictView[str, str]: 866 """ 867 The request cookies. 868 For the most part, this behaves like a dictionary. 869 Modifications to the MultiDictView update `Request.headers`, and vice versa. 870 """ 871 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 872 873 @cookies.setter 874 def cookies(self, value): 875 self._set_cookies(value) 876 877 @property 878 def path_components(self) -> tuple[str, ...]: 879 """ 880 The URL's path components as a tuple of strings. 
881 Components are unquoted. 882 """ 883 path = urllib.parse.urlparse(self.url).path 884 # This needs to be a tuple so that it's immutable. 885 # Otherwise, this would fail silently: 886 # request.path_components.append("foo") 887 return tuple(url.unquote(i) for i in path.split("/") if i) 888 889 @path_components.setter 890 def path_components(self, components: Iterable[str]): 891 components = map(lambda x: url.quote(x, safe=""), components) 892 path = "/" + "/".join(components) 893 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 894 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 895 896 def anticache(self) -> None: 897 """ 898 Modifies this request to remove headers that might produce a cached response. 899 """ 900 delheaders = ( 901 "if-modified-since", 902 "if-none-match", 903 ) 904 for i in delheaders: 905 self.headers.pop(i, None) 906 907 def anticomp(self) -> None: 908 """ 909 Modify the Accept-Encoding header to only accept uncompressed responses. 910 """ 911 self.headers["accept-encoding"] = "identity" 912 913 def constrain_encoding(self) -> None: 914 """ 915 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 916 """ 917 accept_encoding = self.headers.get("accept-encoding") 918 if accept_encoding: 919 self.headers["accept-encoding"] = ", ".join( 920 e 921 for e in {"gzip", "identity", "deflate", "br", "zstd"} 922 if e in accept_encoding 923 ) 924 925 def _get_urlencoded_form(self): 926 is_valid_content_type = ( 927 "application/x-www-form-urlencoded" 928 in self.headers.get("content-type", "").lower() 929 ) 930 if is_valid_content_type: 931 return tuple(url.decode(self.get_text(strict=False))) 932 return () 933 934 def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None: 935 """ 936 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 937 This will overwrite the existing content if there is one. 
938 """ 939 self.headers["content-type"] = "application/x-www-form-urlencoded" 940 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 941 942 @property 943 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 944 """ 945 The URL-encoded form data. 946 947 If the content-type indicates non-form data or the form could not be parsed, this is set to 948 an empty `MultiDictView`. 949 950 Modifications to the MultiDictView update `Request.content`, and vice versa. 951 """ 952 return multidict.MultiDictView( 953 self._get_urlencoded_form, self._set_urlencoded_form 954 ) 955 956 @urlencoded_form.setter 957 def urlencoded_form(self, value): 958 self._set_urlencoded_form(value) 959 960 def _get_multipart_form(self) -> list[tuple[bytes, bytes]]: 961 is_valid_content_type = ( 962 "multipart/form-data" in self.headers.get("content-type", "").lower() 963 ) 964 if is_valid_content_type and self.content is not None: 965 try: 966 return multipart.decode_multipart( 967 self.headers.get("content-type"), self.content 968 ) 969 except ValueError: 970 pass 971 return [] 972 973 def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 974 ct = self.headers.get("content-type", "") 975 is_valid_content_type = ct.lower().startswith("multipart/form-data") 976 if not is_valid_content_type: 977 """ 978 Generate a random boundary here. 979 980 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 981 on generating the boundary. 982 """ 983 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 984 self.headers["content-type"] = ct = ( 985 f"multipart/form-data; boundary={boundary}" 986 ) 987 self.content = multipart.encode_multipart(ct, value) 988 989 @property 990 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 991 """ 992 The multipart form data. 993 994 If the content-type indicates non-form data or the form could not be parsed, this is set to 995 an empty `MultiDictView`. 
996 997 Modifications to the MultiDictView update `Request.content`, and vice versa. 998 """ 999 return multidict.MultiDictView( 1000 self._get_multipart_form, self._set_multipart_form 1001 ) 1002 1003 @multipart_form.setter 1004 def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 1005 self._set_multipart_form(value) 1006 1007 1008class Response(Message): 1009 """ 1010 An HTTP response. 1011 """ 1012 1013 data: ResponseData 1014 1015 def __init__( 1016 self, 1017 http_version: bytes, 1018 status_code: int, 1019 reason: bytes, 1020 headers: Headers | tuple[tuple[bytes, bytes], ...], 1021 content: bytes | None, 1022 trailers: None | Headers | tuple[tuple[bytes, bytes], ...], 1023 timestamp_start: float, 1024 timestamp_end: float | None, 1025 ): 1026 # auto-convert invalid types to retain compatibility with older code. 1027 if isinstance(http_version, str): 1028 http_version = http_version.encode("ascii", "strict") 1029 if isinstance(reason, str): 1030 reason = reason.encode("ascii", "strict") 1031 1032 if isinstance(content, str): 1033 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 1034 if not isinstance(headers, Headers): 1035 headers = Headers(headers) 1036 if trailers is not None and not isinstance(trailers, Headers): 1037 trailers = Headers(trailers) 1038 1039 self.data = ResponseData( 1040 http_version=http_version, 1041 status_code=status_code, 1042 reason=reason, 1043 headers=headers, 1044 content=content, 1045 trailers=trailers, 1046 timestamp_start=timestamp_start, 1047 timestamp_end=timestamp_end, 1048 ) 1049 1050 def __repr__(self) -> str: 1051 if self.raw_content: 1052 ct = self.headers.get("content-type", "unknown content type") 1053 size = human.pretty_size(len(self.raw_content)) 1054 details = f"{ct}, {size}" 1055 else: 1056 details = "no content" 1057 return f"Response({self.status_code}, {details})" 1058 1059 @classmethod 1060 def make( 1061 cls, 1062 status_code: int = 200, 1063 content: bytes | str = b"", 
1064 headers: ( 1065 Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]] 1066 ) = (), 1067 ) -> "Response": 1068 """ 1069 Simplified API for creating response objects. 1070 """ 1071 if isinstance(headers, Headers): 1072 headers = headers 1073 elif isinstance(headers, dict): 1074 headers = Headers( 1075 ( 1076 always_bytes(k, "utf-8", "surrogateescape"), # type: ignore 1077 always_bytes(v, "utf-8", "surrogateescape"), 1078 ) 1079 for k, v in headers.items() 1080 ) 1081 elif isinstance(headers, Iterable): 1082 headers = Headers(headers) # type: ignore 1083 else: 1084 raise TypeError( 1085 "Expected headers to be an iterable or dict, but is {}.".format( 1086 type(headers).__name__ 1087 ) 1088 ) 1089 1090 resp = cls( 1091 b"HTTP/1.1", 1092 status_code, 1093 status_codes.RESPONSES.get(status_code, "").encode(), 1094 headers, 1095 None, 1096 None, 1097 time.time(), 1098 time.time(), 1099 ) 1100 1101 # Assign this manually to update the content-length header. 1102 if isinstance(content, bytes): 1103 resp.content = content 1104 elif isinstance(content, str): 1105 resp.text = content 1106 else: 1107 raise TypeError( 1108 f"Expected content to be str or bytes, but is {type(content).__name__}." 1109 ) 1110 1111 return resp 1112 1113 @property 1114 def status_code(self) -> int: 1115 """ 1116 HTTP Status Code, e.g. ``200``. 1117 """ 1118 return self.data.status_code 1119 1120 @status_code.setter 1121 def status_code(self, status_code: int) -> None: 1122 self.data.status_code = status_code 1123 1124 @property 1125 def reason(self) -> str: 1126 """ 1127 HTTP reason phrase, for example "Not Found". 1128 1129 HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead. 
1130 """ 1131 # Encoding: http://stackoverflow.com/a/16674906/934719 1132 return self.data.reason.decode("ISO-8859-1") 1133 1134 @reason.setter 1135 def reason(self, reason: str | bytes) -> None: 1136 self.data.reason = strutils.always_bytes(reason, "ISO-8859-1") 1137 1138 def _get_cookies(self): 1139 h = self.headers.get_all("set-cookie") 1140 all_cookies = cookies.parse_set_cookie_headers(h) 1141 return tuple((name, (value, attrs)) for name, value, attrs in all_cookies) 1142 1143 def _set_cookies(self, value): 1144 cookie_headers = [] 1145 for k, v in value: 1146 header = cookies.format_set_cookie_header([(k, v[0], v[1])]) 1147 cookie_headers.append(header) 1148 self.headers.set_all("set-cookie", cookie_headers) 1149 1150 @property 1151 def cookies( 1152 self, 1153 ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]: 1154 """ 1155 The response cookies. A possibly empty `MultiDictView`, where the keys are cookie 1156 name strings, and values are `(cookie value, attributes)` tuples. Within 1157 attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value. 1158 Modifications to the MultiDictView update `Response.headers`, and vice versa. 1159 1160 *Warning:* Changes to `attributes` will not be picked up unless you also reassign 1161 the `(cookie value, attributes)` tuple directly in the `MultiDictView`. 1162 """ 1163 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 1164 1165 @cookies.setter 1166 def cookies(self, value): 1167 self._set_cookies(value) 1168 1169 def refresh(self, now=None): 1170 """ 1171 This fairly complex and heuristic function refreshes a server 1172 response for replay. 1173 1174 - It adjusts date, expires, and last-modified headers. 1175 - It adjusts cookie expiration. 
1176 """ 1177 if not now: 1178 now = time.time() 1179 delta = now - self.timestamp_start 1180 refresh_headers = [ 1181 "date", 1182 "expires", 1183 "last-modified", 1184 ] 1185 for i in refresh_headers: 1186 if i in self.headers: 1187 d = parsedate_tz(self.headers[i]) 1188 if d: 1189 new = mktime_tz(d) + delta 1190 try: 1191 self.headers[i] = formatdate(new, usegmt=True) 1192 except OSError: # pragma: no cover 1193 pass # value out of bounds on Windows only (which is why we exclude it from coverage). 1194 c = [] 1195 for set_cookie_header in self.headers.get_all("set-cookie"): 1196 try: 1197 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1198 except ValueError: 1199 refreshed = set_cookie_header 1200 c.append(refreshed) 1201 if c: 1202 self.headers.set_all("set-cookie", c) 1203 1204 1205class HTTPFlow(flow.Flow): 1206 """ 1207 An HTTPFlow is a collection of objects representing a single HTTP 1208 transaction. 1209 """ 1210 1211 request: Request 1212 """The client's HTTP request.""" 1213 response: Response | None = None 1214 """The server's HTTP response.""" 1215 error: flow.Error | None = None 1216 """ 1217 A connection or protocol error affecting this flow. 1218 1219 Note that it's possible for a Flow to have both a response and an error 1220 object. This might happen, for instance, when a response was received 1221 from the server, but there was an error sending it back to the client. 1222 """ 1223 1224 websocket: WebSocketData | None = None 1225 """ 1226 If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. 
1227 """ 1228 1229 def get_state(self) -> serializable.State: 1230 return { 1231 **super().get_state(), 1232 "request": self.request.get_state(), 1233 "response": self.response.get_state() if self.response else None, 1234 "websocket": self.websocket.get_state() if self.websocket else None, 1235 } 1236 1237 def set_state(self, state: serializable.State) -> None: 1238 self.request = Request.from_state(state.pop("request")) 1239 self.response = Response.from_state(r) if (r := state.pop("response")) else None 1240 self.websocket = ( 1241 WebSocketData.from_state(w) if (w := state.pop("websocket")) else None 1242 ) 1243 super().set_state(state) 1244 1245 def __repr__(self): 1246 s = "<HTTPFlow" 1247 for a in ( 1248 "request", 1249 "response", 1250 "websocket", 1251 "error", 1252 "client_conn", 1253 "server_conn", 1254 ): 1255 if getattr(self, a, False): 1256 s += f"\r\n {a} = {{flow.{a}}}" 1257 s += ">" 1258 return s.format(flow=self) 1259 1260 @property 1261 def timestamp_start(self) -> float: 1262 """*Read-only:* An alias for `Request.timestamp_start`.""" 1263 return self.request.timestamp_start 1264 1265 @property 1266 def mode(self) -> str: # pragma: no cover 1267 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1268 return getattr(self, "_mode", "regular") 1269 1270 @mode.setter 1271 def mode(self, val: str) -> None: # pragma: no cover 1272 warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2) 1273 self._mode = val 1274 1275 def copy(self): 1276 f = super().copy() 1277 if self.request: 1278 f.request = self.request.copy() 1279 if self.response: 1280 f.response = self.response.copy() 1281 return f 1282 1283 1284__all__ = [ 1285 "HTTPFlow", 1286 "Message", 1287 "Request", 1288 "Response", 1289 "Headers", 1290]
class HTTPFlow(flow.Flow):
    """
    An HTTPFlow groups together all objects that describe one HTTP
    transaction: the request plus an optional response, error and
    WebSocket data.
    """

    request: Request
    """The client's HTTP request."""
    response: Response | None = None
    """The server's HTTP response."""
    error: flow.Error | None = None
    """
    A connection or protocol error affecting this flow.

    Note that it's possible for a Flow to have both a response and an error
    object. This might happen, for instance, when a response was received
    from the server, but there was an error sending it back to the client.
    """

    websocket: WebSocketData | None = None
    """
    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
    """

    def get_state(self) -> serializable.State:
        """
        Return the parent flow state extended with the `request`, `response`
        and `websocket` states. Missing response/websocket are stored as `None`.
        """
        state = {**super().get_state()}
        state["request"] = self.request.get_state()
        state["response"] = self.response.get_state() if self.response else None
        state["websocket"] = self.websocket.get_state() if self.websocket else None
        return state

    def set_state(self, state: serializable.State) -> None:
        """
        Restore this flow from `state`.

        The `request`, `response` and `websocket` entries are popped from
        `state` (mutating it) before the remainder is handed to the parent class.
        """
        self.request = Request.from_state(state.pop("request"))

        response_state = state.pop("response")
        self.response = (
            Response.from_state(response_state) if response_state else None
        )

        websocket_state = state.pop("websocket")
        self.websocket = (
            WebSocketData.from_state(websocket_state) if websocket_state else None
        )

        super().set_state(state)

    def __repr__(self):
        # Build a format template first; only truthy attributes get a line.
        # The attribute values themselves are rendered by the final .format() call.
        pieces = ["<HTTPFlow"]
        for name in (
            "request",
            "response",
            "websocket",
            "error",
            "client_conn",
            "server_conn",
        ):
            if getattr(self, name, False):
                pieces.append(f"\r\n {name} = {{flow.{name}}}")
        pieces.append(">")
        return "".join(pieces).format(flow=self)

    @property
    def timestamp_start(self) -> float:
        """*Read-only:* An alias for `Request.timestamp_start`."""
        return self.request.timestamp_start

    @property
    def mode(self) -> str:  # pragma: no cover
        # Deprecated accessor: warns, then falls back to "regular" if never set.
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        return getattr(self, "_mode", "regular")

    @mode.setter
    def mode(self, val: str) -> None:  # pragma: no cover
        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
        self._mode = val

    def copy(self):
        """
        Return a copy of this flow; request and response (when truthy) are
        replaced on the copy by their own `.copy()`.
        """
        duplicate = super().copy()
        if self.request:
            duplicate.request = self.request.copy()
        if self.response:
            duplicate.response = self.response.copy()
        return duplicate
An HTTPFlow is a collection of objects representing a single HTTP transaction.
A connection or protocol error affecting this flow.
Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.
If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
@property
def timestamp_start(self) -> float:
    """*Read-only:* Shortcut for the `timestamp_start` of this flow's request."""
    request = self.request
    return request.timestamp_start
Read-only: An alias for `Request.timestamp_start`.
class Message(serializable.Serializable):
    """Shared base class of `Request` and `Response`."""

    @classmethod
    def from_state(cls, state):
        """Create a message by passing the entries of `state` as keyword arguments to the constructor."""
        return cls(**state)

    def get_state(self):
        """Return the state of the underlying `data` object."""
        return self.data.get_state()

    def set_state(self, state):
        """Load `state` into the underlying `data` object."""
        self.data.set_state(state)

    data: MessageData
    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
    """
    This attribute controls if the message body should be streamed.

    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
    This makes it possible to perform string replacements on the entire body.
    If `True`, the message body will not be buffered on the proxy
    but immediately forwarded instead.
    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
    Please note that packet boundaries generally should not be relied upon.

    This attribute must be set in the `requestheaders` or `responseheaders` hook.
    Setting it in `request` or `response` is already too late, mitmproxy has buffered the message body already.
    """

    @property
    def http_version(self) -> str:
        """
        HTTP version string, for example `HTTP/1.1`.
        """
        raw_version = self.data.http_version
        return raw_version.decode("utf-8", "surrogateescape")

    @http_version.setter
    def http_version(self, http_version: str | bytes) -> None:
        as_bytes = strutils.always_bytes(http_version, "utf-8", "surrogateescape")
        self.data.http_version = as_bytes

    @property
    def is_http10(self) -> bool:
        """`True` if the stored version is exactly `HTTP/1.0`."""
        return b"HTTP/1.0" == self.data.http_version

    @property
    def is_http11(self) -> bool:
        """`True` if the stored version is exactly `HTTP/1.1`."""
        return b"HTTP/1.1" == self.data.http_version

    @property
    def is_http2(self) -> bool:
        """`True` if the stored version is exactly `HTTP/2.0`."""
        return b"HTTP/2.0" == self.data.http_version

    @property
    def is_http3(self) -> bool:
        """`True` if the stored version is exactly `HTTP/3`."""
        return b"HTTP/3" == self.data.http_version

    @property
    def headers(self) -> Headers:
        """
        The HTTP headers.
        """
        return self.data.headers

    @headers.setter
    def headers(self, h: Headers) -> None:
        self.data.headers = h

    @property
    def trailers(self) -> Headers | None:
        """
        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
        """
        return self.data.trailers

    @trailers.setter
    def trailers(self, h: Headers | None) -> None:
        self.data.trailers = h

    @property
    def raw_content(self) -> bytes | None:
        """
        The raw (potentially compressed) HTTP message body.

        Unlike `Message.content` and `Message.text`, reading this property never raises.

        *See also:* `Message.content`, `Message.text`
        """
        return self.data.content

    @raw_content.setter
    def raw_content(self, content: bytes | None) -> None:
        self.data.content = content

    @property
    def content(self) -> bytes | None:
        """
        The uncompressed HTTP message body as bytes.

        Reading this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.

        *See also:* `Message.raw_content`, `Message.text`
        """
        return self.get_content()

    @content.setter
    def content(self, value: bytes | None) -> None:
        self.set_content(value)

    @property
    def text(self) -> str | None:
        """
        The uncompressed and decoded HTTP message body as text.

        Reading this attribute may raise a `ValueError` when either content-encoding or charset is invalid.

        *See also:* `Message.raw_content`, `Message.content`
        """
        return self.get_text()

    @text.setter
    def text(self, value: str | None) -> None:
        self.set_text(value)

    def set_content(self, value: bytes | None) -> None:
        """
        Store `value` as the message body, encoding it according to the
        current Content-Encoding header.

        `None` clears the raw body and touches nothing else.

        *Raises:*
        - `TypeError`, when `value` is neither `None` nor `bytes`.
        """
        if value is None:
            self.raw_content = None
            return
        if not isinstance(value, bytes):
            raise TypeError(
                f"Message content must be bytes, not {type(value).__name__}. "
                "Please use .text if you want to assign a str."
            )

        content_encoding = self.headers.get("content-encoding") or "identity"
        try:
            self.raw_content = encoding.encode(value, content_encoding)
        except ValueError:
            # The content-encoding header is invalid: drop it and store the body unencoded.
            del self.headers["content-encoding"]
            self.raw_content = value

        # https://httpwg.org/specs/rfc7230.html#header.content-length
        # don't set content-length if a transfer-encoding is provided
        if "transfer-encoding" not in self.headers:
            self.headers["content-length"] = str(len(self.raw_content))

    def get_content(self, strict: bool = True) -> bytes | None:
        """
        Similar to `Message.content`, but does not raise if `strict` is `False`.
        Instead, the compressed message body is returned as-is.
        """
        if self.raw_content is None:
            return None

        content_encoding = self.headers.get("content-encoding")
        if not content_encoding:
            return self.raw_content

        try:
            decoded = encoding.decode(self.raw_content, content_encoding)
            # A client may illegally specify a byte -> str encoding here (e.g. utf8)
            if isinstance(decoded, str):
                raise ValueError(f"Invalid Content-Encoding: {content_encoding}")
            return decoded
        except ValueError:
            if not strict:
                return self.raw_content
            raise

    def set_text(self, text: str | None) -> None:
        """
        Encode `text` with the charset inferred from the content-type header
        and store it as the message body. `None` clears the body.

        If encoding with that charset raises `ValueError`, the content-type
        header is rewritten to declare `utf-8` and the text is stored as
        surrogate-escaped UTF-8.
        """
        if text is None:
            self.content = None
            return

        charset = infer_content_encoding(self.headers.get("content-type", ""))
        try:
            self.content = cast(bytes, encoding.encode(text, charset))
        except ValueError:
            # Fall back to UTF-8 and update the content-type header.
            parsed = parse_content_type(self.headers.get("content-type", "")) or (
                "text",
                "plain",
                {},
            )
            parsed[2]["charset"] = "utf-8"
            self.headers["content-type"] = assemble_content_type(*parsed)
            charset = "utf8"
            self.content = text.encode(charset, "surrogateescape")

    def get_text(self, strict: bool = True) -> str | None:
        """
        Similar to `Message.text`, but does not raise if `strict` is `False`.
        Instead, the message body is returned as surrogate-escaped UTF-8.
        """
        body = self.get_content(strict)
        if body is None:
            return None

        charset = infer_content_encoding(self.headers.get("content-type", ""), body)
        try:
            return cast(str, encoding.decode(body, charset))
        except ValueError:
            if not strict:
                return body.decode("utf8", "surrogateescape")
            raise

    @property
    def timestamp_start(self) -> float:
        """
        *Timestamp:* Headers received.
        """
        return self.data.timestamp_start

    @timestamp_start.setter
    def timestamp_start(self, timestamp_start: float) -> None:
        self.data.timestamp_start = timestamp_start

    @property
    def timestamp_end(self) -> float | None:
        """
        *Timestamp:* Last byte received.
        """
        return self.data.timestamp_end

    @timestamp_end.setter
    def timestamp_end(self, timestamp_end: float | None):
        self.data.timestamp_end = timestamp_end

    def decode(self, strict: bool = True) -> None:
        """
        Decodes the body based on the current Content-Encoding header, removes
        that header, and assigns the result back through `Message.content`.

        *Raises:*
        - `ValueError`, when the content-encoding is invalid and strict is True.
        """
        plain_body = self.get_content(strict)
        self.headers.pop("content-encoding", None)
        self.content = plain_body

    def encode(self, encoding: str) -> None:
        """
        Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
        Any existing content-encodings are overwritten, the content is not decoded beforehand.

        *Raises:*
        - `ValueError`, when the specified content-encoding is invalid.
        """
        self.headers["content-encoding"] = encoding
        self.content = self.raw_content
        # Assigning content drops the header again if the encoding was rejected.
        header_survived = "content-encoding" in self.headers
        if not header_survived:
            raise ValueError(f"Invalid content encoding {repr(encoding)}")

    def json(self, **kwargs: Any) -> Any:
        """
        Parse the message body as JSON and return the result.
        `**kwargs` are optional arguments that will be
        passed to `json.loads()`.

        The body is fetched non-strictly, i.e. via `get_content(strict=False)`.

        *Raises:*
        - `json.decoder.JSONDecodeError` if content is not valid JSON.
        - `TypeError` if the content is not available, for example because the response
          has been streamed.
        """
        body = self.get_content(strict=False)
        if body is None:
            raise TypeError("Message content is not available.")
        return json.loads(body, **kwargs)
This attribute controls if the message body should be streamed.
If `False`, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If `True`, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.
This attribute must be set in the `requestheaders` or `responseheaders` hook. Setting it in `request` or `response` is already too late: mitmproxy has already buffered the message body.
@property
def http_version(self) -> str:
    """
    The HTTP version as text, for example `HTTP/1.1`.

    The stored bytes are decoded as UTF-8 with `surrogateescape`.
    """
    raw_version = self.data.http_version
    return raw_version.decode("utf-8", "surrogateescape")
HTTP version string, for example `HTTP/1.1`.
@property
def headers(self) -> Headers:
    """
    The HTTP headers of this message, as stored on the underlying `data` object.
    """
    message_data = self.data
    return message_data.headers
The HTTP headers.
@property
def trailers(self) -> Headers | None:
    """
    The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer)
    of this message, as stored on the underlying `data` object.
    """
    message_data = self.data
    return message_data.trailers
The HTTP trailers.
314 @property 315 def raw_content(self) -> bytes | None: 316 """ 317 The raw (potentially compressed) HTTP message body. 318 319 In contrast to `Message.content` and `Message.text`, accessing this property never raises. 320 321 *See also:* `Message.content`, `Message.text` 322 """ 323 return self.data.content
The raw (potentially compressed) HTTP message body.
In contrast to `Message.content` and `Message.text`, accessing this property never raises.
See also: `Message.content`, `Message.text`
329 @property 330 def content(self) -> bytes | None: 331 """ 332 The uncompressed HTTP message body as bytes. 333 334 Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid. 335 336 *See also:* `Message.raw_content`, `Message.text` 337 """ 338 return self.get_content()
The uncompressed HTTP message body as bytes.
Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
See also: `Message.raw_content`, `Message.text`
344 @property 345 def text(self) -> str | None: 346 """ 347 The uncompressed and decoded HTTP message body as text. 348 349 Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid. 350 351 *See also:* `Message.raw_content`, `Message.content` 352 """ 353 return self.get_text()
The uncompressed and decoded HTTP message body as text.
Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
See also: `Message.raw_content`, `Message.content`
359 def set_content(self, value: bytes | None) -> None: 360 if value is None: 361 self.raw_content = None 362 return 363 if not isinstance(value, bytes): 364 raise TypeError( 365 f"Message content must be bytes, not {type(value).__name__}. " 366 "Please use .text if you want to assign a str." 367 ) 368 ce = self.headers.get("content-encoding") 369 try: 370 self.raw_content = encoding.encode(value, ce or "identity") 371 except ValueError: 372 # So we have an invalid content-encoding? 373 # Let's remove it! 374 del self.headers["content-encoding"] 375 self.raw_content = value 376 377 if "transfer-encoding" in self.headers: 378 # https://httpwg.org/specs/rfc7230.html#header.content-length 379 # don't set content-length if a transfer-encoding is provided 380 pass 381 else: 382 self.headers["content-length"] = str(len(self.raw_content))
384 def get_content(self, strict: bool = True) -> bytes | None: 385 """ 386 Similar to `Message.content`, but does not raise if `strict` is `False`. 387 Instead, the compressed message body is returned as-is. 388 """ 389 if self.raw_content is None: 390 return None 391 ce = self.headers.get("content-encoding") 392 if ce: 393 try: 394 content = encoding.decode(self.raw_content, ce) 395 # A client may illegally specify a byte -> str encoding here (e.g. utf8) 396 if isinstance(content, str): 397 raise ValueError(f"Invalid Content-Encoding: {ce}") 398 return content 399 except ValueError: 400 if strict: 401 raise 402 return self.raw_content 403 else: 404 return self.raw_content
Similar to `Message.content`, but does not raise if `strict` is `False`.
Instead, the compressed message body is returned as-is.
406 def set_text(self, text: str | None) -> None: 407 if text is None: 408 self.content = None 409 return 410 enc = infer_content_encoding(self.headers.get("content-type", "")) 411 412 try: 413 self.content = cast(bytes, encoding.encode(text, enc)) 414 except ValueError: 415 # Fall back to UTF-8 and update the content-type header. 416 ct = parse_content_type(self.headers.get("content-type", "")) or ( 417 "text", 418 "plain", 419 {}, 420 ) 421 ct[2]["charset"] = "utf-8" 422 self.headers["content-type"] = assemble_content_type(*ct) 423 enc = "utf8" 424 self.content = text.encode(enc, "surrogateescape")
426 def get_text(self, strict: bool = True) -> str | None: 427 """ 428 Similar to `Message.text`, but does not raise if `strict` is `False`. 429 Instead, the message body is returned as surrogate-escaped UTF-8. 430 """ 431 content = self.get_content(strict) 432 if content is None: 433 return None 434 enc = infer_content_encoding(self.headers.get("content-type", ""), content) 435 try: 436 return cast(str, encoding.decode(content, enc)) 437 except ValueError: 438 if strict: 439 raise 440 return content.decode("utf8", "surrogateescape")
Similar to `Message.text`, but does not raise if `strict` is `False`.
Instead, the message body is returned as surrogate-escaped UTF-8.
@property
def timestamp_start(self) -> float:
    """
    *Timestamp:* Headers received.
    """
    message_data = self.data
    return message_data.timestamp_start
Timestamp: Headers received.
453 @property 454 def timestamp_end(self) -> float | None: 455 """ 456 *Timestamp:* Last byte received. 457 """ 458 return self.data.timestamp_end
Timestamp: Last byte received.
def decode(self, strict: bool = True) -> None:
    """
    Decodes the body based on the current Content-Encoding header, removes
    that header, and assigns the result back through `content`.

    *Raises:*
    - `ValueError`, when the content-encoding is invalid and strict is True.
    """
    plain_body = self.get_content(strict)
    # Drop the header first so that assigning `content` does not re-encode the body.
    self.headers.pop("content-encoding", None)
    self.content = plain_body
Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.
Raises:
- `ValueError`, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
    """
    Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd".
    Any existing content-encodings are overwritten, the content is not decoded beforehand.

    *Raises:*
    - `ValueError`, when the specified content-encoding is invalid.
    """
    self.headers["content-encoding"] = encoding
    self.content = self.raw_content
    # If the header is gone after assigning content, the encoding was rejected.
    header_survived = "content-encoding" in self.headers
    if not header_survived:
        raise ValueError(f"Invalid content encoding {repr(encoding)}")
Encodes body with the given encoding, where `encoding` is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.
Raises:
- `ValueError`, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
    """
    Parse the message body as JSON and return the result.
    `**kwargs` are optional arguments that will be
    passed to `json.loads()`.

    The body is fetched non-strictly, i.e. via `get_content(strict=False)`.

    *Raises:*
    - `json.decoder.JSONDecodeError` if content is not valid JSON.
    - `TypeError` if the content is not available, for example because the response
      has been streamed.
    """
    body = self.get_content(strict=False)
    if body is None:
        raise TypeError("Message content is not available.")
    return json.loads(body, **kwargs)
Returns the JSON encoded content of the response, if any. `**kwargs` are optional arguments that will be passed to `json.loads()`.
Will raise if the content can not be decoded and then parsed as JSON.
Raises:
- `json.decoder.JSONDecodeError` if content is not valid JSON.
- `TypeError` if the content is not available, for example because the response has been streamed.
510class Request(Message): 511 """ 512 An HTTP request. 513 """ 514 515 data: RequestData 516 517 def __init__( 518 self, 519 host: str, 520 port: int, 521 method: bytes, 522 scheme: bytes, 523 authority: bytes, 524 path: bytes, 525 http_version: bytes, 526 headers: Headers | tuple[tuple[bytes, bytes], ...], 527 content: bytes | None, 528 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 529 timestamp_start: float, 530 timestamp_end: float | None, 531 ): 532 # auto-convert invalid types to retain compatibility with older code. 533 if isinstance(host, bytes): 534 host = host.decode("idna", "strict") 535 if isinstance(method, str): 536 method = method.encode("ascii", "strict") 537 if isinstance(scheme, str): 538 scheme = scheme.encode("ascii", "strict") 539 if isinstance(authority, str): 540 authority = authority.encode("ascii", "strict") 541 if isinstance(path, str): 542 path = path.encode("ascii", "strict") 543 if isinstance(http_version, str): 544 http_version = http_version.encode("ascii", "strict") 545 546 if isinstance(content, str): 547 raise ValueError(f"Content must be bytes, not {type(content).__name__}") 548 if not isinstance(headers, Headers): 549 headers = Headers(headers) 550 if trailers is not None and not isinstance(trailers, Headers): 551 trailers = Headers(trailers) 552 553 self.data = RequestData( 554 host=host, 555 port=port, 556 method=method, 557 scheme=scheme, 558 authority=authority, 559 path=path, 560 http_version=http_version, 561 headers=headers, 562 content=content, 563 trailers=trailers, 564 timestamp_start=timestamp_start, 565 timestamp_end=timestamp_end, 566 ) 567 568 def __repr__(self) -> str: 569 if self.host and self.port: 570 hostport = f"{self.host}:{self.port}" 571 else: 572 hostport = "" 573 path = self.path or "" 574 return f"Request({self.method} {hostport}{path})" 575 576 @classmethod 577 def make( 578 cls, 579 method: str, 580 url: str, 581 content: bytes | str = "", 582 headers: ( 583 Headers | dict[str | bytes, 
str | bytes] | Iterable[tuple[bytes, bytes]] 584 ) = (), 585 ) -> "Request": 586 """ 587 Simplified API for creating request objects. 588 """ 589 # Headers can be list or dict, we differentiate here. 590 if isinstance(headers, Headers): 591 pass 592 elif isinstance(headers, dict): 593 headers = Headers( 594 ( 595 always_bytes(k, "utf-8", "surrogateescape"), 596 always_bytes(v, "utf-8", "surrogateescape"), 597 ) 598 for k, v in headers.items() 599 ) 600 elif isinstance(headers, Iterable): 601 headers = Headers(headers) # type: ignore 602 else: 603 raise TypeError( 604 "Expected headers to be an iterable or dict, but is {}.".format( 605 type(headers).__name__ 606 ) 607 ) 608 609 req = cls( 610 "", 611 0, 612 method.encode("utf-8", "surrogateescape"), 613 b"", 614 b"", 615 b"", 616 b"HTTP/1.1", 617 headers, 618 b"", 619 None, 620 time.time(), 621 time.time(), 622 ) 623 624 req.url = url 625 # Assign this manually to update the content-length header. 626 if isinstance(content, bytes): 627 req.content = content 628 elif isinstance(content, str): 629 req.text = content 630 else: 631 raise TypeError( 632 f"Expected content to be str or bytes, but is {type(content).__name__}." 633 ) 634 635 return req 636 637 @property 638 def first_line_format(self) -> str: 639 """ 640 *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3). 641 642 origin-form and asterisk-form are subsumed as "relative". 643 """ 644 if self.method == "CONNECT": 645 return "authority" 646 elif self.authority: 647 return "absolute" 648 else: 649 return "relative" 650 651 @property 652 def method(self) -> str: 653 """ 654 HTTP request method, e.g. "GET". 
655 """ 656 return self.data.method.decode("utf-8", "surrogateescape").upper() 657 658 @method.setter 659 def method(self, val: str | bytes) -> None: 660 self.data.method = always_bytes(val, "utf-8", "surrogateescape") 661 662 @property 663 def scheme(self) -> str: 664 """ 665 HTTP request scheme, which should be "http" or "https". 666 """ 667 return self.data.scheme.decode("utf-8", "surrogateescape") 668 669 @scheme.setter 670 def scheme(self, val: str | bytes) -> None: 671 self.data.scheme = always_bytes(val, "utf-8", "surrogateescape") 672 673 @property 674 def authority(self) -> str: 675 """ 676 HTTP request authority. 677 678 For HTTP/1, this is the authority portion of the request target 679 (in either absolute-form or authority-form). 680 For origin-form and asterisk-form requests, this property is set to an empty string. 681 682 For HTTP/2, this is the :authority pseudo header. 683 684 *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host` 685 """ 686 try: 687 return self.data.authority.decode("idna") 688 except UnicodeError: 689 return self.data.authority.decode("utf8", "surrogateescape") 690 691 @authority.setter 692 def authority(self, val: str | bytes) -> None: 693 if isinstance(val, str): 694 try: 695 val = val.encode("idna", "strict") 696 except UnicodeError: 697 val = val.encode("utf8", "surrogateescape") # type: ignore 698 self.data.authority = val 699 700 @property 701 def host(self) -> str: 702 """ 703 Target server for this request. This may be parsed from the raw request 704 (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) 705 or inferred from the proxy mode (e.g. an IP in transparent mode). 706 707 Setting the host attribute also updates the host header and authority information, if present. 
708 709 *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host` 710 """ 711 return self.data.host 712 713 @host.setter 714 def host(self, val: str | bytes) -> None: 715 self.data.host = always_str(val, "idna", "strict") 716 self._update_host_and_authority() 717 718 @property 719 def host_header(self) -> str | None: 720 """ 721 The request's host/authority header. 722 723 This property maps to either ``request.headers["Host"]`` or 724 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 725 726 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 727 """ 728 if self.is_http2 or self.is_http3: 729 return self.authority or self.data.headers.get("Host", None) 730 else: 731 return self.data.headers.get("Host", None) 732 733 @host_header.setter 734 def host_header(self, val: None | str | bytes) -> None: 735 if val is None: 736 if self.is_http2 or self.is_http3: 737 self.data.authority = b"" 738 self.headers.pop("Host", None) 739 else: 740 if self.is_http2 or self.is_http3: 741 self.authority = val # type: ignore 742 if not (self.is_http2 or self.is_http3) or "Host" in self.headers: 743 # For h2, we only overwrite, but not create, as :authority is the h2 host header. 744 self.headers["Host"] = val 745 746 @property 747 def port(self) -> int: 748 """ 749 Target port. 750 """ 751 return self.data.port 752 753 @port.setter 754 def port(self, port: int) -> None: 755 if not isinstance(port, int): 756 raise ValueError(f"Port must be an integer, not {port!r}.") 757 758 self.data.port = port 759 self._update_host_and_authority() 760 761 def _update_host_and_authority(self) -> None: 762 val = url.hostport(self.scheme, self.host, self.port) 763 764 # Update host header 765 if "Host" in self.data.headers: 766 self.data.headers["Host"] = val 767 # Update authority 768 if self.data.authority: 769 self.authority = val 770 771 @property 772 def path(self) -> str: 773 """ 774 HTTP request path, e.g. 
"/index.html" or "/index.html?a=b". 775 Usually starts with a slash, except for OPTIONS requests, which may just be "*". 776 777 This attribute includes both path and query parts of the target URI 778 (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)). 779 """ 780 return self.data.path.decode("utf-8", "surrogateescape") 781 782 @path.setter 783 def path(self, val: str | bytes) -> None: 784 self.data.path = always_bytes(val, "utf-8", "surrogateescape") 785 786 @property 787 def url(self) -> str: 788 """ 789 The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`. 790 791 Settings this property updates these attributes as well. 792 """ 793 if self.first_line_format == "authority": 794 return f"{self.host}:{self.port}" 795 path = self.path if self.path != "*" else "" 796 return url.unparse(self.scheme, self.host, self.port, path) 797 798 @url.setter 799 def url(self, val: str | bytes) -> None: 800 val = always_str(val, "utf-8", "surrogateescape") 801 self.scheme, self.host, self.port, self.path = url.parse(val) # type: ignore 802 803 @property 804 def pretty_host(self) -> str: 805 """ 806 *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source. 807 This is useful in transparent mode where `Request.host` is only an IP address. 808 809 *Warning:* When working in adversarial environments, this may not reflect the actual destination 810 as the Host header could be spoofed. 811 """ 812 authority = self.host_header 813 if authority: 814 return url.parse_authority(authority, check=False)[0] 815 else: 816 return self.host 817 818 @property 819 def pretty_url(self) -> str: 820 """ 821 *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`. 
822 """ 823 if self.first_line_format == "authority": 824 return self.authority 825 826 host_header = self.host_header 827 if not host_header: 828 return self.url 829 830 pretty_host, pretty_port = url.parse_authority(host_header, check=False) 831 pretty_port = pretty_port or url.default_port(self.scheme) or 443 832 path = self.path if self.path != "*" else "" 833 834 return url.unparse(self.scheme, pretty_host, pretty_port, path) 835 836 def _get_query(self): 837 query = urllib.parse.urlparse(self.url).query 838 return tuple(url.decode(query)) 839 840 def _set_query(self, query_data): 841 query = url.encode(query_data) 842 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 843 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 844 845 @property 846 def query(self) -> multidict.MultiDictView[str, str]: 847 """ 848 The request query as a mutable mapping view on the request's path. 849 For the most part, this behaves like a dictionary. 850 Modifications to the MultiDictView update `Request.path`, and vice versa. 851 """ 852 return multidict.MultiDictView(self._get_query, self._set_query) 853 854 @query.setter 855 def query(self, value): 856 self._set_query(value) 857 858 def _get_cookies(self): 859 h = self.headers.get_all("Cookie") 860 return tuple(cookies.parse_cookie_headers(h)) 861 862 def _set_cookies(self, value): 863 self.headers["cookie"] = cookies.format_cookie_header(value) 864 865 @property 866 def cookies(self) -> multidict.MultiDictView[str, str]: 867 """ 868 The request cookies. 869 For the most part, this behaves like a dictionary. 870 Modifications to the MultiDictView update `Request.headers`, and vice versa. 871 """ 872 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 873 874 @cookies.setter 875 def cookies(self, value): 876 self._set_cookies(value) 877 878 @property 879 def path_components(self) -> tuple[str, ...]: 880 """ 881 The URL's path components as a tuple of strings. 
882 Components are unquoted. 883 """ 884 path = urllib.parse.urlparse(self.url).path 885 # This needs to be a tuple so that it's immutable. 886 # Otherwise, this would fail silently: 887 # request.path_components.append("foo") 888 return tuple(url.unquote(i) for i in path.split("/") if i) 889 890 @path_components.setter 891 def path_components(self, components: Iterable[str]): 892 components = map(lambda x: url.quote(x, safe=""), components) 893 path = "/" + "/".join(components) 894 _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) 895 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 896 897 def anticache(self) -> None: 898 """ 899 Modifies this request to remove headers that might produce a cached response. 900 """ 901 delheaders = ( 902 "if-modified-since", 903 "if-none-match", 904 ) 905 for i in delheaders: 906 self.headers.pop(i, None) 907 908 def anticomp(self) -> None: 909 """ 910 Modify the Accept-Encoding header to only accept uncompressed responses. 911 """ 912 self.headers["accept-encoding"] = "identity" 913 914 def constrain_encoding(self) -> None: 915 """ 916 Limits the permissible Accept-Encoding values, based on what we can decode appropriately. 917 """ 918 accept_encoding = self.headers.get("accept-encoding") 919 if accept_encoding: 920 self.headers["accept-encoding"] = ", ".join( 921 e 922 for e in {"gzip", "identity", "deflate", "br", "zstd"} 923 if e in accept_encoding 924 ) 925 926 def _get_urlencoded_form(self): 927 is_valid_content_type = ( 928 "application/x-www-form-urlencoded" 929 in self.headers.get("content-type", "").lower() 930 ) 931 if is_valid_content_type: 932 return tuple(url.decode(self.get_text(strict=False))) 933 return () 934 935 def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None: 936 """ 937 Sets the body to the URL-encoded form data, and adds the appropriate content-type header. 938 This will overwrite the existing content if there is one. 
939 """ 940 self.headers["content-type"] = "application/x-www-form-urlencoded" 941 self.content = url.encode(form_data, self.get_text(strict=False)).encode() 942 943 @property 944 def urlencoded_form(self) -> multidict.MultiDictView[str, str]: 945 """ 946 The URL-encoded form data. 947 948 If the content-type indicates non-form data or the form could not be parsed, this is set to 949 an empty `MultiDictView`. 950 951 Modifications to the MultiDictView update `Request.content`, and vice versa. 952 """ 953 return multidict.MultiDictView( 954 self._get_urlencoded_form, self._set_urlencoded_form 955 ) 956 957 @urlencoded_form.setter 958 def urlencoded_form(self, value): 959 self._set_urlencoded_form(value) 960 961 def _get_multipart_form(self) -> list[tuple[bytes, bytes]]: 962 is_valid_content_type = ( 963 "multipart/form-data" in self.headers.get("content-type", "").lower() 964 ) 965 if is_valid_content_type and self.content is not None: 966 try: 967 return multipart.decode_multipart( 968 self.headers.get("content-type"), self.content 969 ) 970 except ValueError: 971 pass 972 return [] 973 974 def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 975 ct = self.headers.get("content-type", "") 976 is_valid_content_type = ct.lower().startswith("multipart/form-data") 977 if not is_valid_content_type: 978 """ 979 Generate a random boundary here. 980 981 See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications 982 on generating the boundary. 983 """ 984 boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode() 985 self.headers["content-type"] = ct = ( 986 f"multipart/form-data; boundary={boundary}" 987 ) 988 self.content = multipart.encode_multipart(ct, value) 989 990 @property 991 def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]: 992 """ 993 The multipart form data. 994 995 If the content-type indicates non-form data or the form could not be parsed, this is set to 996 an empty `MultiDictView`. 
997 998 Modifications to the MultiDictView update `Request.content`, and vice versa. 999 """ 1000 return multidict.MultiDictView( 1001 self._get_multipart_form, self._set_multipart_form 1002 ) 1003 1004 @multipart_form.setter 1005 def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None: 1006 self._set_multipart_form(value)
An HTTP request.
def __init__(
    self,
    host: str,
    port: int,
    method: bytes,
    scheme: bytes,
    authority: bytes,
    path: bytes,
    http_version: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Create a request from its raw parts and store them in `self.data`.

    For compatibility with older code, a bytes *host* is decoded as IDNA and
    str values for *method*, *scheme*, *authority*, *path* and *http_version*
    are encoded as ASCII. *headers* and non-None *trailers* are wrapped in
    `Headers` if they are not already.

    *Raises:*
     - `ValueError` if *content* is a str.
    """

    def _ascii_bytes(value):
        # Legacy callers may hand us str where bytes are expected.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # Auto-convert invalid types to retain compatibility with older code.
    if isinstance(host, bytes):
        host = host.decode("idna", "strict")
    method = _ascii_bytes(method)
    scheme = _ascii_bytes(scheme)
    authority = _ascii_bytes(authority)
    path = _ascii_bytes(path)
    http_version = _ascii_bytes(http_version)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    headers = headers if isinstance(headers, Headers) else Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = RequestData(
        host=host,
        port=port,
        method=method,
        scheme=scheme,
        authority=authority,
        path=path,
        http_version=http_version,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    method: str,
    url: str,
    content: bytes | str = "",
    headers: (
        Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Request":
    """
    Simplified API for creating request objects.

    *headers* may be a `Headers` instance, a dict, or an iterable of
    ``(name, value)`` byte tuples. *content* may be bytes or str.

    *Raises:*
     - `TypeError` if *headers* or *content* has an unsupported type.
    """
    # Normalise the accepted header shapes into a Headers instance.
    if not isinstance(headers, Headers):
        if isinstance(headers, dict):
            headers = Headers(
                (
                    always_bytes(name, "utf-8", "surrogateescape"),
                    always_bytes(value, "utf-8", "surrogateescape"),
                )
                for name, value in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                "Expected headers to be an iterable or dict, but is {}.".format(
                    type(headers).__name__
                )
            )

    req = cls(
        "",  # host, filled in by the url assignment below
        0,  # port, filled in by the url assignment below
        method.encode("utf-8", "surrogateescape"),
        b"",  # scheme, filled in by the url assignment below
        b"",  # authority
        b"",  # path, filled in by the url assignment below
        b"HTTP/1.1",
        headers,
        b"",  # content, assigned below
        None,  # trailers
        time.time(),
        time.time(),
    )

    req.url = url
    # Assign this manually to update the content-length header.
    if isinstance(content, str):
        req.text = content
    elif isinstance(content, bytes):
        req.content = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return req
Simplified API for creating request objects.
@property
def first_line_format(self) -> str:
    """
    *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).

    Returns "authority" for CONNECT requests, "absolute" if an authority is set,
    and "relative" otherwise (origin-form and asterisk-form are subsumed as "relative").
    """
    if self.method == "CONNECT":
        return "authority"
    return "absolute" if self.authority else "relative"
Read-only: HTTP request form as defined in RFC 7230.
origin-form and asterisk-form are subsumed as "relative".
@property
def method(self) -> str:
    """
    HTTP request method, e.g. "GET".

    The stored bytes are decoded as UTF-8 (with surrogateescape) and upper-cased.
    """
    raw_method = self.data.method
    decoded = raw_method.decode("utf-8", "surrogateescape")
    return decoded.upper()
HTTP request method, e.g. "GET".
@property
def scheme(self) -> str:
    """
    HTTP request scheme, which should be "http" or "https".

    The stored bytes are decoded as UTF-8 (with surrogateescape).
    """
    raw_scheme = self.data.scheme
    return raw_scheme.decode("utf-8", "surrogateescape")
HTTP request scheme, which should be "http" or "https".
@property
def host(self) -> str:
    """
    Target server for this request.

    The value may have been parsed from the raw request
    (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
    or inferred from the proxy mode (e.g. an IP in transparent mode).

    Setting the host attribute also updates the host header and authority information, if present.

    *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
    """
    request_data = self.data
    return request_data.host
Target server for this request. This may be parsed from the raw request (e.g. from a GET http://example.com/ HTTP/1.1 request line) or inferred from the proxy mode (e.g. an IP in transparent mode).
Setting the host attribute also updates the host header and authority information, if present.
See also: Request.authority, Request.host_header, Request.pretty_host
718 @property 719 def host_header(self) -> str | None: 720 """ 721 The request's host/authority header. 722 723 This property maps to either ``request.headers["Host"]`` or 724 ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0. 725 726 *See also:* `Request.authority`,`Request.host`, `Request.pretty_host` 727 """ 728 if self.is_http2 or self.is_http3: 729 return self.authority or self.data.headers.get("Host", None) 730 else: 731 return self.data.headers.get("Host", None)
The request's host/authority header.
This property maps to either request.headers["Host"] or request.authority, depending on whether it's HTTP/1.x or HTTP/2 and HTTP/3.
See also: Request.authority, Request.host, Request.pretty_host
@property
def path(self) -> str:
    """
    HTTP request path, e.g. "/index.html" or "/index.html?a=b".
    Usually starts with a slash, except for OPTIONS requests, which may just be "*".

    This attribute includes both path and query parts of the target URI
    (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
    The stored bytes are decoded as UTF-8 (with surrogateescape).
    """
    raw_path = self.data.path
    return raw_path.decode("utf-8", "surrogateescape")
HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".
This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).
@property
def url(self) -> str:
    """
    The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.

    For authority-form requests this is just ``host:port``.

    Setting this property updates these attributes as well.
    """
    if self.first_line_format == "authority":
        return f"{self.host}:{self.port}"

    # An asterisk-form target ("*") contributes no path to the URL.
    path = self.path
    if path == "*":
        path = ""
    return url.unparse(self.scheme, self.host, self.port, path)
The full URL string, constructed from Request.scheme, Request.host, Request.port and Request.path.
Setting this property updates these attributes as well.
@property
def pretty_host(self) -> str:
    """
    *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
    This is useful in transparent mode where `Request.host` is only an IP address.

    *Warning:* When working in adversarial environments, this may not reflect the actual destination
    as the Host header could be spoofed.
    """
    authority = self.host_header
    if not authority:
        # No host/authority header available: fall back to the plain host.
        return self.host
    hostname, _port = url.parse_authority(authority, check=False)
    return hostname
Read-only: Like Request.host, but using the Request.host_header header as an additional (preferred) data source. This is useful in transparent mode where Request.host is only an IP address.
Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.
@property
def pretty_url(self) -> str:
    """
    *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.

    Authority-form requests return `Request.authority`; requests without a
    host/authority header return `Request.url` unchanged.
    """
    if self.first_line_format == "authority":
        return self.authority

    host_header = self.host_header
    if not host_header:
        return self.url

    pretty_host, pretty_port = url.parse_authority(host_header, check=False)
    if not pretty_port:
        # No explicit port in the header: use the scheme default, or 443 if unknown.
        pretty_port = url.default_port(self.scheme) or 443

    # An asterisk-form target ("*") contributes no path to the URL.
    path = self.path
    if path == "*":
        path = ""

    return url.unparse(self.scheme, pretty_host, pretty_port, path)
Read-only: Like Request.url, but using Request.pretty_host instead of Request.host.
@property
def query(self) -> multidict.MultiDictView[str, str]:
    """
    The request query as a mutable mapping view on the request's path.

    For the most part, this behaves like a dictionary.
    The view is backed by `_get_query` and `_set_query`, so modifications to the
    MultiDictView update `Request.path`, and vice versa.
    """
    query_view = multidict.MultiDictView(self._get_query, self._set_query)
    return query_view
The request query as a mutable mapping view on the request's path. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.path, and vice versa.
@property
def path_components(self) -> tuple[str, ...]:
    """
    The URL's path components as a tuple of strings.

    Empty segments are dropped and the remaining components are unquoted.
    """
    parsed_path = urllib.parse.urlparse(self.url).path
    segments = [segment for segment in parsed_path.split("/") if segment]
    # Return a tuple so the result is immutable. With a list, a call such as
    #   request.path_components.append("foo")
    # would appear to work but silently change nothing.
    return tuple(url.unquote(segment) for segment in segments)
The URL's path components as a tuple of strings. Components are unquoted.
def anticache(self) -> None:
    """
    Modifies this request to remove headers that might produce a cached response.

    Removes ``If-Modified-Since`` and ``If-None-Match`` if they are present.
    """
    for conditional_header in ("if-modified-since", "if-none-match"):
        self.headers.pop(conditional_header, None)
Modifies this request to remove headers that might produce a cached response.
def anticomp(self) -> None:
    """
    Modify the Accept-Encoding header to only accept uncompressed responses.

    The header is set to ``identity``, replacing any existing value.
    """
    request_headers = self.headers
    request_headers["accept-encoding"] = "identity"
Modify the Accept-Encoding header to only accept uncompressed responses.
def constrain_encoding(self) -> None:
    """
    Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

    If an Accept-Encoding header is present and non-empty, it is replaced by the
    subset of supported encodings whose name occurs in the existing value.
    Matching is a plain substring test on the header value. The header is left
    untouched if it is missing or empty.
    """
    accept_encoding = self.headers.get("accept-encoding")
    if not accept_encoding:
        return

    # Use an ordered tuple rather than a set: set iteration order depends on
    # string hash randomisation, which made the emitted header differ between runs.
    supported_encodings = ("gzip", "identity", "deflate", "br", "zstd")
    self.headers["accept-encoding"] = ", ".join(
        encoding_name
        for encoding_name in supported_encodings
        if encoding_name in accept_encoding
    )
Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
@property
def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
    """
    The URL-encoded form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    The view is backed by `_get_urlencoded_form` and `_set_urlencoded_form`, so
    modifications to the MultiDictView update `Request.content`, and vice versa.
    """
    form_view = multidict.MultiDictView(
        self._get_urlencoded_form,
        self._set_urlencoded_form,
    )
    return form_view
The URL-encoded form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.
Modifications to the MultiDictView update Request.content, and vice versa.
@property
def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
    """
    The multipart form data.

    If the content-type indicates non-form data or the form could not be parsed, this is set to
    an empty `MultiDictView`.

    The view is backed by `_get_multipart_form` and `_set_multipart_form`, so
    modifications to the MultiDictView update `Request.content`, and vice versa.
    """
    form_view = multidict.MultiDictView(
        self._get_multipart_form,
        self._set_multipart_form,
    )
    return form_view
The multipart form data.
If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.
Modifications to the MultiDictView update Request.content, and vice versa.
class Response(Message):
    """
    An HTTP response.
    """

    data: ResponseData

    def __init__(
        self,
        http_version: bytes,
        status_code: int,
        reason: bytes,
        headers: Headers | tuple[tuple[bytes, bytes], ...],
        content: bytes | None,
        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
        timestamp_start: float,
        timestamp_end: float | None,
    ):
        """
        Create a response from its raw parts and store them in `self.data`.

        For compatibility with older code, str values for *http_version* and
        *reason* are encoded as ASCII. *headers* and non-None *trailers* are
        wrapped in `Headers` if they are not already.

        *Raises:*
         - `ValueError` if *content* is a str.
        """
        # auto-convert invalid types to retain compatibility with older code.
        if isinstance(http_version, str):
            http_version = http_version.encode("ascii", "strict")
        if isinstance(reason, str):
            reason = reason.encode("ascii", "strict")

        if isinstance(content, str):
            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
        if not isinstance(headers, Headers):
            headers = Headers(headers)
        if trailers is not None and not isinstance(trailers, Headers):
            trailers = Headers(trailers)

        self.data = ResponseData(
            http_version=http_version,
            status_code=status_code,
            reason=reason,
            headers=headers,
            content=content,
            trailers=trailers,
            timestamp_start=timestamp_start,
            timestamp_end=timestamp_end,
        )

    def __repr__(self) -> str:
        """Return a short summary: status code plus content type and size, if any."""
        if self.raw_content:
            ct = self.headers.get("content-type", "unknown content type")
            size = human.pretty_size(len(self.raw_content))
            details = f"{ct}, {size}"
        else:
            details = "no content"
        return f"Response({self.status_code}, {details})"

    @classmethod
    def make(
        cls,
        status_code: int = 200,
        content: bytes | str = b"",
        headers: (
            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
        ) = (),
    ) -> "Response":
        """
        Simplified API for creating response objects.

        *headers* may be a `Headers` instance, any mapping, or an iterable of
        ``(name, value)`` byte tuples. *content* may be bytes or str.

        *Raises:*
         - `TypeError` if *headers* or *content* has an unsupported type.
        """
        if isinstance(headers, Headers):
            pass
        elif isinstance(headers, Mapping):
            # Accept every Mapping, as the signature promises. Checking for dict
            # only would send other mappings down the iterable branch, where
            # just their keys would be passed to Headers.
            headers = Headers(
                (
                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                    always_bytes(v, "utf-8", "surrogateescape"),
                )
                for k, v in headers.items()
            )
        elif isinstance(headers, Iterable):
            headers = Headers(headers)  # type: ignore
        else:
            raise TypeError(
                f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
            )

        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            headers,
            None,
            None,
            time.time(),
            time.time(),
        )

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError(
                f"Expected content to be str or bytes, but is {type(content).__name__}."
            )

        return resp

    @property
    def status_code(self) -> int:
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code: int) -> None:
        self.data.status_code = status_code

    @property
    def reason(self) -> str:
        """
        HTTP reason phrase, for example "Not Found".

        HTTP/2 responses do not contain a reason phrase; an empty string will be returned instead.
        """
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return self.data.reason.decode("ISO-8859-1")

    @reason.setter
    def reason(self, reason: str | bytes) -> None:
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")

    def _get_cookies(self):
        """Parse all Set-Cookie headers into ``(name, (value, attrs))`` tuples."""
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)

    def _set_cookies(self, value):
        """Replace all Set-Cookie headers, emitting one header per cookie."""
        cookie_headers = [
            cookies.format_set_cookie_header([(k, v[0], v[1])]) for k, v in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(
        self,
    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
        """
        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
        name strings, and values are `(cookie value, attributes)` tuples. Within
        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
        Modifications to the MultiDictView update `Response.headers`, and vice versa.

        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
        """
        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now: float | None = None) -> None:
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

         - It adjusts date, expires, and last-modified headers.
         - It adjusts cookie expiration.

        *Args:*
         - *now:* Reference timestamp in seconds; defaults to the current time.
           All adjustments shift by ``now - timestamp_start``.
        """
        # Only substitute the wall clock when no timestamp was given. A plain
        # truthiness test would also discard an explicit ``now=0``.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = [
            "date",
            "expires",
            "last-modified",
        ]
        for i in refresh_headers:
            if i in self.headers:
                d = parsedate_tz(self.headers[i])
                # Unparseable dates are left untouched.
                if d:
                    new = mktime_tz(d) + delta
                    try:
                        self.headers[i] = formatdate(new, usegmt=True)
                    except OSError:  # pragma: no cover
                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
        c = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Keep cookies we cannot refresh exactly as they were.
                refreshed = set_cookie_header
            c.append(refreshed)
        if c:
            self.headers.set_all("set-cookie", c)
An HTTP response.
def __init__(
    self,
    http_version: bytes,
    status_code: int,
    reason: bytes,
    headers: Headers | tuple[tuple[bytes, bytes], ...],
    content: bytes | None,
    trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
    timestamp_start: float,
    timestamp_end: float | None,
):
    """
    Create a response from its raw parts and store them in `self.data`.

    For compatibility with older code, str values for *http_version* and
    *reason* are encoded as ASCII. *headers* and non-None *trailers* are
    wrapped in `Headers` if they are not already.

    *Raises:*
     - `ValueError` if *content* is a str.
    """

    def _ascii_bytes(value):
        # Legacy callers may hand us str where bytes are expected.
        if isinstance(value, str):
            return value.encode("ascii", "strict")
        return value

    # Auto-convert invalid types to retain compatibility with older code.
    http_version = _ascii_bytes(http_version)
    reason = _ascii_bytes(reason)

    if isinstance(content, str):
        raise ValueError(f"Content must be bytes, not {type(content).__name__}")

    headers = headers if isinstance(headers, Headers) else Headers(headers)
    if not (trailers is None or isinstance(trailers, Headers)):
        trailers = Headers(trailers)

    self.data = ResponseData(
        http_version=http_version,
        status_code=status_code,
        reason=reason,
        headers=headers,
        content=content,
        trailers=trailers,
        timestamp_start=timestamp_start,
        timestamp_end=timestamp_end,
    )
@classmethod
def make(
    cls,
    status_code: int = 200,
    content: bytes | str = b"",
    headers: (
        Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
    ) = (),
) -> "Response":
    """
    Simplified API for creating response objects.

    *headers* may be a `Headers` instance, any mapping, or an iterable of
    ``(name, value)`` byte tuples. *content* may be bytes or str.

    *Raises:*
     - `TypeError` if *headers* or *content* has an unsupported type.
    """
    if isinstance(headers, Headers):
        pass
    elif isinstance(headers, Mapping):
        # Accept every Mapping, as the signature promises. Checking for dict
        # only would send other mappings down the iterable branch, where
        # just their keys would be passed to Headers.
        headers = Headers(
            (
                always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
                always_bytes(v, "utf-8", "surrogateescape"),
            )
            for k, v in headers.items()
        )
    elif isinstance(headers, Iterable):
        headers = Headers(headers)  # type: ignore
    else:
        raise TypeError(
            f"Expected headers to be an iterable or dict, but is {type(headers).__name__}."
        )

    resp = cls(
        b"HTTP/1.1",
        status_code,
        status_codes.RESPONSES.get(status_code, "").encode(),
        headers,
        None,
        None,
        time.time(),
        time.time(),
    )

    # Assign this manually to update the content-length header.
    if isinstance(content, bytes):
        resp.content = content
    elif isinstance(content, str):
        resp.text = content
    else:
        raise TypeError(
            f"Expected content to be str or bytes, but is {type(content).__name__}."
        )

    return resp
Simplified API for creating response objects.
@property
def status_code(self) -> int:
    """
    HTTP Status Code, e.g. ``200``.
    """
    response_data = self.data
    return response_data.status_code
HTTP Status Code, e.g. 200.
@property
def reason(self) -> str:
    """
    HTTP reason phrase, for example "Not Found".

    HTTP/2 responses do not contain a reason phrase; an empty string will be returned instead.
    """
    raw_reason = self.data.reason
    # Encoding: http://stackoverflow.com/a/16674906/934719
    return raw_reason.decode("ISO-8859-1")
HTTP reason phrase, for example "Not Found".
HTTP/2 responses do not contain a reason phrase; an empty string will be returned instead.
1170 def refresh(self, now=None): 1171 """ 1172 This fairly complex and heuristic function refreshes a server 1173 response for replay. 1174 1175 - It adjusts date, expires, and last-modified headers. 1176 - It adjusts cookie expiration. 1177 """ 1178 if not now: 1179 now = time.time() 1180 delta = now - self.timestamp_start 1181 refresh_headers = [ 1182 "date", 1183 "expires", 1184 "last-modified", 1185 ] 1186 for i in refresh_headers: 1187 if i in self.headers: 1188 d = parsedate_tz(self.headers[i]) 1189 if d: 1190 new = mktime_tz(d) + delta 1191 try: 1192 self.headers[i] = formatdate(new, usegmt=True) 1193 except OSError: # pragma: no cover 1194 pass # value out of bounds on Windows only (which is why we exclude it from coverage). 1195 c = [] 1196 for set_cookie_header in self.headers.get_all("set-cookie"): 1197 try: 1198 refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) 1199 except ValueError: 1200 refreshed = set_cookie_header 1201 c.append(refreshed) 1202 if c: 1203 self.headers.set_all("set-cookie", c)
This fairly complex and heuristic function refreshes a server response for replay.
- It adjusts date, expires, and last-modified headers.
- It adjusts cookie expiration.
class Headers(multidict.MultiDict):  # type: ignore
    """
    Container for HTTP headers that offers convenient per-header access while still
    exposing the underlying raw data. Provides a full dictionary interface.

    Create headers with keyword arguments:
    >>> h = Headers(host="example.com", content_type="application/xml")

    Headers mostly behave like a normal dict:
    >>> h["Host"]
    "example.com"

    Headers are case insensitive:
    >>> h["host"]
    "example.com"

    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
    >>> h = Headers([
        (b"Host",b"example.com"),
        (b"Accept",b"text/html"),
        (b"accept",b"application/xml")
    ])

    Multiple headers are folded into a single header as per RFC 7230:
    >>> h["Accept"]
    "text/html, application/xml"

    Setting a header removes all existing headers with the same name:
    >>> h["Accept"] = "application/text"
    >>> h["Accept"]
    "application/text"

    `bytes(h)` returns an HTTP/1 header block:
    >>> print(bytes(h))
    Host: example.com
    Accept: application/text

    For full control, the raw header fields can be accessed:
    >>> h.fields

    Caveats:
     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
    """

    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
        """
        *Args:*
         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
           For convenience, underscores in header names will be transformed to dashes -
           this behaviour does not extend to other methods.

        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
        the behavior is undefined.
        """
        super().__init__(fields)

        # Reject anything that is not a pure (bytes, bytes) pair.
        if any(
            not (isinstance(name, bytes) and isinstance(value, bytes))
            for name, value in self.fields
        ):
            raise TypeError("Header fields must be bytes.")

        # content_type -> content-type
        extra = {}
        for name, value in headers.items():
            dashed_name = _always_bytes(name).replace(b"_", b"-")
            extra[dashed_name] = _always_bytes(value)
        self.update(extra)

    fields: tuple[tuple[bytes, bytes], ...]

    @staticmethod
    def _reduce_values(values) -> str:
        # Headers can be folded
        separator = ", "
        return separator.join(values)

    @staticmethod
    def _kconv(key) -> str:
        # Headers are case-insensitive
        lowered = key.lower()
        return lowered

    def __bytes__(self) -> bytes:
        if not self.fields:
            return b""
        lines = [b": ".join(field) for field in self.fields]
        return b"\r\n".join(lines) + b"\r\n"

    def __delitem__(self, key: str | bytes) -> None:
        super().__delitem__(_always_bytes(key))

    def __iter__(self) -> Iterator[str]:
        yield from map(_native, super().__iter__())

    def get_all(self, name: str | bytes) -> list[str]:
        """
        Like `Headers.get`, but does not fold multiple headers into a single one.
        This is useful for Set-Cookie and Cookie headers, which do not support folding.

        *See also:*
         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
        """
        raw_values = super().get_all(_always_bytes(name))
        return list(map(_native, raw_values))

    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
        """
        Explicitly set multiple headers for the given key.
        See `Headers.get_all`.
        """
        raw_name = _always_bytes(name)
        raw_values = list(map(_always_bytes, values))
        return super().set_all(raw_name, raw_values)

    def insert(self, index: int, key: str | bytes, value: str | bytes):
        super().insert(index, _always_bytes(key), _always_bytes(value))

    def items(self, multi=False):
        if not multi:
            return super().items()
        return ((_native(k), _native(v)) for k, v in self.fields)
Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.
Create headers with keyword arguments:
>>> h = Headers(host="example.com", content_type="application/xml")
Headers mostly behave like a normal dict:
>>> h["Host"]
"example.com"
Headers are case insensitive:
>>> h["host"]
"example.com"
Headers can also be created from a list of raw (header_name, header_value) byte tuples:
>>> h = Headers([
(b"Host",b"example.com"),
(b"Accept",b"text/html"),
(b"accept",b"application/xml")
])
Multiple headers are folded into a single header as per RFC 7230:
>>> h["Accept"]
"text/html, application/xml"
Setting a header removes all existing headers with the same name:
>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"
`bytes(h)` returns an HTTP/1 header block:
>>> print(bytes(h))
Host: example.com
Accept: application/text
For full control, the raw header fields can be accessed:
>>> h.fields
Caveats:
- For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
    """
    *Args:*
     - *fields:* (optional) list of ``(name, value)`` header byte tuples,
       e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
     - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
       For convenience, underscores in header names will be transformed to dashes -
       this behaviour does not extend to other methods.

    If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
    the behavior is undefined.
    """
    super().__init__(fields)

    # Reject anything that is not a pure (bytes, bytes) pair.
    if any(
        not (isinstance(name, bytes) and isinstance(value, bytes))
        for name, value in self.fields
    ):
        raise TypeError("Header fields must be bytes.")

    # content_type -> content-type
    extra = {}
    for name, value in headers.items():
        dashed_name = _always_bytes(name).replace(b"_", b"-")
        extra[dashed_name] = _always_bytes(value)
    self.update(extra)
Args:
- fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
- **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.
If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.
def get_all(self, name: str | bytes) -> list[str]:
    """
    Return every value stored for `name` as a separate list entry.

    Like `Headers.get`, but does not fold multiple headers into a single one.
    This is useful for Set-Cookie and Cookie headers, which do not support folding.

    *See also:*
     - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
     - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
     - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
    """
    raw_values = super().get_all(_always_bytes(name))
    return list(map(_native, raw_values))
Like Headers.get, but does not fold multiple headers into a single one.
This is useful for Set-Cookie and Cookie headers, which do not support folding.
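See also:
- https://tools.ietf.org/html/rfc7230#section-3.2.2
- https://datatracker.ietf.org/doc/html/rfc6265#section-5.4
- https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5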
def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
    """
    Explicitly set multiple headers for the given key.
    See `Headers.get_all`.
    """
    raw_name = _always_bytes(name)
    raw_values = list(map(_always_bytes, values))
    return super().set_all(raw_name, raw_values)
Explicitly set multiple headers for the given key.
See Headers.get_all.
def insert(self, index: int, key: str | bytes, value: str | bytes):
    """
    Insert an additional value for the given key at the specified position.

    Both `key` and `value` are converted to bytes before being passed on.
    """
    super().insert(index, _always_bytes(key), _always_bytes(value))
Insert an additional value for the given key at the specified position.
def items(self, multi=False):
    """
    Get all (key, value) tuples.

    With `multi` set, every raw field is yielded as a decoded (key, value)
    pair; otherwise the parent class's `items()` result is returned.
    """
    if not multi:
        return super().items()
    return ((_native(k), _native(v)) for k, v in self.fields)
Get all (key, value) tuples.
If multi is True, all (key, value) pairs will be returned.
If False, only one tuple per key is returned.