Edit on GitHub

mitmproxy.http

   1import binascii
   2import json
   3import os
   4import time
   5import urllib.parse
   6import warnings
   7from collections.abc import Callable
   8from collections.abc import Iterable
   9from collections.abc import Iterator
  10from collections.abc import Mapping
  11from collections.abc import Sequence
  12from dataclasses import dataclass
  13from dataclasses import fields
  14from email.utils import formatdate
  15from email.utils import mktime_tz
  16from email.utils import parsedate_tz
  17from typing import Any
  18from typing import cast
  19
  20from mitmproxy import flow
  21from mitmproxy.coretypes import multidict
  22from mitmproxy.coretypes import serializable
  23from mitmproxy.net import encoding
  24from mitmproxy.net.http import cookies
  25from mitmproxy.net.http import multipart
  26from mitmproxy.net.http import status_codes
  27from mitmproxy.net.http import url
  28from mitmproxy.net.http.headers import assemble_content_type
  29from mitmproxy.net.http.headers import infer_content_encoding
  30from mitmproxy.net.http.headers import parse_content_type
  31from mitmproxy.utils import human
  32from mitmproxy.utils import strutils
  33from mitmproxy.utils import typecheck
  34from mitmproxy.utils.strutils import always_bytes
  35from mitmproxy.utils.strutils import always_str
  36from mitmproxy.websocket import WebSocketData
  37
  38
  39# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
  40def _native(x: bytes) -> str:
  41    return x.decode("utf-8", "surrogateescape")
  42
  43
  44def _always_bytes(x: str | bytes) -> bytes:
  45    return strutils.always_bytes(x, "utf-8", "surrogateescape")
  46
  47
  48# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
  49class Headers(multidict.MultiDict):  # type: ignore
  50    """
  51    Header class which allows both convenient access to individual headers as well as
  52    direct access to the underlying raw data. Provides a full dictionary interface.
  53
  54    Create headers with keyword arguments:
  55    >>> h = Headers(host="example.com", content_type="application/xml")
  56
  57    Headers mostly behave like a normal dict:
  58    >>> h["Host"]
  59    "example.com"
  60
  61    Headers are case insensitive:
  62    >>> h["host"]
  63    "example.com"
  64
  65    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
  66    >>> h = Headers([
  67        (b"Host",b"example.com"),
  68        (b"Accept",b"text/html"),
  69        (b"accept",b"application/xml")
  70    ])
  71
  72    Multiple headers are folded into a single header as per RFC 7230:
  73    >>> h["Accept"]
  74    "text/html, application/xml"
  75
  76    Setting a header removes all existing headers with the same name:
  77    >>> h["Accept"] = "application/text"
  78    >>> h["Accept"]
  79    "application/text"
  80
  81    `bytes(h)` returns an HTTP/1 header block:
  82    >>> print(bytes(h))
  83    Host: example.com
  84    Accept: application/text
  85
  86    For full control, the raw header fields can be accessed:
  87    >>> h.fields
  88
  89    Caveats:
  90     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
  91    """
  92
  93    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
  94        """
  95        *Args:*
  96         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
  97           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
  98         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
  99           For convenience, underscores in header names will be transformed to dashes -
 100           this behaviour does not extend to other methods.
 101
 102        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
 103        the behavior is undefined.
 104        """
 105        super().__init__(fields)
 106
 107        for key, value in self.fields:
 108            if not isinstance(key, bytes) or not isinstance(value, bytes):
 109                raise TypeError("Header fields must be bytes.")
 110
 111        # content_type -> content-type
 112        self.update(
 113            {
 114                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
 115                for name, value in headers.items()
 116            }
 117        )
 118
 119    fields: tuple[tuple[bytes, bytes], ...]
 120
 121    @staticmethod
 122    def _reduce_values(values) -> str:
 123        # Headers can be folded
 124        return ", ".join(values)
 125
 126    @staticmethod
 127    def _kconv(key) -> str:
 128        # Headers are case-insensitive
 129        return key.lower()
 130
 131    def __bytes__(self) -> bytes:
 132        if self.fields:
 133            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
 134        else:
 135            return b""
 136
 137    def __delitem__(self, key: str | bytes) -> None:
 138        key = _always_bytes(key)
 139        super().__delitem__(key)
 140
 141    def __iter__(self) -> Iterator[str]:
 142        for x in super().__iter__():
 143            yield _native(x)
 144
 145    def get_all(self, name: str | bytes) -> list[str]:
 146        """
 147        Like `Headers.get`, but does not fold multiple headers into a single one.
 148        This is useful for Set-Cookie and Cookie headers, which do not support folding.
 149
 150        *See also:*
 151         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
 152         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
 153         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
 154        """
 155        name = _always_bytes(name)
 156        return [_native(x) for x in super().get_all(name)]
 157
 158    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
 159        """
 160        Explicitly set multiple headers for the given key.
 161        See `Headers.get_all`.
 162        """
 163        name = _always_bytes(name)
 164        values = [_always_bytes(x) for x in values]
 165        return super().set_all(name, values)
 166
 167    def insert(self, index: int, key: str | bytes, value: str | bytes):
 168        key = _always_bytes(key)
 169        value = _always_bytes(value)
 170        super().insert(index, key, value)
 171
 172    def items(self, multi=False):
 173        if multi:
 174            return ((_native(k), _native(v)) for k, v in self.fields)
 175        else:
 176            return super().items()
 177
 178
 179@dataclass
 180class MessageData(serializable.Serializable):
 181    http_version: bytes
 182    headers: Headers
 183    content: bytes | None
 184    trailers: Headers | None
 185    timestamp_start: float
 186    timestamp_end: float | None
 187
 188    # noinspection PyUnreachableCode
 189    if __debug__:
 190
 191        def __post_init__(self):
 192            for field in fields(self):
 193                val = getattr(self, field.name)
 194                typecheck.check_option_type(field.name, val, field.type)
 195
 196    def set_state(self, state):
 197        for k, v in state.items():
 198            if k in ("headers", "trailers") and v is not None:
 199                v = Headers.from_state(v)
 200            setattr(self, k, v)
 201
 202    def get_state(self):
 203        state = vars(self).copy()
 204        state["headers"] = state["headers"].get_state()
 205        if state["trailers"] is not None:
 206            state["trailers"] = state["trailers"].get_state()
 207        return state
 208
 209    @classmethod
 210    def from_state(cls, state):
 211        state["headers"] = Headers.from_state(state["headers"])
 212        if state["trailers"] is not None:
 213            state["trailers"] = Headers.from_state(state["trailers"])
 214        return cls(**state)
 215
 216
 217@dataclass
 218class RequestData(MessageData):
 219    host: str
 220    port: int
 221    method: bytes
 222    scheme: bytes
 223    authority: bytes
 224    path: bytes
 225
 226
 227@dataclass
 228class ResponseData(MessageData):
 229    status_code: int
 230    reason: bytes
 231
 232
 233class Message(serializable.Serializable):
 234    """Base class for `Request` and `Response`."""
 235
 236    @classmethod
 237    def from_state(cls, state):
 238        return cls(**state)
 239
 240    def get_state(self):
 241        return self.data.get_state()
 242
 243    def set_state(self, state):
 244        self.data.set_state(state)
 245
 246    data: MessageData
 247    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
 248    """
 249    This attribute controls if the message body should be streamed.
 250
 251    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
 252    This makes it possible to perform string replacements on the entire body.
 253    If `True`, the message body will not be buffered on the proxy
 254    but immediately forwarded instead.
 255    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
 256    Please note that packet boundaries generally should not be relied upon.
 257
 258    This attribute must be set in the `requestheaders` or `responseheaders` hook.
 259    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
 260    """
 261
 262    @property
 263    def http_version(self) -> str:
 264        """
 265        HTTP version string, for example `HTTP/1.1`.
 266        """
 267        return self.data.http_version.decode("utf-8", "surrogateescape")
 268
 269    @http_version.setter
 270    def http_version(self, http_version: str | bytes) -> None:
 271        self.data.http_version = strutils.always_bytes(
 272            http_version, "utf-8", "surrogateescape"
 273        )
 274
 275    @property
 276    def is_http10(self) -> bool:
 277        return self.data.http_version == b"HTTP/1.0"
 278
 279    @property
 280    def is_http11(self) -> bool:
 281        return self.data.http_version == b"HTTP/1.1"
 282
 283    @property
 284    def is_http2(self) -> bool:
 285        return self.data.http_version == b"HTTP/2.0"
 286
 287    @property
 288    def is_http3(self) -> bool:
 289        return self.data.http_version == b"HTTP/3"
 290
 291    @property
 292    def headers(self) -> Headers:
 293        """
 294        The HTTP headers.
 295        """
 296        return self.data.headers
 297
 298    @headers.setter
 299    def headers(self, h: Headers) -> None:
 300        self.data.headers = h
 301
 302    @property
 303    def trailers(self) -> Headers | None:
 304        """
 305        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
 306        """
 307        return self.data.trailers
 308
 309    @trailers.setter
 310    def trailers(self, h: Headers | None) -> None:
 311        self.data.trailers = h
 312
 313    @property
 314    def raw_content(self) -> bytes | None:
 315        """
 316        The raw (potentially compressed) HTTP message body.
 317
 318        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
 319
 320        *See also:* `Message.content`, `Message.text`
 321        """
 322        return self.data.content
 323
 324    @raw_content.setter
 325    def raw_content(self, content: bytes | None) -> None:
 326        self.data.content = content
 327
 328    @property
 329    def content(self) -> bytes | None:
 330        """
 331        The uncompressed HTTP message body as bytes.
 332
 333        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
 334
 335        *See also:* `Message.raw_content`, `Message.text`
 336        """
 337        return self.get_content()
 338
 339    @content.setter
 340    def content(self, value: bytes | None) -> None:
 341        self.set_content(value)
 342
 343    @property
 344    def text(self) -> str | None:
 345        """
 346        The uncompressed and decoded HTTP message body as text.
 347
 348        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
 349
 350        *See also:* `Message.raw_content`, `Message.content`
 351        """
 352        return self.get_text()
 353
 354    @text.setter
 355    def text(self, value: str | None) -> None:
 356        self.set_text(value)
 357
 358    def set_content(self, value: bytes | None) -> None:
 359        if value is None:
 360            self.raw_content = None
 361            return
 362        if not isinstance(value, bytes):
 363            raise TypeError(
 364                f"Message content must be bytes, not {type(value).__name__}. "
 365                "Please use .text if you want to assign a str."
 366            )
 367        ce = self.headers.get("content-encoding")
 368        try:
 369            self.raw_content = encoding.encode(value, ce or "identity")
 370        except ValueError:
 371            # So we have an invalid content-encoding?
 372            # Let's remove it!
 373            del self.headers["content-encoding"]
 374            self.raw_content = value
 375
 376        if "transfer-encoding" in self.headers:
 377            # https://httpwg.org/specs/rfc7230.html#header.content-length
 378            # don't set content-length if a transfer-encoding is provided
 379            pass
 380        else:
 381            self.headers["content-length"] = str(len(self.raw_content))
 382
 383    def get_content(self, strict: bool = True) -> bytes | None:
 384        """
 385        Similar to `Message.content`, but does not raise if `strict` is `False`.
 386        Instead, the compressed message body is returned as-is.
 387        """
 388        if self.raw_content is None:
 389            return None
 390        ce = self.headers.get("content-encoding")
 391        if ce:
 392            try:
 393                content = encoding.decode(self.raw_content, ce)
 394                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
 395                if isinstance(content, str):
 396                    raise ValueError(f"Invalid Content-Encoding: {ce}")
 397                return content
 398            except ValueError:
 399                if strict:
 400                    raise
 401                return self.raw_content
 402        else:
 403            return self.raw_content
 404
 405    def set_text(self, text: str | None) -> None:
 406        if text is None:
 407            self.content = None
 408            return
 409        enc = infer_content_encoding(self.headers.get("content-type", ""))
 410
 411        try:
 412            self.content = cast(bytes, encoding.encode(text, enc))
 413        except ValueError:
 414            # Fall back to UTF-8 and update the content-type header.
 415            ct = parse_content_type(self.headers.get("content-type", "")) or (
 416                "text",
 417                "plain",
 418                {},
 419            )
 420            ct[2]["charset"] = "utf-8"
 421            self.headers["content-type"] = assemble_content_type(*ct)
 422            enc = "utf8"
 423            self.content = text.encode(enc, "surrogateescape")
 424
 425    def get_text(self, strict: bool = True) -> str | None:
 426        """
 427        Similar to `Message.text`, but does not raise if `strict` is `False`.
 428        Instead, the message body is returned as surrogate-escaped UTF-8.
 429        """
 430        content = self.get_content(strict)
 431        if content is None:
 432            return None
 433        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
 434        try:
 435            return cast(str, encoding.decode(content, enc))
 436        except ValueError:
 437            if strict:
 438                raise
 439            return content.decode("utf8", "surrogateescape")
 440
 441    @property
 442    def timestamp_start(self) -> float:
 443        """
 444        *Timestamp:* Headers received.
 445        """
 446        return self.data.timestamp_start
 447
 448    @timestamp_start.setter
 449    def timestamp_start(self, timestamp_start: float) -> None:
 450        self.data.timestamp_start = timestamp_start
 451
 452    @property
 453    def timestamp_end(self) -> float | None:
 454        """
 455        *Timestamp:* Last byte received.
 456        """
 457        return self.data.timestamp_end
 458
 459    @timestamp_end.setter
 460    def timestamp_end(self, timestamp_end: float | None):
 461        self.data.timestamp_end = timestamp_end
 462
 463    def decode(self, strict: bool = True) -> None:
 464        """
 465        Decodes body based on the current Content-Encoding header, then
 466        removes the header. If there is no Content-Encoding header, no
 467        action is taken.
 468
 469        *Raises:*
 470         - `ValueError`, when the content-encoding is invalid and strict is True.
 471        """
 472        decoded = self.get_content(strict)
 473        self.headers.pop("content-encoding", None)
 474        self.content = decoded
 475
 476    def encode(self, encoding: str) -> None:
 477        """
 478        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
 479        Any existing content-encodings are overwritten, the content is not decoded beforehand.
 480
 481        *Raises:*
 482         - `ValueError`, when the specified content-encoding is invalid.
 483        """
 484        self.headers["content-encoding"] = encoding
 485        self.content = self.raw_content
 486        if "content-encoding" not in self.headers:
 487            raise ValueError(f"Invalid content encoding {repr(encoding)}")
 488
 489    def json(self, **kwargs: Any) -> Any:
 490        """
 491        Returns the JSON encoded content of the response, if any.
 492        `**kwargs` are optional arguments that will be
 493        passed to `json.loads()`.
 494
 495        Will raise if the content can not be decoded and then parsed as JSON.
 496
 497        *Raises:*
 498         - `json.decoder.JSONDecodeError` if content is not valid JSON.
 499         - `TypeError` if the content is not available, for example because the response
 500            has been streamed.
 501        """
 502        content = self.get_content(strict=False)
 503        if content is None:
 504            raise TypeError("Message content is not available.")
 505        else:
 506            return json.loads(content, **kwargs)
 507
 508
 509class Request(Message):
 510    """
 511    An HTTP request.
 512    """
 513
 514    data: RequestData
 515
 516    def __init__(
 517        self,
 518        host: str,
 519        port: int,
 520        method: bytes,
 521        scheme: bytes,
 522        authority: bytes,
 523        path: bytes,
 524        http_version: bytes,
 525        headers: Headers | tuple[tuple[bytes, bytes], ...],
 526        content: bytes | None,
 527        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 528        timestamp_start: float,
 529        timestamp_end: float | None,
 530    ):
 531        # auto-convert invalid types to retain compatibility with older code.
 532        if isinstance(host, bytes):
 533            host = host.decode("idna", "strict")
 534        if isinstance(method, str):
 535            method = method.encode("ascii", "strict")
 536        if isinstance(scheme, str):
 537            scheme = scheme.encode("ascii", "strict")
 538        if isinstance(authority, str):
 539            authority = authority.encode("ascii", "strict")
 540        if isinstance(path, str):
 541            path = path.encode("ascii", "strict")
 542        if isinstance(http_version, str):
 543            http_version = http_version.encode("ascii", "strict")
 544
 545        if isinstance(content, str):
 546            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 547        if not isinstance(headers, Headers):
 548            headers = Headers(headers)
 549        if trailers is not None and not isinstance(trailers, Headers):
 550            trailers = Headers(trailers)
 551
 552        self.data = RequestData(
 553            host=host,
 554            port=port,
 555            method=method,
 556            scheme=scheme,
 557            authority=authority,
 558            path=path,
 559            http_version=http_version,
 560            headers=headers,
 561            content=content,
 562            trailers=trailers,
 563            timestamp_start=timestamp_start,
 564            timestamp_end=timestamp_end,
 565        )
 566
 567    def __repr__(self) -> str:
 568        if self.host and self.port:
 569            hostport = f"{self.host}:{self.port}"
 570        else:
 571            hostport = ""
 572        path = self.path or ""
 573        return f"Request({self.method} {hostport}{path})"
 574
 575    @classmethod
 576    def make(
 577        cls,
 578        method: str,
 579        url: str,
 580        content: bytes | str = "",
 581        headers: (
 582            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 583        ) = (),
 584    ) -> "Request":
 585        """
 586        Simplified API for creating request objects.
 587        """
 588        # Headers can be list or dict, we differentiate here.
 589        if isinstance(headers, Headers):
 590            pass
 591        elif isinstance(headers, dict):
 592            headers = Headers(
 593                (
 594                    always_bytes(k, "utf-8", "surrogateescape"),
 595                    always_bytes(v, "utf-8", "surrogateescape"),
 596                )
 597                for k, v in headers.items()
 598            )
 599        elif isinstance(headers, Iterable):
 600            headers = Headers(headers)  # type: ignore
 601        else:
 602            raise TypeError(
 603                "Expected headers to be an iterable or dict, but is {}.".format(
 604                    type(headers).__name__
 605                )
 606            )
 607
 608        req = cls(
 609            "",
 610            0,
 611            method.encode("utf-8", "surrogateescape"),
 612            b"",
 613            b"",
 614            b"",
 615            b"HTTP/1.1",
 616            headers,
 617            b"",
 618            None,
 619            time.time(),
 620            time.time(),
 621        )
 622
 623        req.url = url
 624        # Assign this manually to update the content-length header.
 625        if isinstance(content, bytes):
 626            req.content = content
 627        elif isinstance(content, str):
 628            req.text = content
 629        else:
 630            raise TypeError(
 631                f"Expected content to be str or bytes, but is {type(content).__name__}."
 632            )
 633
 634        return req
 635
 636    @property
 637    def first_line_format(self) -> str:
 638        """
 639        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 640
 641        origin-form and asterisk-form are subsumed as "relative".
 642        """
 643        if self.method == "CONNECT":
 644            return "authority"
 645        elif self.authority:
 646            return "absolute"
 647        else:
 648            return "relative"
 649
 650    @property
 651    def method(self) -> str:
 652        """
 653        HTTP request method, e.g. "GET".
 654        """
 655        return self.data.method.decode("utf-8", "surrogateescape").upper()
 656
 657    @method.setter
 658    def method(self, val: str | bytes) -> None:
 659        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 660
 661    @property
 662    def scheme(self) -> str:
 663        """
 664        HTTP request scheme, which should be "http" or "https".
 665        """
 666        return self.data.scheme.decode("utf-8", "surrogateescape")
 667
 668    @scheme.setter
 669    def scheme(self, val: str | bytes) -> None:
 670        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 671
 672    @property
 673    def authority(self) -> str:
 674        """
 675        HTTP request authority.
 676
 677        For HTTP/1, this is the authority portion of the request target
 678        (in either absolute-form or authority-form).
 679        For origin-form and asterisk-form requests, this property is set to an empty string.
 680
 681        For HTTP/2, this is the :authority pseudo header.
 682
 683        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 684        """
 685        try:
 686            return self.data.authority.decode("idna")
 687        except UnicodeError:
 688            return self.data.authority.decode("utf8", "surrogateescape")
 689
 690    @authority.setter
 691    def authority(self, val: str | bytes) -> None:
 692        if isinstance(val, str):
 693            try:
 694                val = val.encode("idna", "strict")
 695            except UnicodeError:
 696                val = val.encode("utf8", "surrogateescape")  # type: ignore
 697        self.data.authority = val
 698
 699    @property
 700    def host(self) -> str:
 701        """
 702        Target server for this request. This may be parsed from the raw request
 703        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 704        or inferred from the proxy mode (e.g. an IP in transparent mode).
 705
 706        Setting the host attribute also updates the host header and authority information, if present.
 707
 708        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 709        """
 710        return self.data.host
 711
 712    @host.setter
 713    def host(self, val: str | bytes) -> None:
 714        self.data.host = always_str(val, "idna", "strict")
 715        self._update_host_and_authority()
 716
 717    @property
 718    def host_header(self) -> str | None:
 719        """
 720        The request's host/authority header.
 721
 722        This property maps to either ``request.headers["Host"]`` or
 723        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 724
 725        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 726        """
 727        if self.is_http2 or self.is_http3:
 728            return self.authority or self.data.headers.get("Host", None)
 729        else:
 730            return self.data.headers.get("Host", None)
 731
 732    @host_header.setter
 733    def host_header(self, val: None | str | bytes) -> None:
 734        if val is None:
 735            if self.is_http2 or self.is_http3:
 736                self.data.authority = b""
 737            self.headers.pop("Host", None)
 738        else:
 739            if self.is_http2 or self.is_http3:
 740                self.authority = val  # type: ignore
 741            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 742                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 743                self.headers["Host"] = val
 744
 745    @property
 746    def port(self) -> int:
 747        """
 748        Target port.
 749        """
 750        return self.data.port
 751
 752    @port.setter
 753    def port(self, port: int) -> None:
 754        if not isinstance(port, int):
 755            raise ValueError(f"Port must be an integer, not {port!r}.")
 756
 757        self.data.port = port
 758        self._update_host_and_authority()
 759
 760    def _update_host_and_authority(self) -> None:
 761        val = url.hostport(self.scheme, self.host, self.port)
 762
 763        # Update host header
 764        if "Host" in self.data.headers:
 765            self.data.headers["Host"] = val
 766        # Update authority
 767        if self.data.authority:
 768            self.authority = val
 769
 770    @property
 771    def path(self) -> str:
 772        """
 773        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 774        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 775
 776        This attribute includes both path and query parts of the target URI
 777        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 778        """
 779        return self.data.path.decode("utf-8", "surrogateescape")
 780
 781    @path.setter
 782    def path(self, val: str | bytes) -> None:
 783        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 784
 785    @property
 786    def url(self) -> str:
 787        """
 788        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 789
 790        Settings this property updates these attributes as well.
 791        """
 792        if self.first_line_format == "authority":
 793            return f"{self.host}:{self.port}"
 794        path = self.path if self.path != "*" else ""
 795        return url.unparse(self.scheme, self.host, self.port, path)
 796
 797    @url.setter
 798    def url(self, val: str | bytes) -> None:
 799        val = always_str(val, "utf-8", "surrogateescape")
 800        self.scheme, self.host, self.port, self.path = url.parse(val)  # type: ignore
 801
 802    @property
 803    def pretty_host(self) -> str:
 804        """
 805        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 806        This is useful in transparent mode where `Request.host` is only an IP address.
 807
 808        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 809        as the Host header could be spoofed.
 810        """
 811        authority = self.host_header
 812        if authority:
 813            return url.parse_authority(authority, check=False)[0]
 814        else:
 815            return self.host
 816
 817    @property
 818    def pretty_url(self) -> str:
 819        """
 820        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 821        """
 822        if self.first_line_format == "authority":
 823            return self.authority
 824
 825        host_header = self.host_header
 826        if not host_header:
 827            return self.url
 828
 829        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 830        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 831        path = self.path if self.path != "*" else ""
 832
 833        return url.unparse(self.scheme, pretty_host, pretty_port, path)
 834
 835    def _get_query(self):
 836        query = urllib.parse.urlparse(self.url).query
 837        return tuple(url.decode(query))
 838
 839    def _set_query(self, query_data):
 840        query = url.encode(query_data)
 841        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 842        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 843
 844    @property
 845    def query(self) -> multidict.MultiDictView[str, str]:
 846        """
 847        The request query as a mutable mapping view on the request's path.
 848        For the most part, this behaves like a dictionary.
 849        Modifications to the MultiDictView update `Request.path`, and vice versa.
 850        """
 851        return multidict.MultiDictView(self._get_query, self._set_query)
 852
 853    @query.setter
 854    def query(self, value):
 855        self._set_query(value)
 856
 857    def _get_cookies(self):
 858        h = self.headers.get_all("Cookie")
 859        return tuple(cookies.parse_cookie_headers(h))
 860
 861    def _set_cookies(self, value):
 862        self.headers["cookie"] = cookies.format_cookie_header(value)
 863
 864    @property
 865    def cookies(self) -> multidict.MultiDictView[str, str]:
 866        """
 867        The request cookies.
 868        For the most part, this behaves like a dictionary.
 869        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 870        """
 871        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 872
 873    @cookies.setter
 874    def cookies(self, value):
 875        self._set_cookies(value)
 876
 877    @property
 878    def path_components(self) -> tuple[str, ...]:
 879        """
 880        The URL's path components as a tuple of strings.
 881        Components are unquoted.
 882        """
 883        path = urllib.parse.urlparse(self.url).path
 884        # This needs to be a tuple so that it's immutable.
 885        # Otherwise, this would fail silently:
 886        #   request.path_components.append("foo")
 887        return tuple(url.unquote(i) for i in path.split("/") if i)
 888
 889    @path_components.setter
 890    def path_components(self, components: Iterable[str]):
 891        components = map(lambda x: url.quote(x, safe=""), components)
 892        path = "/" + "/".join(components)
 893        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 894        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 895
 896    def anticache(self) -> None:
 897        """
 898        Modifies this request to remove headers that might produce a cached response.
 899        """
 900        delheaders = (
 901            "if-modified-since",
 902            "if-none-match",
 903        )
 904        for i in delheaders:
 905            self.headers.pop(i, None)
 906
 907    def anticomp(self) -> None:
 908        """
 909        Modify the Accept-Encoding header to only accept uncompressed responses.
 910        """
 911        self.headers["accept-encoding"] = "identity"
 912
 913    def constrain_encoding(self) -> None:
 914        """
 915        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 916        """
 917        accept_encoding = self.headers.get("accept-encoding")
 918        if accept_encoding:
 919            self.headers["accept-encoding"] = ", ".join(
 920                e
 921                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 922                if e in accept_encoding
 923            )
 924
 925    def _get_urlencoded_form(self):
 926        is_valid_content_type = (
 927            "application/x-www-form-urlencoded"
 928            in self.headers.get("content-type", "").lower()
 929        )
 930        if is_valid_content_type:
 931            return tuple(url.decode(self.get_text(strict=False)))
 932        return ()
 933
 934    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 935        """
 936        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 937        This will overwrite the existing content if there is one.
 938        """
 939        self.headers["content-type"] = "application/x-www-form-urlencoded"
 940        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 941
 942    @property
 943    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 944        """
 945        The URL-encoded form data.
 946
 947        If the content-type indicates non-form data or the form could not be parsed, this is set to
 948        an empty `MultiDictView`.
 949
 950        Modifications to the MultiDictView update `Request.content`, and vice versa.
 951        """
 952        return multidict.MultiDictView(
 953            self._get_urlencoded_form, self._set_urlencoded_form
 954        )
 955
 956    @urlencoded_form.setter
 957    def urlencoded_form(self, value):
 958        self._set_urlencoded_form(value)
 959
 960    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 961        is_valid_content_type = (
 962            "multipart/form-data" in self.headers.get("content-type", "").lower()
 963        )
 964        if is_valid_content_type and self.content is not None:
 965            try:
 966                return multipart.decode_multipart(
 967                    self.headers.get("content-type"), self.content
 968                )
 969            except ValueError:
 970                pass
 971        return []
 972
 973    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 974        ct = self.headers.get("content-type", "")
 975        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 976        if not is_valid_content_type:
 977            """
 978            Generate a random boundary here.
 979
 980            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 981            on generating the boundary.
 982            """
 983            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 984            self.headers["content-type"] = ct = (
 985                f"multipart/form-data; boundary={boundary}"
 986            )
 987        self.content = multipart.encode_multipart(ct, value)
 988
 989    @property
 990    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 991        """
 992        The multipart form data.
 993
 994        If the content-type indicates non-form data or the form could not be parsed, this is set to
 995        an empty `MultiDictView`.
 996
 997        Modifications to the MultiDictView update `Request.content`, and vice versa.
 998        """
 999        return multidict.MultiDictView(
1000            self._get_multipart_form, self._set_multipart_form
1001        )
1002
1003    @multipart_form.setter
1004    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1005        self._set_multipart_form(value)
1006
1007
1008class Response(Message):
1009    """
1010    An HTTP response.
1011    """
1012
1013    data: ResponseData
1014
1015    def __init__(
1016        self,
1017        http_version: bytes,
1018        status_code: int,
1019        reason: bytes,
1020        headers: Headers | tuple[tuple[bytes, bytes], ...],
1021        content: bytes | None,
1022        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1023        timestamp_start: float,
1024        timestamp_end: float | None,
1025    ):
1026        # auto-convert invalid types to retain compatibility with older code.
1027        if isinstance(http_version, str):
1028            http_version = http_version.encode("ascii", "strict")
1029        if isinstance(reason, str):
1030            reason = reason.encode("ascii", "strict")
1031
1032        if isinstance(content, str):
1033            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1034        if not isinstance(headers, Headers):
1035            headers = Headers(headers)
1036        if trailers is not None and not isinstance(trailers, Headers):
1037            trailers = Headers(trailers)
1038
1039        self.data = ResponseData(
1040            http_version=http_version,
1041            status_code=status_code,
1042            reason=reason,
1043            headers=headers,
1044            content=content,
1045            trailers=trailers,
1046            timestamp_start=timestamp_start,
1047            timestamp_end=timestamp_end,
1048        )
1049
1050    def __repr__(self) -> str:
1051        if self.raw_content:
1052            ct = self.headers.get("content-type", "unknown content type")
1053            size = human.pretty_size(len(self.raw_content))
1054            details = f"{ct}, {size}"
1055        else:
1056            details = "no content"
1057        return f"Response({self.status_code}, {details})"
1058
1059    @classmethod
1060    def make(
1061        cls,
1062        status_code: int = 200,
1063        content: bytes | str = b"",
1064        headers: (
1065            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1066        ) = (),
1067    ) -> "Response":
1068        """
1069        Simplified API for creating response objects.
1070        """
1071        if isinstance(headers, Headers):
1072            headers = headers
1073        elif isinstance(headers, dict):
1074            headers = Headers(
1075                (
1076                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1077                    always_bytes(v, "utf-8", "surrogateescape"),
1078                )
1079                for k, v in headers.items()
1080            )
1081        elif isinstance(headers, Iterable):
1082            headers = Headers(headers)  # type: ignore
1083        else:
1084            raise TypeError(
1085                "Expected headers to be an iterable or dict, but is {}.".format(
1086                    type(headers).__name__
1087                )
1088            )
1089
1090        resp = cls(
1091            b"HTTP/1.1",
1092            status_code,
1093            status_codes.RESPONSES.get(status_code, "").encode(),
1094            headers,
1095            None,
1096            None,
1097            time.time(),
1098            time.time(),
1099        )
1100
1101        # Assign this manually to update the content-length header.
1102        if isinstance(content, bytes):
1103            resp.content = content
1104        elif isinstance(content, str):
1105            resp.text = content
1106        else:
1107            raise TypeError(
1108                f"Expected content to be str or bytes, but is {type(content).__name__}."
1109            )
1110
1111        return resp
1112
1113    @property
1114    def status_code(self) -> int:
1115        """
1116        HTTP Status Code, e.g. ``200``.
1117        """
1118        return self.data.status_code
1119
1120    @status_code.setter
1121    def status_code(self, status_code: int) -> None:
1122        self.data.status_code = status_code
1123
1124    @property
1125    def reason(self) -> str:
1126        """
1127        HTTP reason phrase, for example "Not Found".
1128
1129        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1130        """
1131        # Encoding: http://stackoverflow.com/a/16674906/934719
1132        return self.data.reason.decode("ISO-8859-1")
1133
1134    @reason.setter
1135    def reason(self, reason: str | bytes) -> None:
1136        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1137
1138    def _get_cookies(self):
1139        h = self.headers.get_all("set-cookie")
1140        all_cookies = cookies.parse_set_cookie_headers(h)
1141        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1142
1143    def _set_cookies(self, value):
1144        cookie_headers = []
1145        for k, v in value:
1146            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1147            cookie_headers.append(header)
1148        self.headers.set_all("set-cookie", cookie_headers)
1149
1150    @property
1151    def cookies(
1152        self,
1153    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1154        """
1155        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1156        name strings, and values are `(cookie value, attributes)` tuples. Within
1157        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1158        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1159
1160        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1161        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1162        """
1163        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1164
1165    @cookies.setter
1166    def cookies(self, value):
1167        self._set_cookies(value)
1168
1169    def refresh(self, now=None):
1170        """
1171        This fairly complex and heuristic function refreshes a server
1172        response for replay.
1173
1174         - It adjusts date, expires, and last-modified headers.
1175         - It adjusts cookie expiration.
1176        """
1177        if not now:
1178            now = time.time()
1179        delta = now - self.timestamp_start
1180        refresh_headers = [
1181            "date",
1182            "expires",
1183            "last-modified",
1184        ]
1185        for i in refresh_headers:
1186            if i in self.headers:
1187                d = parsedate_tz(self.headers[i])
1188                if d:
1189                    new = mktime_tz(d) + delta
1190                    try:
1191                        self.headers[i] = formatdate(new, usegmt=True)
1192                    except OSError:  # pragma: no cover
1193                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1194        c = []
1195        for set_cookie_header in self.headers.get_all("set-cookie"):
1196            try:
1197                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1198            except ValueError:
1199                refreshed = set_cookie_header
1200            c.append(refreshed)
1201        if c:
1202            self.headers.set_all("set-cookie", c)
1203
1204
1205class HTTPFlow(flow.Flow):
1206    """
1207    An HTTPFlow is a collection of objects representing a single HTTP
1208    transaction.
1209    """
1210
1211    request: Request
1212    """The client's HTTP request."""
1213    response: Response | None = None
1214    """The server's HTTP response."""
1215    error: flow.Error | None = None
1216    """
1217    A connection or protocol error affecting this flow.
1218
1219    Note that it's possible for a Flow to have both a response and an error
1220    object. This might happen, for instance, when a response was received
1221    from the server, but there was an error sending it back to the client.
1222    """
1223
1224    websocket: WebSocketData | None = None
1225    """
1226    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1227    """
1228
1229    def get_state(self) -> serializable.State:
1230        return {
1231            **super().get_state(),
1232            "request": self.request.get_state(),
1233            "response": self.response.get_state() if self.response else None,
1234            "websocket": self.websocket.get_state() if self.websocket else None,
1235        }
1236
1237    def set_state(self, state: serializable.State) -> None:
1238        self.request = Request.from_state(state.pop("request"))
1239        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1240        self.websocket = (
1241            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1242        )
1243        super().set_state(state)
1244
1245    def __repr__(self):
1246        s = "<HTTPFlow"
1247        for a in (
1248            "request",
1249            "response",
1250            "websocket",
1251            "error",
1252            "client_conn",
1253            "server_conn",
1254        ):
1255            if getattr(self, a, False):
1256                s += f"\r\n  {a} = {{flow.{a}}}"
1257        s += ">"
1258        return s.format(flow=self)
1259
1260    @property
1261    def timestamp_start(self) -> float:
1262        """*Read-only:* An alias for `Request.timestamp_start`."""
1263        return self.request.timestamp_start
1264
1265    @property
1266    def mode(self) -> str:  # pragma: no cover
1267        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1268        return getattr(self, "_mode", "regular")
1269
1270    @mode.setter
1271    def mode(self, val: str) -> None:  # pragma: no cover
1272        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1273        self._mode = val
1274
1275    def copy(self):
1276        f = super().copy()
1277        if self.request:
1278            f.request = self.request.copy()
1279        if self.response:
1280            f.response = self.response.copy()
1281        return f
1282
1283
1284__all__ = [
1285    "HTTPFlow",
1286    "Message",
1287    "Request",
1288    "Response",
1289    "Headers",
1290]
class HTTPFlow(mitmproxy.flow.Flow):
1206class HTTPFlow(flow.Flow):
1207    """
1208    An HTTPFlow is a collection of objects representing a single HTTP
1209    transaction.
1210    """
1211
1212    request: Request
1213    """The client's HTTP request."""
1214    response: Response | None = None
1215    """The server's HTTP response."""
1216    error: flow.Error | None = None
1217    """
1218    A connection or protocol error affecting this flow.
1219
1220    Note that it's possible for a Flow to have both a response and an error
1221    object. This might happen, for instance, when a response was received
1222    from the server, but there was an error sending it back to the client.
1223    """
1224
1225    websocket: WebSocketData | None = None
1226    """
1227    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1228    """
1229
1230    def get_state(self) -> serializable.State:
1231        return {
1232            **super().get_state(),
1233            "request": self.request.get_state(),
1234            "response": self.response.get_state() if self.response else None,
1235            "websocket": self.websocket.get_state() if self.websocket else None,
1236        }
1237
1238    def set_state(self, state: serializable.State) -> None:
1239        self.request = Request.from_state(state.pop("request"))
1240        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1241        self.websocket = (
1242            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1243        )
1244        super().set_state(state)
1245
1246    def __repr__(self):
1247        s = "<HTTPFlow"
1248        for a in (
1249            "request",
1250            "response",
1251            "websocket",
1252            "error",
1253            "client_conn",
1254            "server_conn",
1255        ):
1256            if getattr(self, a, False):
1257                s += f"\r\n  {a} = {{flow.{a}}}"
1258        s += ">"
1259        return s.format(flow=self)
1260
1261    @property
1262    def timestamp_start(self) -> float:
1263        """*Read-only:* An alias for `Request.timestamp_start`."""
1264        return self.request.timestamp_start
1265
1266    @property
1267    def mode(self) -> str:  # pragma: no cover
1268        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1269        return getattr(self, "_mode", "regular")
1270
1271    @mode.setter
1272    def mode(self, val: str) -> None:  # pragma: no cover
1273        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1274        self._mode = val
1275
1276    def copy(self):
1277        f = super().copy()
1278        if self.request:
1279            f.request = self.request.copy()
1280        if self.response:
1281            f.response = self.response.copy()
1282        return f

An HTTPFlow is a collection of objects representing a single HTTP transaction.

request: Request

The client's HTTP request.

response: Response | None = None

The server's HTTP response.

error: mitmproxy.flow.Error | None = None

A connection or protocol error affecting this flow.

Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.

websocket: mitmproxy.websocket.WebSocketData | None = None

If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.

timestamp_start: float
1261    @property
1262    def timestamp_start(self) -> float:
1263        """*Read-only:* An alias for `Request.timestamp_start`."""
1264        return self.request.timestamp_start

Read-only: An alias for Request.timestamp_start.

mode: str
1266    @property
1267    def mode(self) -> str:  # pragma: no cover
1268        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1269        return getattr(self, "_mode", "regular")
def copy(self):
1276    def copy(self):
1277        f = super().copy()
1278        if self.request:
1279            f.request = self.request.copy()
1280        if self.response:
1281            f.response = self.response.copy()
1282        return f

Make a copy of this flow.

type: ClassVar[str] = 'http'

The flow type, for example http, tcp, or dns.

class Message(mitmproxy.coretypes.serializable.Serializable):
234class Message(serializable.Serializable):
235    """Base class for `Request` and `Response`."""
236
237    @classmethod
238    def from_state(cls, state):
239        return cls(**state)
240
241    def get_state(self):
242        return self.data.get_state()
243
244    def set_state(self, state):
245        self.data.set_state(state)
246
247    data: MessageData
248    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
249    """
250    This attribute controls if the message body should be streamed.
251
252    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
253    This makes it possible to perform string replacements on the entire body.
254    If `True`, the message body will not be buffered on the proxy
255    but immediately forwarded instead.
256    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
257    Please note that packet boundaries generally should not be relied upon.
258
259    This attribute must be set in the `requestheaders` or `responseheaders` hook.
260    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
261    """
262
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")
269
270    @http_version.setter
271    def http_version(self, http_version: str | bytes) -> None:
272        self.data.http_version = strutils.always_bytes(
273            http_version, "utf-8", "surrogateescape"
274        )
275
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
279
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
283
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
287
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
291
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers
298
299    @headers.setter
300    def headers(self, h: Headers) -> None:
301        self.data.headers = h
302
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
309
310    @trailers.setter
311    def trailers(self, h: Headers | None) -> None:
312        self.data.trailers = h
313
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320
321        *See also:* `Message.content`, `Message.text`
322        """
323        return self.data.content
324
325    @raw_content.setter
326    def raw_content(self, content: bytes | None) -> None:
327        self.data.content = content
328
329    @property
330    def content(self) -> bytes | None:
331        """
332        The uncompressed HTTP message body as bytes.
333
334        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
335
336        *See also:* `Message.raw_content`, `Message.text`
337        """
338        return self.get_content()
339
340    @content.setter
341    def content(self, value: bytes | None) -> None:
342        self.set_content(value)
343
344    @property
345    def text(self) -> str | None:
346        """
347        The uncompressed and decoded HTTP message body as text.
348
349        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
350
351        *See also:* `Message.raw_content`, `Message.content`
352        """
353        return self.get_text()
354
355    @text.setter
356    def text(self, value: str | None) -> None:
357        self.set_text(value)
358
359    def set_content(self, value: bytes | None) -> None:
360        if value is None:
361            self.raw_content = None
362            return
363        if not isinstance(value, bytes):
364            raise TypeError(
365                f"Message content must be bytes, not {type(value).__name__}. "
366                "Please use .text if you want to assign a str."
367            )
368        ce = self.headers.get("content-encoding")
369        try:
370            self.raw_content = encoding.encode(value, ce or "identity")
371        except ValueError:
372            # So we have an invalid content-encoding?
373            # Let's remove it!
374            del self.headers["content-encoding"]
375            self.raw_content = value
376
377        if "transfer-encoding" in self.headers:
378            # https://httpwg.org/specs/rfc7230.html#header.content-length
379            # don't set content-length if a transfer-encoding is provided
380            pass
381        else:
382            self.headers["content-length"] = str(len(self.raw_content))
383
384    def get_content(self, strict: bool = True) -> bytes | None:
385        """
386        Similar to `Message.content`, but does not raise if `strict` is `False`.
387        Instead, the compressed message body is returned as-is.
388        """
389        if self.raw_content is None:
390            return None
391        ce = self.headers.get("content-encoding")
392        if ce:
393            try:
394                content = encoding.decode(self.raw_content, ce)
395                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
396                if isinstance(content, str):
397                    raise ValueError(f"Invalid Content-Encoding: {ce}")
398                return content
399            except ValueError:
400                if strict:
401                    raise
402                return self.raw_content
403        else:
404            return self.raw_content
405
406    def set_text(self, text: str | None) -> None:
407        if text is None:
408            self.content = None
409            return
410        enc = infer_content_encoding(self.headers.get("content-type", ""))
411
412        try:
413            self.content = cast(bytes, encoding.encode(text, enc))
414        except ValueError:
415            # Fall back to UTF-8 and update the content-type header.
416            ct = parse_content_type(self.headers.get("content-type", "")) or (
417                "text",
418                "plain",
419                {},
420            )
421            ct[2]["charset"] = "utf-8"
422            self.headers["content-type"] = assemble_content_type(*ct)
423            enc = "utf8"
424            self.content = text.encode(enc, "surrogateescape")
425
426    def get_text(self, strict: bool = True) -> str | None:
427        """
428        Similar to `Message.text`, but does not raise if `strict` is `False`.
429        Instead, the message body is returned as surrogate-escaped UTF-8.
430        """
431        content = self.get_content(strict)
432        if content is None:
433            return None
434        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
435        try:
436            return cast(str, encoding.decode(content, enc))
437        except ValueError:
438            if strict:
439                raise
440            return content.decode("utf8", "surrogateescape")
441
442    @property
443    def timestamp_start(self) -> float:
444        """
445        *Timestamp:* Headers received.
446        """
447        return self.data.timestamp_start
448
449    @timestamp_start.setter
450    def timestamp_start(self, timestamp_start: float) -> None:
451        self.data.timestamp_start = timestamp_start
452
453    @property
454    def timestamp_end(self) -> float | None:
455        """
456        *Timestamp:* Last byte received.
457        """
458        return self.data.timestamp_end
459
460    @timestamp_end.setter
461    def timestamp_end(self, timestamp_end: float | None):
462        self.data.timestamp_end = timestamp_end
463
464    def decode(self, strict: bool = True) -> None:
465        """
466        Decodes body based on the current Content-Encoding header, then
467        removes the header. If there is no Content-Encoding header, no
468        action is taken.
469
470        *Raises:*
471         - `ValueError`, when the content-encoding is invalid and strict is True.
472        """
473        decoded = self.get_content(strict)
474        self.headers.pop("content-encoding", None)
475        self.content = decoded
476
477    def encode(self, encoding: str) -> None:
478        """
479        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
480        Any existing content-encodings are overwritten, the content is not decoded beforehand.
481
482        *Raises:*
483         - `ValueError`, when the specified content-encoding is invalid.
484        """
485        self.headers["content-encoding"] = encoding
486        self.content = self.raw_content
487        if "content-encoding" not in self.headers:
488            raise ValueError(f"Invalid content encoding {repr(encoding)}")
489
490    def json(self, **kwargs: Any) -> Any:
491        """
492        Returns the JSON encoded content of the response, if any.
493        `**kwargs` are optional arguments that will be
494        passed to `json.loads()`.
495
496        Will raise if the content can not be decoded and then parsed as JSON.
497
498        *Raises:*
499         - `json.decoder.JSONDecodeError` if content is not valid JSON.
500         - `TypeError` if the content is not available, for example because the response
501            has been streamed.
502        """
503        content = self.get_content(strict=False)
504        if content is None:
505            raise TypeError("Message content is not available.")
506        else:
507            return json.loads(content, **kwargs)

Base class for Request and Response.

stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False

This attribute controls if the message body should be streamed.

If False, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If True, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.

This attribute must be set in the requestheaders or responseheaders hook. Setting it in request or response is already too late, mitmproxy has buffered the message body already.

http_version: str
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")

HTTP version string, for example HTTP/1.1.

is_http10: bool
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
is_http11: bool
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
is_http2: bool
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
is_http3: bool
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
headers: Headers
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers

The HTTP headers.

trailers: Headers | None
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
raw_content: bytes | None
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320
321        *See also:* `Message.content`, `Message.text`
322        """
323        return self.data.content

The raw (potentially compressed) HTTP message body.

In contrast to Message.content and Message.text, accessing this property never raises.

See also: Message.content, Message.text

content: bytes | None
329    @property
330    def content(self) -> bytes | None:
331        """
332        The uncompressed HTTP message body as bytes.
333
334        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
335
336        *See also:* `Message.raw_content`, `Message.text`
337        """
338        return self.get_content()

The uncompressed HTTP message body as bytes.

Accessing this attribute may raise a ValueError when the HTTP content-encoding is invalid.

See also: Message.raw_content, Message.text

text: str | None
344    @property
345    def text(self) -> str | None:
346        """
347        The uncompressed and decoded HTTP message body as text.
348
349        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
350
351        *See also:* `Message.raw_content`, `Message.content`
352        """
353        return self.get_text()

The uncompressed and decoded HTTP message body as text.

Accessing this attribute may raise a ValueError when either content-encoding or charset is invalid.

See also: Message.raw_content, Message.content

def set_content(self, value: bytes | None) -> None:
359    def set_content(self, value: bytes | None) -> None:
360        if value is None:
361            self.raw_content = None
362            return
363        if not isinstance(value, bytes):
364            raise TypeError(
365                f"Message content must be bytes, not {type(value).__name__}. "
366                "Please use .text if you want to assign a str."
367            )
368        ce = self.headers.get("content-encoding")
369        try:
370            self.raw_content = encoding.encode(value, ce or "identity")
371        except ValueError:
372            # So we have an invalid content-encoding?
373            # Let's remove it!
374            del self.headers["content-encoding"]
375            self.raw_content = value
376
377        if "transfer-encoding" in self.headers:
378            # https://httpwg.org/specs/rfc7230.html#header.content-length
379            # don't set content-length if a transfer-encoding is provided
380            pass
381        else:
382            self.headers["content-length"] = str(len(self.raw_content))
def get_content(self, strict: bool = True) -> bytes | None:
384    def get_content(self, strict: bool = True) -> bytes | None:
385        """
386        Similar to `Message.content`, but does not raise if `strict` is `False`.
387        Instead, the compressed message body is returned as-is.
388        """
389        if self.raw_content is None:
390            return None
391        ce = self.headers.get("content-encoding")
392        if ce:
393            try:
394                content = encoding.decode(self.raw_content, ce)
395                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
396                if isinstance(content, str):
397                    raise ValueError(f"Invalid Content-Encoding: {ce}")
398                return content
399            except ValueError:
400                if strict:
401                    raise
402                return self.raw_content
403        else:
404            return self.raw_content

Similar to Message.content, but does not raise if strict is False. Instead, the compressed message body is returned as-is.

def set_text(self, text: str | None) -> None:
406    def set_text(self, text: str | None) -> None:
407        if text is None:
408            self.content = None
409            return
410        enc = infer_content_encoding(self.headers.get("content-type", ""))
411
412        try:
413            self.content = cast(bytes, encoding.encode(text, enc))
414        except ValueError:
415            # Fall back to UTF-8 and update the content-type header.
416            ct = parse_content_type(self.headers.get("content-type", "")) or (
417                "text",
418                "plain",
419                {},
420            )
421            ct[2]["charset"] = "utf-8"
422            self.headers["content-type"] = assemble_content_type(*ct)
423            enc = "utf8"
424            self.content = text.encode(enc, "surrogateescape")
def get_text(self, strict: bool = True) -> str | None:
426    def get_text(self, strict: bool = True) -> str | None:
427        """
428        Similar to `Message.text`, but does not raise if `strict` is `False`.
429        Instead, the message body is returned as surrogate-escaped UTF-8.
430        """
431        content = self.get_content(strict)
432        if content is None:
433            return None
434        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
435        try:
436            return cast(str, encoding.decode(content, enc))
437        except ValueError:
438            if strict:
439                raise
440            return content.decode("utf8", "surrogateescape")

Similar to Message.text, but does not raise if strict is False. Instead, the message body is returned as surrogate-escaped UTF-8.

timestamp_start: float
442    @property
443    def timestamp_start(self) -> float:
444        """
445        *Timestamp:* Headers received.
446        """
447        return self.data.timestamp_start

Timestamp: Headers received.

timestamp_end: float | None
453    @property
454    def timestamp_end(self) -> float | None:
455        """
456        *Timestamp:* Last byte received.
457        """
458        return self.data.timestamp_end

Timestamp: Last byte received.

def decode(self, strict: bool = True) -> None:
464    def decode(self, strict: bool = True) -> None:
465        """
466        Decodes body based on the current Content-Encoding header, then
467        removes the header. If there is no Content-Encoding header, no
468        action is taken.
469
470        *Raises:*
471         - `ValueError`, when the content-encoding is invalid and strict is True.
472        """
473        decoded = self.get_content(strict)
474        self.headers.pop("content-encoding", None)
475        self.content = decoded

Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.

Raises:

  • ValueError, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
477    def encode(self, encoding: str) -> None:
478        """
479        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
480        Any existing content-encodings are overwritten, the content is not decoded beforehand.
481
482        *Raises:*
483         - `ValueError`, when the specified content-encoding is invalid.
484        """
485        self.headers["content-encoding"] = encoding
486        self.content = self.raw_content
487        if "content-encoding" not in self.headers:
488            raise ValueError(f"Invalid content encoding {repr(encoding)}")

Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.

Raises:

  • ValueError, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
490    def json(self, **kwargs: Any) -> Any:
491        """
492        Returns the JSON encoded content of the response, if any.
493        `**kwargs` are optional arguments that will be
494        passed to `json.loads()`.
495
496        Will raise if the content can not be decoded and then parsed as JSON.
497
498        *Raises:*
499         - `json.decoder.JSONDecodeError` if content is not valid JSON.
500         - `TypeError` if the content is not available, for example because the response
501            has been streamed.
502        """
503        content = self.get_content(strict=False)
504        if content is None:
505            raise TypeError("Message content is not available.")
506        else:
507            return json.loads(content, **kwargs)

Returns the JSON encoded content of the response, if any. **kwargs are optional arguments that will be passed to json.loads().

Will raise if the content can not be decoded and then parsed as JSON.

Raises:

  • json.decoder.JSONDecodeError if content is not valid JSON.
  • TypeError if the content is not available, for example because the response has been streamed.
class Request(Message):
 510class Request(Message):
 511    """
 512    An HTTP request.
 513    """
 514
 515    data: RequestData
 516
 517    def __init__(
 518        self,
 519        host: str,
 520        port: int,
 521        method: bytes,
 522        scheme: bytes,
 523        authority: bytes,
 524        path: bytes,
 525        http_version: bytes,
 526        headers: Headers | tuple[tuple[bytes, bytes], ...],
 527        content: bytes | None,
 528        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 529        timestamp_start: float,
 530        timestamp_end: float | None,
 531    ):
 532        # auto-convert invalid types to retain compatibility with older code.
 533        if isinstance(host, bytes):
 534            host = host.decode("idna", "strict")
 535        if isinstance(method, str):
 536            method = method.encode("ascii", "strict")
 537        if isinstance(scheme, str):
 538            scheme = scheme.encode("ascii", "strict")
 539        if isinstance(authority, str):
 540            authority = authority.encode("ascii", "strict")
 541        if isinstance(path, str):
 542            path = path.encode("ascii", "strict")
 543        if isinstance(http_version, str):
 544            http_version = http_version.encode("ascii", "strict")
 545
 546        if isinstance(content, str):
 547            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 548        if not isinstance(headers, Headers):
 549            headers = Headers(headers)
 550        if trailers is not None and not isinstance(trailers, Headers):
 551            trailers = Headers(trailers)
 552
 553        self.data = RequestData(
 554            host=host,
 555            port=port,
 556            method=method,
 557            scheme=scheme,
 558            authority=authority,
 559            path=path,
 560            http_version=http_version,
 561            headers=headers,
 562            content=content,
 563            trailers=trailers,
 564            timestamp_start=timestamp_start,
 565            timestamp_end=timestamp_end,
 566        )
 567
 568    def __repr__(self) -> str:
 569        if self.host and self.port:
 570            hostport = f"{self.host}:{self.port}"
 571        else:
 572            hostport = ""
 573        path = self.path or ""
 574        return f"Request({self.method} {hostport}{path})"
 575
 576    @classmethod
 577    def make(
 578        cls,
 579        method: str,
 580        url: str,
 581        content: bytes | str = "",
 582        headers: (
 583            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 584        ) = (),
 585    ) -> "Request":
 586        """
 587        Simplified API for creating request objects.
 588        """
 589        # Headers can be list or dict, we differentiate here.
 590        if isinstance(headers, Headers):
 591            pass
 592        elif isinstance(headers, dict):
 593            headers = Headers(
 594                (
 595                    always_bytes(k, "utf-8", "surrogateescape"),
 596                    always_bytes(v, "utf-8", "surrogateescape"),
 597                )
 598                for k, v in headers.items()
 599            )
 600        elif isinstance(headers, Iterable):
 601            headers = Headers(headers)  # type: ignore
 602        else:
 603            raise TypeError(
 604                "Expected headers to be an iterable or dict, but is {}.".format(
 605                    type(headers).__name__
 606                )
 607            )
 608
 609        req = cls(
 610            "",
 611            0,
 612            method.encode("utf-8", "surrogateescape"),
 613            b"",
 614            b"",
 615            b"",
 616            b"HTTP/1.1",
 617            headers,
 618            b"",
 619            None,
 620            time.time(),
 621            time.time(),
 622        )
 623
 624        req.url = url
 625        # Assign this manually to update the content-length header.
 626        if isinstance(content, bytes):
 627            req.content = content
 628        elif isinstance(content, str):
 629            req.text = content
 630        else:
 631            raise TypeError(
 632                f"Expected content to be str or bytes, but is {type(content).__name__}."
 633            )
 634
 635        return req
 636
 637    @property
 638    def first_line_format(self) -> str:
 639        """
 640        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 641
 642        origin-form and asterisk-form are subsumed as "relative".
 643        """
 644        if self.method == "CONNECT":
 645            return "authority"
 646        elif self.authority:
 647            return "absolute"
 648        else:
 649            return "relative"
 650
 651    @property
 652    def method(self) -> str:
 653        """
 654        HTTP request method, e.g. "GET".
 655        """
 656        return self.data.method.decode("utf-8", "surrogateescape").upper()
 657
 658    @method.setter
 659    def method(self, val: str | bytes) -> None:
 660        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 661
 662    @property
 663    def scheme(self) -> str:
 664        """
 665        HTTP request scheme, which should be "http" or "https".
 666        """
 667        return self.data.scheme.decode("utf-8", "surrogateescape")
 668
 669    @scheme.setter
 670    def scheme(self, val: str | bytes) -> None:
 671        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 672
 673    @property
 674    def authority(self) -> str:
 675        """
 676        HTTP request authority.
 677
 678        For HTTP/1, this is the authority portion of the request target
 679        (in either absolute-form or authority-form).
 680        For origin-form and asterisk-form requests, this property is set to an empty string.
 681
 682        For HTTP/2, this is the :authority pseudo header.
 683
 684        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 685        """
 686        try:
 687            return self.data.authority.decode("idna")
 688        except UnicodeError:
 689            return self.data.authority.decode("utf8", "surrogateescape")
 690
 691    @authority.setter
 692    def authority(self, val: str | bytes) -> None:
 693        if isinstance(val, str):
 694            try:
 695                val = val.encode("idna", "strict")
 696            except UnicodeError:
 697                val = val.encode("utf8", "surrogateescape")  # type: ignore
 698        self.data.authority = val
 699
 700    @property
 701    def host(self) -> str:
 702        """
 703        Target server for this request. This may be parsed from the raw request
 704        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 705        or inferred from the proxy mode (e.g. an IP in transparent mode).
 706
 707        Setting the host attribute also updates the host header and authority information, if present.
 708
 709        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 710        """
 711        return self.data.host
 712
 713    @host.setter
 714    def host(self, val: str | bytes) -> None:
 715        self.data.host = always_str(val, "idna", "strict")
 716        self._update_host_and_authority()
 717
 718    @property
 719    def host_header(self) -> str | None:
 720        """
 721        The request's host/authority header.
 722
 723        This property maps to either ``request.headers["Host"]`` or
 724        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 725
 726        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 727        """
 728        if self.is_http2 or self.is_http3:
 729            return self.authority or self.data.headers.get("Host", None)
 730        else:
 731            return self.data.headers.get("Host", None)
 732
 733    @host_header.setter
 734    def host_header(self, val: None | str | bytes) -> None:
 735        if val is None:
 736            if self.is_http2 or self.is_http3:
 737                self.data.authority = b""
 738            self.headers.pop("Host", None)
 739        else:
 740            if self.is_http2 or self.is_http3:
 741                self.authority = val  # type: ignore
 742            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 743                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 744                self.headers["Host"] = val
 745
 746    @property
 747    def port(self) -> int:
 748        """
 749        Target port.
 750        """
 751        return self.data.port
 752
 753    @port.setter
 754    def port(self, port: int) -> None:
 755        if not isinstance(port, int):
 756            raise ValueError(f"Port must be an integer, not {port!r}.")
 757
 758        self.data.port = port
 759        self._update_host_and_authority()
 760
 761    def _update_host_and_authority(self) -> None:
 762        val = url.hostport(self.scheme, self.host, self.port)
 763
 764        # Update host header
 765        if "Host" in self.data.headers:
 766            self.data.headers["Host"] = val
 767        # Update authority
 768        if self.data.authority:
 769            self.authority = val
 770
 771    @property
 772    def path(self) -> str:
 773        """
 774        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 775        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 776
 777        This attribute includes both path and query parts of the target URI
 778        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 779        """
 780        return self.data.path.decode("utf-8", "surrogateescape")
 781
 782    @path.setter
 783    def path(self, val: str | bytes) -> None:
 784        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 785
 786    @property
 787    def url(self) -> str:
 788        """
 789        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 790
 791        Settings this property updates these attributes as well.
 792        """
 793        if self.first_line_format == "authority":
 794            return f"{self.host}:{self.port}"
 795        path = self.path if self.path != "*" else ""
 796        return url.unparse(self.scheme, self.host, self.port, path)
 797
 798    @url.setter
 799    def url(self, val: str | bytes) -> None:
 800        val = always_str(val, "utf-8", "surrogateescape")
 801        self.scheme, self.host, self.port, self.path = url.parse(val)  # type: ignore
 802
 803    @property
 804    def pretty_host(self) -> str:
 805        """
 806        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 807        This is useful in transparent mode where `Request.host` is only an IP address.
 808
 809        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 810        as the Host header could be spoofed.
 811        """
 812        authority = self.host_header
 813        if authority:
 814            return url.parse_authority(authority, check=False)[0]
 815        else:
 816            return self.host
 817
 818    @property
 819    def pretty_url(self) -> str:
 820        """
 821        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 822        """
 823        if self.first_line_format == "authority":
 824            return self.authority
 825
 826        host_header = self.host_header
 827        if not host_header:
 828            return self.url
 829
 830        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 831        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 832        path = self.path if self.path != "*" else ""
 833
 834        return url.unparse(self.scheme, pretty_host, pretty_port, path)
 835
 836    def _get_query(self):
 837        query = urllib.parse.urlparse(self.url).query
 838        return tuple(url.decode(query))
 839
 840    def _set_query(self, query_data):
 841        query = url.encode(query_data)
 842        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 843        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 844
 845    @property
 846    def query(self) -> multidict.MultiDictView[str, str]:
 847        """
 848        The request query as a mutable mapping view on the request's path.
 849        For the most part, this behaves like a dictionary.
 850        Modifications to the MultiDictView update `Request.path`, and vice versa.
 851        """
 852        return multidict.MultiDictView(self._get_query, self._set_query)
 853
 854    @query.setter
 855    def query(self, value):
 856        self._set_query(value)
 857
 858    def _get_cookies(self):
 859        h = self.headers.get_all("Cookie")
 860        return tuple(cookies.parse_cookie_headers(h))
 861
 862    def _set_cookies(self, value):
 863        self.headers["cookie"] = cookies.format_cookie_header(value)
 864
 865    @property
 866    def cookies(self) -> multidict.MultiDictView[str, str]:
 867        """
 868        The request cookies.
 869        For the most part, this behaves like a dictionary.
 870        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 871        """
 872        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 873
 874    @cookies.setter
 875    def cookies(self, value):
 876        self._set_cookies(value)
 877
 878    @property
 879    def path_components(self) -> tuple[str, ...]:
 880        """
 881        The URL's path components as a tuple of strings.
 882        Components are unquoted.
 883        """
 884        path = urllib.parse.urlparse(self.url).path
 885        # This needs to be a tuple so that it's immutable.
 886        # Otherwise, this would fail silently:
 887        #   request.path_components.append("foo")
 888        return tuple(url.unquote(i) for i in path.split("/") if i)
 889
 890    @path_components.setter
 891    def path_components(self, components: Iterable[str]):
 892        components = map(lambda x: url.quote(x, safe=""), components)
 893        path = "/" + "/".join(components)
 894        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 895        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 896
 897    def anticache(self) -> None:
 898        """
 899        Modifies this request to remove headers that might produce a cached response.
 900        """
 901        delheaders = (
 902            "if-modified-since",
 903            "if-none-match",
 904        )
 905        for i in delheaders:
 906            self.headers.pop(i, None)
 907
 908    def anticomp(self) -> None:
 909        """
 910        Modify the Accept-Encoding header to only accept uncompressed responses.
 911        """
 912        self.headers["accept-encoding"] = "identity"
 913
 914    def constrain_encoding(self) -> None:
 915        """
 916        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 917        """
 918        accept_encoding = self.headers.get("accept-encoding")
 919        if accept_encoding:
 920            self.headers["accept-encoding"] = ", ".join(
 921                e
 922                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 923                if e in accept_encoding
 924            )
 925
 926    def _get_urlencoded_form(self):
 927        is_valid_content_type = (
 928            "application/x-www-form-urlencoded"
 929            in self.headers.get("content-type", "").lower()
 930        )
 931        if is_valid_content_type:
 932            return tuple(url.decode(self.get_text(strict=False)))
 933        return ()
 934
 935    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 936        """
 937        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 938        This will overwrite the existing content if there is one.
 939        """
 940        self.headers["content-type"] = "application/x-www-form-urlencoded"
 941        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 942
 943    @property
 944    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 945        """
 946        The URL-encoded form data.
 947
 948        If the content-type indicates non-form data or the form could not be parsed, this is set to
 949        an empty `MultiDictView`.
 950
 951        Modifications to the MultiDictView update `Request.content`, and vice versa.
 952        """
 953        return multidict.MultiDictView(
 954            self._get_urlencoded_form, self._set_urlencoded_form
 955        )
 956
 957    @urlencoded_form.setter
 958    def urlencoded_form(self, value):
 959        self._set_urlencoded_form(value)
 960
 961    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 962        is_valid_content_type = (
 963            "multipart/form-data" in self.headers.get("content-type", "").lower()
 964        )
 965        if is_valid_content_type and self.content is not None:
 966            try:
 967                return multipart.decode_multipart(
 968                    self.headers.get("content-type"), self.content
 969                )
 970            except ValueError:
 971                pass
 972        return []
 973
 974    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 975        ct = self.headers.get("content-type", "")
 976        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 977        if not is_valid_content_type:
 978            """
 979            Generate a random boundary here.
 980
 981            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 982            on generating the boundary.
 983            """
 984            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 985            self.headers["content-type"] = ct = (
 986                f"multipart/form-data; boundary={boundary}"
 987            )
 988        self.content = multipart.encode_multipart(ct, value)
 989
 990    @property
 991    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 992        """
 993        The multipart form data.
 994
 995        If the content-type indicates non-form data or the form could not be parsed, this is set to
 996        an empty `MultiDictView`.
 997
 998        Modifications to the MultiDictView update `Request.content`, and vice versa.
 999        """
1000        return multidict.MultiDictView(
1001            self._get_multipart_form, self._set_multipart_form
1002        )
1003
1004    @multipart_form.setter
1005    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1006        self._set_multipart_form(value)

An HTTP request.

Request( host: str, port: int, method: bytes, scheme: bytes, authority: bytes, path: bytes, http_version: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, timestamp_start: float, timestamp_end: float | None)
517    def __init__(
518        self,
519        host: str,
520        port: int,
521        method: bytes,
522        scheme: bytes,
523        authority: bytes,
524        path: bytes,
525        http_version: bytes,
526        headers: Headers | tuple[tuple[bytes, bytes], ...],
527        content: bytes | None,
528        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
529        timestamp_start: float,
530        timestamp_end: float | None,
531    ):
532        # auto-convert invalid types to retain compatibility with older code.
533        if isinstance(host, bytes):
534            host = host.decode("idna", "strict")
535        if isinstance(method, str):
536            method = method.encode("ascii", "strict")
537        if isinstance(scheme, str):
538            scheme = scheme.encode("ascii", "strict")
539        if isinstance(authority, str):
540            authority = authority.encode("ascii", "strict")
541        if isinstance(path, str):
542            path = path.encode("ascii", "strict")
543        if isinstance(http_version, str):
544            http_version = http_version.encode("ascii", "strict")
545
546        if isinstance(content, str):
547            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
548        if not isinstance(headers, Headers):
549            headers = Headers(headers)
550        if trailers is not None and not isinstance(trailers, Headers):
551            trailers = Headers(trailers)
552
553        self.data = RequestData(
554            host=host,
555            port=port,
556            method=method,
557            scheme=scheme,
558            authority=authority,
559            path=path,
560            http_version=http_version,
561            headers=headers,
562            content=content,
563            trailers=trailers,
564            timestamp_start=timestamp_start,
565            timestamp_end=timestamp_end,
566        )
@classmethod
def make( cls, method: str, url: str, content: bytes | str = '', headers: Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Request:
576    @classmethod
577    def make(
578        cls,
579        method: str,
580        url: str,
581        content: bytes | str = "",
582        headers: (
583            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
584        ) = (),
585    ) -> "Request":
586        """
587        Simplified API for creating request objects.
588        """
589        # Headers can be list or dict, we differentiate here.
590        if isinstance(headers, Headers):
591            pass
592        elif isinstance(headers, dict):
593            headers = Headers(
594                (
595                    always_bytes(k, "utf-8", "surrogateescape"),
596                    always_bytes(v, "utf-8", "surrogateescape"),
597                )
598                for k, v in headers.items()
599            )
600        elif isinstance(headers, Iterable):
601            headers = Headers(headers)  # type: ignore
602        else:
603            raise TypeError(
604                "Expected headers to be an iterable or dict, but is {}.".format(
605                    type(headers).__name__
606                )
607            )
608
609        req = cls(
610            "",
611            0,
612            method.encode("utf-8", "surrogateescape"),
613            b"",
614            b"",
615            b"",
616            b"HTTP/1.1",
617            headers,
618            b"",
619            None,
620            time.time(),
621            time.time(),
622        )
623
624        req.url = url
625        # Assign this manually to update the content-length header.
626        if isinstance(content, bytes):
627            req.content = content
628        elif isinstance(content, str):
629            req.text = content
630        else:
631            raise TypeError(
632                f"Expected content to be str or bytes, but is {type(content).__name__}."
633            )
634
635        return req

Simplified API for creating request objects.

first_line_format: str
637    @property
638    def first_line_format(self) -> str:
639        """
640        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
641
642        origin-form and asterisk-form are subsumed as "relative".
643        """
644        if self.method == "CONNECT":
645            return "authority"
646        elif self.authority:
647            return "absolute"
648        else:
649            return "relative"

Read-only: HTTP request form as defined in RFC 7230.

origin-form and asterisk-form are subsumed as "relative".

method: str
651    @property
652    def method(self) -> str:
653        """
654        HTTP request method, e.g. "GET".
655        """
656        return self.data.method.decode("utf-8", "surrogateescape").upper()

HTTP request method, e.g. "GET".

scheme: str
662    @property
663    def scheme(self) -> str:
664        """
665        HTTP request scheme, which should be "http" or "https".
666        """
667        return self.data.scheme.decode("utf-8", "surrogateescape")

HTTP request scheme, which should be "http" or "https".

authority: str
673    @property
674    def authority(self) -> str:
675        """
676        HTTP request authority.
677
678        For HTTP/1, this is the authority portion of the request target
679        (in either absolute-form or authority-form).
680        For origin-form and asterisk-form requests, this property is set to an empty string.
681
682        For HTTP/2, this is the :authority pseudo header.
683
684        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
685        """
686        try:
687            return self.data.authority.decode("idna")
688        except UnicodeError:
689            return self.data.authority.decode("utf8", "surrogateescape")

HTTP request authority.

For HTTP/1, this is the authority portion of the request target (in either absolute-form or authority-form). For origin-form and asterisk-form requests, this property is set to an empty string.

For HTTP/2, this is the :authority pseudo header.

See also: Request.host, Request.host_header, Request.pretty_host

host: str
700    @property
701    def host(self) -> str:
702        """
703        Target server for this request. This may be parsed from the raw request
704        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
705        or inferred from the proxy mode (e.g. an IP in transparent mode).
706
707        Setting the host attribute also updates the host header and authority information, if present.
708
709        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
710        """
711        return self.data.host

Target server for this request. This may be parsed from the raw request (e.g. from a GET http://example.com/ HTTP/1.1 request line) or inferred from the proxy mode (e.g. an IP in transparent mode).

Setting the host attribute also updates the host header and authority information, if present.

See also: Request.authority, Request.host_header, Request.pretty_host

host_header: str | None
718    @property
719    def host_header(self) -> str | None:
720        """
721        The request's host/authority header.
722
723        This property maps to either ``request.headers["Host"]`` or
724        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
725
726        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
727        """
728        if self.is_http2 or self.is_http3:
729            return self.authority or self.data.headers.get("Host", None)
730        else:
731            return self.data.headers.get("Host", None)

The request's host/authority header.

This property maps to either request.headers["Host"] or request.authority, depending on whether it's HTTP/1.x or HTTP/2.0.

See also: Request.authority,Request.host, Request.pretty_host

port: int
746    @property
747    def port(self) -> int:
748        """
749        Target port.
750        """
751        return self.data.port

Target port.

path: str
771    @property
772    def path(self) -> str:
773        """
774        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
775        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
776
777        This attribute includes both path and query parts of the target URI
778        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
779        """
780        return self.data.path.decode("utf-8", "surrogateescape")

HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".

This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).

url: str
786    @property
787    def url(self) -> str:
788        """
789        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
790
791        Settings this property updates these attributes as well.
792        """
793        if self.first_line_format == "authority":
794            return f"{self.host}:{self.port}"
795        path = self.path if self.path != "*" else ""
796        return url.unparse(self.scheme, self.host, self.port, path)

The full URL string, constructed from Request.scheme, Request.host, Request.port and Request.path.

Settings this property updates these attributes as well.

pretty_host: str
803    @property
804    def pretty_host(self) -> str:
805        """
806        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
807        This is useful in transparent mode where `Request.host` is only an IP address.
808
809        *Warning:* When working in adversarial environments, this may not reflect the actual destination
810        as the Host header could be spoofed.
811        """
812        authority = self.host_header
813        if authority:
814            return url.parse_authority(authority, check=False)[0]
815        else:
816            return self.host

Read-only: Like Request.host, but using Request.host_header header as an additional (preferred) data source. This is useful in transparent mode where Request.host is only an IP address.

Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.

pretty_url: str
818    @property
819    def pretty_url(self) -> str:
820        """
821        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
822        """
823        if self.first_line_format == "authority":
824            return self.authority
825
826        host_header = self.host_header
827        if not host_header:
828            return self.url
829
830        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
831        pretty_port = pretty_port or url.default_port(self.scheme) or 443
832        path = self.path if self.path != "*" else ""
833
834        return url.unparse(self.scheme, pretty_host, pretty_port, path)

Read-only: Like Request.url, but using Request.pretty_host instead of Request.host.

query: mitmproxy.coretypes.multidict.MultiDictView[str, str]
845    @property
846    def query(self) -> multidict.MultiDictView[str, str]:
847        """
848        The request query as a mutable mapping view on the request's path.
849        For the most part, this behaves like a dictionary.
850        Modifications to the MultiDictView update `Request.path`, and vice versa.
851        """
852        return multidict.MultiDictView(self._get_query, self._set_query)

The request query as a mutable mapping view on the request's path. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.path, and vice versa.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, str]
865    @property
866    def cookies(self) -> multidict.MultiDictView[str, str]:
867        """
868        The request cookies.
869        For the most part, this behaves like a dictionary.
870        Modifications to the MultiDictView update `Request.headers`, and vice versa.
871        """
872        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The request cookies. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.headers, and vice versa.

path_components: tuple[str, ...]
878    @property
879    def path_components(self) -> tuple[str, ...]:
880        """
881        The URL's path components as a tuple of strings.
882        Components are unquoted.
883        """
884        path = urllib.parse.urlparse(self.url).path
885        # This needs to be a tuple so that it's immutable.
886        # Otherwise, this would fail silently:
887        #   request.path_components.append("foo")
888        return tuple(url.unquote(i) for i in path.split("/") if i)

The URL's path components as a tuple of strings. Components are unquoted.

def anticache(self) -> None:
897    def anticache(self) -> None:
898        """
899        Modifies this request to remove headers that might produce a cached response.
900        """
901        delheaders = (
902            "if-modified-since",
903            "if-none-match",
904        )
905        for i in delheaders:
906            self.headers.pop(i, None)

Modifies this request to remove headers that might produce a cached response.

def anticomp(self) -> None:
908    def anticomp(self) -> None:
909        """
910        Modify the Accept-Encoding header to only accept uncompressed responses.
911        """
912        self.headers["accept-encoding"] = "identity"

Modify the Accept-Encoding header to only accept uncompressed responses.

def constrain_encoding(self) -> None:
914    def constrain_encoding(self) -> None:
915        """
916        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
917        """
918        accept_encoding = self.headers.get("accept-encoding")
919        if accept_encoding:
920            self.headers["accept-encoding"] = ", ".join(
921                e
922                for e in {"gzip", "identity", "deflate", "br", "zstd"}
923                if e in accept_encoding
924            )

Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

urlencoded_form: mitmproxy.coretypes.multidict.MultiDictView[str, str]
943    @property
944    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
945        """
946        The URL-encoded form data.
947
948        If the content-type indicates non-form data or the form could not be parsed, this is set to
949        an empty `MultiDictView`.
950
951        Modifications to the MultiDictView update `Request.content`, and vice versa.
952        """
953        return multidict.MultiDictView(
954            self._get_urlencoded_form, self._set_urlencoded_form
955        )

The URL-encoded form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

multipart_form: mitmproxy.coretypes.multidict.MultiDictView[bytes, bytes]
 990    @property
 991    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 992        """
 993        The multipart form data.
 994
 995        If the content-type indicates non-form data or the form could not be parsed, this is set to
 996        an empty `MultiDictView`.
 997
 998        Modifications to the MultiDictView update `Request.content`, and vice versa.
 999        """
1000        return multidict.MultiDictView(
1001            self._get_multipart_form, self._set_multipart_form
1002        )

The multipart form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

class Response(Message):
1009class Response(Message):
1010    """
1011    An HTTP response.
1012    """
1013
1014    data: ResponseData
1015
1016    def __init__(
1017        self,
1018        http_version: bytes,
1019        status_code: int,
1020        reason: bytes,
1021        headers: Headers | tuple[tuple[bytes, bytes], ...],
1022        content: bytes | None,
1023        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1024        timestamp_start: float,
1025        timestamp_end: float | None,
1026    ):
1027        # auto-convert invalid types to retain compatibility with older code.
1028        if isinstance(http_version, str):
1029            http_version = http_version.encode("ascii", "strict")
1030        if isinstance(reason, str):
1031            reason = reason.encode("ascii", "strict")
1032
1033        if isinstance(content, str):
1034            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1035        if not isinstance(headers, Headers):
1036            headers = Headers(headers)
1037        if trailers is not None and not isinstance(trailers, Headers):
1038            trailers = Headers(trailers)
1039
1040        self.data = ResponseData(
1041            http_version=http_version,
1042            status_code=status_code,
1043            reason=reason,
1044            headers=headers,
1045            content=content,
1046            trailers=trailers,
1047            timestamp_start=timestamp_start,
1048            timestamp_end=timestamp_end,
1049        )
1050
1051    def __repr__(self) -> str:
1052        if self.raw_content:
1053            ct = self.headers.get("content-type", "unknown content type")
1054            size = human.pretty_size(len(self.raw_content))
1055            details = f"{ct}, {size}"
1056        else:
1057            details = "no content"
1058        return f"Response({self.status_code}, {details})"
1059
1060    @classmethod
1061    def make(
1062        cls,
1063        status_code: int = 200,
1064        content: bytes | str = b"",
1065        headers: (
1066            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1067        ) = (),
1068    ) -> "Response":
1069        """
1070        Simplified API for creating response objects.
1071        """
1072        if isinstance(headers, Headers):
1073            headers = headers
1074        elif isinstance(headers, dict):
1075            headers = Headers(
1076                (
1077                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1078                    always_bytes(v, "utf-8", "surrogateescape"),
1079                )
1080                for k, v in headers.items()
1081            )
1082        elif isinstance(headers, Iterable):
1083            headers = Headers(headers)  # type: ignore
1084        else:
1085            raise TypeError(
1086                "Expected headers to be an iterable or dict, but is {}.".format(
1087                    type(headers).__name__
1088                )
1089            )
1090
1091        resp = cls(
1092            b"HTTP/1.1",
1093            status_code,
1094            status_codes.RESPONSES.get(status_code, "").encode(),
1095            headers,
1096            None,
1097            None,
1098            time.time(),
1099            time.time(),
1100        )
1101
1102        # Assign this manually to update the content-length header.
1103        if isinstance(content, bytes):
1104            resp.content = content
1105        elif isinstance(content, str):
1106            resp.text = content
1107        else:
1108            raise TypeError(
1109                f"Expected content to be str or bytes, but is {type(content).__name__}."
1110            )
1111
1112        return resp
1113
1114    @property
1115    def status_code(self) -> int:
1116        """
1117        HTTP Status Code, e.g. ``200``.
1118        """
1119        return self.data.status_code
1120
1121    @status_code.setter
1122    def status_code(self, status_code: int) -> None:
1123        self.data.status_code = status_code
1124
1125    @property
1126    def reason(self) -> str:
1127        """
1128        HTTP reason phrase, for example "Not Found".
1129
1130        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1131        """
1132        # Encoding: http://stackoverflow.com/a/16674906/934719
1133        return self.data.reason.decode("ISO-8859-1")
1134
1135    @reason.setter
1136    def reason(self, reason: str | bytes) -> None:
1137        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1138
1139    def _get_cookies(self):
1140        h = self.headers.get_all("set-cookie")
1141        all_cookies = cookies.parse_set_cookie_headers(h)
1142        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1143
1144    def _set_cookies(self, value):
1145        cookie_headers = []
1146        for k, v in value:
1147            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1148            cookie_headers.append(header)
1149        self.headers.set_all("set-cookie", cookie_headers)
1150
1151    @property
1152    def cookies(
1153        self,
1154    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1155        """
1156        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1157        name strings, and values are `(cookie value, attributes)` tuples. Within
1158        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1159        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1160
1161        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1162        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1163        """
1164        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1165
1166    @cookies.setter
1167    def cookies(self, value):
1168        self._set_cookies(value)
1169
1170    def refresh(self, now=None):
1171        """
1172        This fairly complex and heuristic function refreshes a server
1173        response for replay.
1174
1175         - It adjusts date, expires, and last-modified headers.
1176         - It adjusts cookie expiration.
1177        """
1178        if not now:
1179            now = time.time()
1180        delta = now - self.timestamp_start
1181        refresh_headers = [
1182            "date",
1183            "expires",
1184            "last-modified",
1185        ]
1186        for i in refresh_headers:
1187            if i in self.headers:
1188                d = parsedate_tz(self.headers[i])
1189                if d:
1190                    new = mktime_tz(d) + delta
1191                    try:
1192                        self.headers[i] = formatdate(new, usegmt=True)
1193                    except OSError:  # pragma: no cover
1194                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1195        c = []
1196        for set_cookie_header in self.headers.get_all("set-cookie"):
1197            try:
1198                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1199            except ValueError:
1200                refreshed = set_cookie_header
1201            c.append(refreshed)
1202        if c:
1203            self.headers.set_all("set-cookie", c)

An HTTP response.

Response( http_version: bytes, status_code: int, reason: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: None | Headers | tuple[tuple[bytes, bytes], ...], timestamp_start: float, timestamp_end: float | None)
1016    def __init__(
1017        self,
1018        http_version: bytes,
1019        status_code: int,
1020        reason: bytes,
1021        headers: Headers | tuple[tuple[bytes, bytes], ...],
1022        content: bytes | None,
1023        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1024        timestamp_start: float,
1025        timestamp_end: float | None,
1026    ):
1027        # auto-convert invalid types to retain compatibility with older code.
1028        if isinstance(http_version, str):
1029            http_version = http_version.encode("ascii", "strict")
1030        if isinstance(reason, str):
1031            reason = reason.encode("ascii", "strict")
1032
1033        if isinstance(content, str):
1034            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1035        if not isinstance(headers, Headers):
1036            headers = Headers(headers)
1037        if trailers is not None and not isinstance(trailers, Headers):
1038            trailers = Headers(trailers)
1039
1040        self.data = ResponseData(
1041            http_version=http_version,
1042            status_code=status_code,
1043            reason=reason,
1044            headers=headers,
1045            content=content,
1046            trailers=trailers,
1047            timestamp_start=timestamp_start,
1048            timestamp_end=timestamp_end,
1049        )
@classmethod
def make( cls, status_code: int = 200, content: bytes | str = b'', headers: Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Response:
1060    @classmethod
1061    def make(
1062        cls,
1063        status_code: int = 200,
1064        content: bytes | str = b"",
1065        headers: (
1066            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1067        ) = (),
1068    ) -> "Response":
1069        """
1070        Simplified API for creating response objects.
1071        """
1072        if isinstance(headers, Headers):
1073            headers = headers
1074        elif isinstance(headers, dict):
1075            headers = Headers(
1076                (
1077                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1078                    always_bytes(v, "utf-8", "surrogateescape"),
1079                )
1080                for k, v in headers.items()
1081            )
1082        elif isinstance(headers, Iterable):
1083            headers = Headers(headers)  # type: ignore
1084        else:
1085            raise TypeError(
1086                "Expected headers to be an iterable or dict, but is {}.".format(
1087                    type(headers).__name__
1088                )
1089            )
1090
1091        resp = cls(
1092            b"HTTP/1.1",
1093            status_code,
1094            status_codes.RESPONSES.get(status_code, "").encode(),
1095            headers,
1096            None,
1097            None,
1098            time.time(),
1099            time.time(),
1100        )
1101
1102        # Assign this manually to update the content-length header.
1103        if isinstance(content, bytes):
1104            resp.content = content
1105        elif isinstance(content, str):
1106            resp.text = content
1107        else:
1108            raise TypeError(
1109                f"Expected content to be str or bytes, but is {type(content).__name__}."
1110            )
1111
1112        return resp

Simplified API for creating response objects.

status_code: int
1114    @property
1115    def status_code(self) -> int:
1116        """
1117        HTTP Status Code, e.g. ``200``.
1118        """
1119        return self.data.status_code

HTTP Status Code, e.g. 200.

reason: str
1125    @property
1126    def reason(self) -> str:
1127        """
1128        HTTP reason phrase, for example "Not Found".
1129
1130        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1131        """
1132        # Encoding: http://stackoverflow.com/a/16674906/934719
1133        return self.data.reason.decode("ISO-8859-1")

HTTP reason phrase, for example "Not Found".

HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, tuple[str, mitmproxy.coretypes.multidict.MultiDict[str, str | None]]]
1151    @property
1152    def cookies(
1153        self,
1154    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1155        """
1156        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1157        name strings, and values are `(cookie value, attributes)` tuples. Within
1158        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1159        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1160
1161        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1162        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1163        """
1164        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The response cookies. A possibly empty MultiDictView, where the keys are cookie name strings, and values are (cookie value, attributes) tuples. Within attributes, unary attributes (e.g. HTTPOnly) are indicated by a None value. Modifications to the MultiDictView update Response.headers, and vice versa.

Warning: Changes to attributes will not be picked up unless you also reassign the (cookie value, attributes) tuple directly in the MultiDictView.

def refresh(self, now=None):
1170    def refresh(self, now=None):
1171        """
1172        This fairly complex and heuristic function refreshes a server
1173        response for replay.
1174
1175         - It adjusts date, expires, and last-modified headers.
1176         - It adjusts cookie expiration.
1177        """
1178        if not now:
1179            now = time.time()
1180        delta = now - self.timestamp_start
1181        refresh_headers = [
1182            "date",
1183            "expires",
1184            "last-modified",
1185        ]
1186        for i in refresh_headers:
1187            if i in self.headers:
1188                d = parsedate_tz(self.headers[i])
1189                if d:
1190                    new = mktime_tz(d) + delta
1191                    try:
1192                        self.headers[i] = formatdate(new, usegmt=True)
1193                    except OSError:  # pragma: no cover
1194                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1195        c = []
1196        for set_cookie_header in self.headers.get_all("set-cookie"):
1197            try:
1198                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1199            except ValueError:
1200                refreshed = set_cookie_header
1201            c.append(refreshed)
1202        if c:
1203            self.headers.set_all("set-cookie", c)

This fairly complex and heuristic function refreshes a server response for replay.

  • It adjusts date, expires, and last-modified headers.
  • It adjusts cookie expiration.
class Headers(mitmproxy.coretypes.multidict._MultiDict[~KT, ~VT], mitmproxy.coretypes.serializable.Serializable):
 50class Headers(multidict.MultiDict):  # type: ignore
 51    """
 52    Header class which allows both convenient access to individual headers as well as
 53    direct access to the underlying raw data. Provides a full dictionary interface.
 54
 55    Create headers with keyword arguments:
 56    >>> h = Headers(host="example.com", content_type="application/xml")
 57
 58    Headers mostly behave like a normal dict:
 59    >>> h["Host"]
 60    "example.com"
 61
 62    Headers are case insensitive:
 63    >>> h["host"]
 64    "example.com"
 65
 66    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
 67    >>> h = Headers([
 68        (b"Host",b"example.com"),
 69        (b"Accept",b"text/html"),
 70        (b"accept",b"application/xml")
 71    ])
 72
 73    Multiple headers are folded into a single header as per RFC 7230:
 74    >>> h["Accept"]
 75    "text/html, application/xml"
 76
 77    Setting a header removes all existing headers with the same name:
 78    >>> h["Accept"] = "application/text"
 79    >>> h["Accept"]
 80    "application/text"
 81
 82    `bytes(h)` returns an HTTP/1 header block:
 83    >>> print(bytes(h))
 84    Host: example.com
 85    Accept: application/text
 86
 87    For full control, the raw header fields can be accessed:
 88    >>> h.fields
 89
 90    Caveats:
 91     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
 92    """
 93
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )
119
120    fields: tuple[tuple[bytes, bytes], ...]
121
122    @staticmethod
123    def _reduce_values(values) -> str:
124        # Headers can be folded
125        return ", ".join(values)
126
127    @staticmethod
128    def _kconv(key) -> str:
129        # Headers are case-insensitive
130        return key.lower()
131
132    def __bytes__(self) -> bytes:
133        if self.fields:
134            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
135        else:
136            return b""
137
138    def __delitem__(self, key: str | bytes) -> None:
139        key = _always_bytes(key)
140        super().__delitem__(key)
141
142    def __iter__(self) -> Iterator[str]:
143        for x in super().__iter__():
144            yield _native(x)
145
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]
158
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)
167
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)
172
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.

Create headers with keyword arguments:

>>> h = Headers(host="example.com", content_type="application/xml")

Headers mostly behave like a normal dict:

>>> h["Host"]
"example.com"

Headers are case insensitive:

>>> h["host"]
"example.com"

Headers can also be created from a list of raw (header_name, header_value) byte tuples:

>>> h = Headers([
    (b"Host",b"example.com"),
    (b"Accept",b"text/html"),
    (b"accept",b"application/xml")
])

Multiple headers are folded into a single header as per RFC 7230:

>>> h["Accept"]
"text/html, application/xml"

Setting a header removes all existing headers with the same name:

>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"

bytes(h) returns an HTTP/1 header block:

>>> print(bytes(h))
Host: example.com
Accept: application/text

For full control, the raw header fields can be accessed:

>>> h.fields

Caveats:

Headers(fields: Iterable[tuple[bytes, bytes]] = (), **headers)
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )

Args:

  • fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
  • **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.

If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.

fields: tuple[tuple[bytes, bytes], ...]

The underlying raw datastructure.

def get_all(self, name: str | bytes) -> list[str]:
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]

Like Headers.get, but does not fold multiple headers into a single one. This is useful for Set-Cookie and Cookie headers, which do not support folding.

See also:

def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)

Explicitly set multiple headers for the given key. See Headers.get_all.

def insert(self, index: int, key: str | bytes, value: str | bytes):
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)

Insert an additional value for the given key at the specified position.

def items(self, multi=False):
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Get all (key, value) tuples.

If multi is True, all (key, value) pairs will be returned. If False, only one tuple per key is returned.

built with pdocpdoc logo