Edit on GitHub

mitmproxy.http

   1import binascii
   2import json
   3import os
   4import time
   5import urllib.parse
   6import warnings
   7from collections.abc import Callable
   8from collections.abc import Iterable
   9from collections.abc import Iterator
  10from collections.abc import Mapping
  11from collections.abc import Sequence
  12from dataclasses import dataclass
  13from dataclasses import fields
  14from email.utils import formatdate
  15from email.utils import mktime_tz
  16from email.utils import parsedate_tz
  17from typing import Any
  18from typing import cast
  19
  20from mitmproxy import flow
  21from mitmproxy.coretypes import multidict
  22from mitmproxy.coretypes import serializable
  23from mitmproxy.net import encoding
  24from mitmproxy.net.http import cookies
  25from mitmproxy.net.http import multipart
  26from mitmproxy.net.http import status_codes
  27from mitmproxy.net.http import url
  28from mitmproxy.net.http.headers import assemble_content_type
  29from mitmproxy.net.http.headers import infer_content_encoding
  30from mitmproxy.net.http.headers import parse_content_type
  31from mitmproxy.utils import human
  32from mitmproxy.utils import strutils
  33from mitmproxy.utils import typecheck
  34from mitmproxy.utils.strutils import always_bytes
  35from mitmproxy.utils.strutils import always_str
  36from mitmproxy.websocket import WebSocketData
  37
  38
  39# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
  40def _native(x: bytes) -> str:
  41    return x.decode("utf-8", "surrogateescape")
  42
  43
  44def _always_bytes(x: str | bytes) -> bytes:
  45    return strutils.always_bytes(x, "utf-8", "surrogateescape")
  46
  47
  48# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
  49class Headers(multidict.MultiDict):  # type: ignore
  50    """
  51    Header class which allows both convenient access to individual headers as well as
  52    direct access to the underlying raw data. Provides a full dictionary interface.
  53
  54    Create headers with keyword arguments:
  55    >>> h = Headers(host="example.com", content_type="application/xml")
  56
  57    Headers mostly behave like a normal dict:
  58    >>> h["Host"]
  59    "example.com"
  60
  61    Headers are case insensitive:
  62    >>> h["host"]
  63    "example.com"
  64
  65    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
  66    >>> h = Headers([
  67        (b"Host",b"example.com"),
  68        (b"Accept",b"text/html"),
  69        (b"accept",b"application/xml")
  70    ])
  71
  72    Multiple headers are folded into a single header as per RFC 7230:
  73    >>> h["Accept"]
  74    "text/html, application/xml"
  75
  76    Setting a header removes all existing headers with the same name:
  77    >>> h["Accept"] = "application/text"
  78    >>> h["Accept"]
  79    "application/text"
  80
  81    `bytes(h)` returns an HTTP/1 header block:
  82    >>> print(bytes(h))
  83    Host: example.com
  84    Accept: application/text
  85
  86    For full control, the raw header fields can be accessed:
  87    >>> h.fields
  88
  89    Caveats:
  90     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
  91    """
  92
  93    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
  94        """
  95        *Args:*
  96         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
  97           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
  98         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
  99           For convenience, underscores in header names will be transformed to dashes -
 100           this behaviour does not extend to other methods.
 101
 102        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
 103        the behavior is undefined.
 104        """
 105        super().__init__(fields)
 106
 107        for key, value in self.fields:
 108            if not isinstance(key, bytes) or not isinstance(value, bytes):
 109                raise TypeError("Header fields must be bytes.")
 110
 111        # content_type -> content-type
 112        self.update(
 113            {
 114                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
 115                for name, value in headers.items()
 116            }
 117        )
 118
 119    fields: tuple[tuple[bytes, bytes], ...]
 120
 121    @staticmethod
 122    def _reduce_values(values) -> str:
 123        # Headers can be folded
 124        return ", ".join(values)
 125
 126    @staticmethod
 127    def _kconv(key) -> str:
 128        # Headers are case-insensitive
 129        return key.lower()
 130
 131    def __bytes__(self) -> bytes:
 132        if self.fields:
 133            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
 134        else:
 135            return b""
 136
 137    def __delitem__(self, key: str | bytes) -> None:
 138        key = _always_bytes(key)
 139        super().__delitem__(key)
 140
 141    def __iter__(self) -> Iterator[str]:
 142        for x in super().__iter__():
 143            yield _native(x)
 144
 145    def get_all(self, name: str | bytes) -> list[str]:
 146        """
 147        Like `Headers.get`, but does not fold multiple headers into a single one.
 148        This is useful for Set-Cookie and Cookie headers, which do not support folding.
 149
 150        *See also:*
 151         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
 152         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
 153         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
 154        """
 155        name = _always_bytes(name)
 156        return [_native(x) for x in super().get_all(name)]
 157
 158    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
 159        """
 160        Explicitly set multiple headers for the given key.
 161        See `Headers.get_all`.
 162        """
 163        name = _always_bytes(name)
 164        values = [_always_bytes(x) for x in values]
 165        return super().set_all(name, values)
 166
 167    def insert(self, index: int, key: str | bytes, value: str | bytes):
 168        key = _always_bytes(key)
 169        value = _always_bytes(value)
 170        super().insert(index, key, value)
 171
 172    def items(self, multi=False):
 173        if multi:
 174            return ((_native(k), _native(v)) for k, v in self.fields)
 175        else:
 176            return super().items()
 177
 178
 179@dataclass
 180class MessageData(serializable.Serializable):
 181    http_version: bytes
 182    headers: Headers
 183    content: bytes | None
 184    trailers: Headers | None
 185    timestamp_start: float
 186    timestamp_end: float | None
 187
 188    # noinspection PyUnreachableCode
 189    if __debug__:
 190
 191        def __post_init__(self):
 192            for field in fields(self):
 193                val = getattr(self, field.name)
 194                typecheck.check_option_type(field.name, val, field.type)
 195
 196    def set_state(self, state):
 197        for k, v in state.items():
 198            if k in ("headers", "trailers") and v is not None:
 199                v = Headers.from_state(v)
 200            setattr(self, k, v)
 201
 202    def get_state(self):
 203        state = vars(self).copy()
 204        state["headers"] = state["headers"].get_state()
 205        if state["trailers"] is not None:
 206            state["trailers"] = state["trailers"].get_state()
 207        return state
 208
 209    @classmethod
 210    def from_state(cls, state):
 211        state["headers"] = Headers.from_state(state["headers"])
 212        if state["trailers"] is not None:
 213            state["trailers"] = Headers.from_state(state["trailers"])
 214        return cls(**state)
 215
 216
 217@dataclass
 218class RequestData(MessageData):
 219    host: str
 220    port: int
 221    method: bytes
 222    scheme: bytes
 223    authority: bytes
 224    path: bytes
 225
 226
 227@dataclass
 228class ResponseData(MessageData):
 229    status_code: int
 230    reason: bytes
 231
 232
 233class Message(serializable.Serializable):
 234    """Base class for `Request` and `Response`."""
 235
 236    @classmethod
 237    def from_state(cls, state):
 238        return cls(**state)
 239
 240    def get_state(self):
 241        return self.data.get_state()
 242
 243    def set_state(self, state):
 244        self.data.set_state(state)
 245
 246    data: MessageData
 247    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
 248    """
 249    This attribute controls if the message body should be streamed.
 250
 251    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
 252    This makes it possible to perform string replacements on the entire body.
 253    If `True`, the message body will not be buffered on the proxy
 254    but immediately forwarded instead.
 255    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
 256    Please note that packet boundaries generally should not be relied upon.
 257
 258    This attribute must be set in the `requestheaders` or `responseheaders` hook.
 259    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
 260    """
 261
 262    @property
 263    def http_version(self) -> str:
 264        """
 265        HTTP version string, for example `HTTP/1.1`.
 266        """
 267        return self.data.http_version.decode("utf-8", "surrogateescape")
 268
 269    @http_version.setter
 270    def http_version(self, http_version: str | bytes) -> None:
 271        self.data.http_version = strutils.always_bytes(
 272            http_version, "utf-8", "surrogateescape"
 273        )
 274
 275    @property
 276    def is_http10(self) -> bool:
 277        return self.data.http_version == b"HTTP/1.0"
 278
 279    @property
 280    def is_http11(self) -> bool:
 281        return self.data.http_version == b"HTTP/1.1"
 282
 283    @property
 284    def is_http2(self) -> bool:
 285        return self.data.http_version == b"HTTP/2.0"
 286
 287    @property
 288    def is_http3(self) -> bool:
 289        return self.data.http_version == b"HTTP/3"
 290
 291    @property
 292    def headers(self) -> Headers:
 293        """
 294        The HTTP headers.
 295        """
 296        return self.data.headers
 297
 298    @headers.setter
 299    def headers(self, h: Headers) -> None:
 300        self.data.headers = h
 301
 302    @property
 303    def trailers(self) -> Headers | None:
 304        """
 305        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
 306        """
 307        return self.data.trailers
 308
 309    @trailers.setter
 310    def trailers(self, h: Headers | None) -> None:
 311        self.data.trailers = h
 312
 313    @property
 314    def raw_content(self) -> bytes | None:
 315        """
 316        The raw (potentially compressed) HTTP message body.
 317
 318        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
 319
 320        *See also:* `Message.content`, `Message.text`
 321        """
 322        return self.data.content
 323
 324    @raw_content.setter
 325    def raw_content(self, content: bytes | None) -> None:
 326        self.data.content = content
 327
 328    @property
 329    def content(self) -> bytes | None:
 330        """
 331        The uncompressed HTTP message body as bytes.
 332
 333        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
 334
 335        *See also:* `Message.raw_content`, `Message.text`
 336        """
 337        return self.get_content()
 338
 339    @content.setter
 340    def content(self, value: bytes | None) -> None:
 341        self.set_content(value)
 342
 343    @property
 344    def text(self) -> str | None:
 345        """
 346        The uncompressed and decoded HTTP message body as text.
 347
 348        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
 349
 350        *See also:* `Message.raw_content`, `Message.content`
 351        """
 352        return self.get_text()
 353
 354    @text.setter
 355    def text(self, value: str | None) -> None:
 356        self.set_text(value)
 357
 358    def set_content(self, value: bytes | None) -> None:
 359        if value is None:
 360            self.raw_content = None
 361            return
 362        if not isinstance(value, bytes):
 363            raise TypeError(
 364                f"Message content must be bytes, not {type(value).__name__}. "
 365                "Please use .text if you want to assign a str."
 366            )
 367        ce = self.headers.get("content-encoding")
 368        try:
 369            self.raw_content = encoding.encode(value, ce or "identity")
 370        except ValueError:
 371            # So we have an invalid content-encoding?
 372            # Let's remove it!
 373            del self.headers["content-encoding"]
 374            self.raw_content = value
 375
 376        if "transfer-encoding" in self.headers:
 377            # https://httpwg.org/specs/rfc7230.html#header.content-length
 378            # don't set content-length if a transfer-encoding is provided
 379            pass
 380        else:
 381            self.headers["content-length"] = str(len(self.raw_content))
 382
 383    def get_content(self, strict: bool = True) -> bytes | None:
 384        """
 385        Similar to `Message.content`, but does not raise if `strict` is `False`.
 386        Instead, the compressed message body is returned as-is.
 387        """
 388        if self.raw_content is None:
 389            return None
 390        ce = self.headers.get("content-encoding")
 391        if ce:
 392            try:
 393                content = encoding.decode(self.raw_content, ce)
 394                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
 395                if isinstance(content, str):
 396                    raise ValueError(f"Invalid Content-Encoding: {ce}")
 397                return content
 398            except ValueError:
 399                if strict:
 400                    raise
 401                return self.raw_content
 402        else:
 403            return self.raw_content
 404
 405    def set_text(self, text: str | None) -> None:
 406        if text is None:
 407            self.content = None
 408            return
 409        enc = infer_content_encoding(self.headers.get("content-type", ""))
 410
 411        try:
 412            self.content = cast(bytes, encoding.encode(text, enc))
 413        except ValueError:
 414            # Fall back to UTF-8 and update the content-type header.
 415            ct = parse_content_type(self.headers.get("content-type", "")) or (
 416                "text",
 417                "plain",
 418                {},
 419            )
 420            ct[2]["charset"] = "utf-8"
 421            self.headers["content-type"] = assemble_content_type(*ct)
 422            enc = "utf8"
 423            self.content = text.encode(enc, "surrogateescape")
 424
 425    def get_text(self, strict: bool = True) -> str | None:
 426        """
 427        Similar to `Message.text`, but does not raise if `strict` is `False`.
 428        Instead, the message body is returned as surrogate-escaped UTF-8.
 429        """
 430        content = self.get_content(strict)
 431        if content is None:
 432            return None
 433        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
 434        try:
 435            return cast(str, encoding.decode(content, enc))
 436        except ValueError:
 437            if strict:
 438                raise
 439            return content.decode("utf8", "surrogateescape")
 440
 441    @property
 442    def timestamp_start(self) -> float:
 443        """
 444        *Timestamp:* Headers received.
 445        """
 446        return self.data.timestamp_start
 447
 448    @timestamp_start.setter
 449    def timestamp_start(self, timestamp_start: float) -> None:
 450        self.data.timestamp_start = timestamp_start
 451
 452    @property
 453    def timestamp_end(self) -> float | None:
 454        """
 455        *Timestamp:* Last byte received.
 456        """
 457        return self.data.timestamp_end
 458
 459    @timestamp_end.setter
 460    def timestamp_end(self, timestamp_end: float | None):
 461        self.data.timestamp_end = timestamp_end
 462
 463    def decode(self, strict: bool = True) -> None:
 464        """
 465        Decodes body based on the current Content-Encoding header, then
 466        removes the header. If there is no Content-Encoding header, no
 467        action is taken.
 468
 469        *Raises:*
 470         - `ValueError`, when the content-encoding is invalid and strict is True.
 471        """
 472        decoded = self.get_content(strict)
 473        self.headers.pop("content-encoding", None)
 474        self.content = decoded
 475
 476    def encode(self, encoding: str) -> None:
 477        """
 478        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
 479        Any existing content-encodings are overwritten, the content is not decoded beforehand.
 480
 481        *Raises:*
 482         - `ValueError`, when the specified content-encoding is invalid.
 483        """
 484        self.headers["content-encoding"] = encoding
 485        self.content = self.raw_content
 486        if "content-encoding" not in self.headers:
 487            raise ValueError(f"Invalid content encoding {repr(encoding)}")
 488
 489    def json(self, **kwargs: Any) -> Any:
 490        """
 491        Returns the JSON encoded content of the response, if any.
 492        `**kwargs` are optional arguments that will be
 493        passed to `json.loads()`.
 494
 495        Will raise if the content can not be decoded and then parsed as JSON.
 496
 497        *Raises:*
 498         - `json.decoder.JSONDecodeError` if content is not valid JSON.
 499         - `TypeError` if the content is not available, for example because the response
 500            has been streamed.
 501        """
 502        content = self.get_content(strict=False)
 503        if content is None:
 504            raise TypeError("Message content is not available.")
 505        else:
 506            return json.loads(content, **kwargs)
 507
 508
 509class Request(Message):
 510    """
 511    An HTTP request.
 512    """
 513
 514    data: RequestData
 515
 516    def __init__(
 517        self,
 518        host: str,
 519        port: int,
 520        method: bytes,
 521        scheme: bytes,
 522        authority: bytes,
 523        path: bytes,
 524        http_version: bytes,
 525        headers: Headers | tuple[tuple[bytes, bytes], ...],
 526        content: bytes | None,
 527        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 528        timestamp_start: float,
 529        timestamp_end: float | None,
 530    ):
 531        # auto-convert invalid types to retain compatibility with older code.
 532        if isinstance(host, bytes):
 533            host = host.decode("idna", "strict")
 534        if isinstance(method, str):
 535            method = method.encode("ascii", "strict")
 536        if isinstance(scheme, str):
 537            scheme = scheme.encode("ascii", "strict")
 538        if isinstance(authority, str):
 539            authority = authority.encode("ascii", "strict")
 540        if isinstance(path, str):
 541            path = path.encode("ascii", "strict")
 542        if isinstance(http_version, str):
 543            http_version = http_version.encode("ascii", "strict")
 544
 545        if isinstance(content, str):
 546            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 547        if not isinstance(headers, Headers):
 548            headers = Headers(headers)
 549        if trailers is not None and not isinstance(trailers, Headers):
 550            trailers = Headers(trailers)
 551
 552        self.data = RequestData(
 553            host=host,
 554            port=port,
 555            method=method,
 556            scheme=scheme,
 557            authority=authority,
 558            path=path,
 559            http_version=http_version,
 560            headers=headers,
 561            content=content,
 562            trailers=trailers,
 563            timestamp_start=timestamp_start,
 564            timestamp_end=timestamp_end,
 565        )
 566
 567    def __repr__(self) -> str:
 568        if self.host and self.port:
 569            hostport = f"{self.host}:{self.port}"
 570        else:
 571            hostport = ""
 572        path = self.path or ""
 573        return f"Request({self.method} {hostport}{path})"
 574
 575    @classmethod
 576    def make(
 577        cls,
 578        method: str,
 579        url: str,
 580        content: bytes | str = "",
 581        headers: (
 582            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 583        ) = (),
 584    ) -> "Request":
 585        """
 586        Simplified API for creating request objects.
 587        """
 588        # Headers can be list or dict, we differentiate here.
 589        if isinstance(headers, Headers):
 590            pass
 591        elif isinstance(headers, dict):
 592            headers = Headers(
 593                (
 594                    always_bytes(k, "utf-8", "surrogateescape"),
 595                    always_bytes(v, "utf-8", "surrogateescape"),
 596                )
 597                for k, v in headers.items()
 598            )
 599        elif isinstance(headers, Iterable):
 600            headers = Headers(headers)  # type: ignore
 601        else:
 602            raise TypeError(
 603                "Expected headers to be an iterable or dict, but is {}.".format(
 604                    type(headers).__name__
 605                )
 606            )
 607
 608        req = cls(
 609            "",
 610            0,
 611            method.encode("utf-8", "surrogateescape"),
 612            b"",
 613            b"",
 614            b"",
 615            b"HTTP/1.1",
 616            headers,
 617            b"",
 618            None,
 619            time.time(),
 620            time.time(),
 621        )
 622
 623        req.url = url
 624        # Assign this manually to update the content-length header.
 625        if isinstance(content, bytes):
 626            req.content = content
 627        elif isinstance(content, str):
 628            req.text = content
 629        else:
 630            raise TypeError(
 631                f"Expected content to be str or bytes, but is {type(content).__name__}."
 632            )
 633
 634        return req
 635
 636    @property
 637    def first_line_format(self) -> str:
 638        """
 639        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 640
 641        origin-form and asterisk-form are subsumed as "relative".
 642        """
 643        if self.method == "CONNECT":
 644            return "authority"
 645        elif self.authority:
 646            return "absolute"
 647        else:
 648            return "relative"
 649
 650    @property
 651    def method(self) -> str:
 652        """
 653        HTTP request method, e.g. "GET".
 654        """
 655        return self.data.method.decode("utf-8", "surrogateescape").upper()
 656
 657    @method.setter
 658    def method(self, val: str | bytes) -> None:
 659        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 660
 661    @property
 662    def scheme(self) -> str:
 663        """
 664        HTTP request scheme, which should be "http" or "https".
 665        """
 666        return self.data.scheme.decode("utf-8", "surrogateescape")
 667
 668    @scheme.setter
 669    def scheme(self, val: str | bytes) -> None:
 670        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 671
 672    @property
 673    def authority(self) -> str:
 674        """
 675        HTTP request authority.
 676
 677        For HTTP/1, this is the authority portion of the request target
 678        (in either absolute-form or authority-form).
 679        For origin-form and asterisk-form requests, this property is set to an empty string.
 680
 681        For HTTP/2, this is the :authority pseudo header.
 682
 683        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 684        """
 685        try:
 686            return self.data.authority.decode("idna")
 687        except UnicodeError:
 688            return self.data.authority.decode("utf8", "surrogateescape")
 689
 690    @authority.setter
 691    def authority(self, val: str | bytes) -> None:
 692        if isinstance(val, str):
 693            try:
 694                val = val.encode("idna", "strict")
 695            except UnicodeError:
 696                val = val.encode("utf8", "surrogateescape")  # type: ignore
 697        self.data.authority = val
 698
 699    @property
 700    def host(self) -> str:
 701        """
 702        Target server for this request. This may be parsed from the raw request
 703        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 704        or inferred from the proxy mode (e.g. an IP in transparent mode).
 705
 706        Setting the host attribute also updates the host header and authority information, if present.
 707
 708        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 709        """
 710        return self.data.host
 711
 712    @host.setter
 713    def host(self, val: str | bytes) -> None:
 714        self.data.host = always_str(val, "idna", "strict")
 715        self._update_host_and_authority()
 716
 717    @property
 718    def host_header(self) -> str | None:
 719        """
 720        The request's host/authority header.
 721
 722        This property maps to either ``request.headers["Host"]`` or
 723        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 724
 725        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 726        """
 727        if self.is_http2 or self.is_http3:
 728            return self.authority or self.data.headers.get("Host", None)
 729        else:
 730            return self.data.headers.get("Host", None)
 731
 732    @host_header.setter
 733    def host_header(self, val: None | str | bytes) -> None:
 734        if val is None:
 735            if self.is_http2 or self.is_http3:
 736                self.data.authority = b""
 737            self.headers.pop("Host", None)
 738        else:
 739            if self.is_http2 or self.is_http3:
 740                self.authority = val  # type: ignore
 741            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 742                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 743                self.headers["Host"] = val
 744
 745    @property
 746    def port(self) -> int:
 747        """
 748        Target port.
 749        """
 750        return self.data.port
 751
 752    @port.setter
 753    def port(self, port: int) -> None:
 754        if not isinstance(port, int):
 755            raise ValueError(f"Port must be an integer, not {port!r}.")
 756
 757        self.data.port = port
 758        self._update_host_and_authority()
 759
 760    def _update_host_and_authority(self) -> None:
 761        val = url.hostport(self.scheme, self.host, self.port)
 762
 763        # Update host header
 764        if "Host" in self.data.headers:
 765            self.data.headers["Host"] = val
 766        # Update authority
 767        if self.data.authority:
 768            self.authority = val
 769
 770    @property
 771    def path(self) -> str:
 772        """
 773        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 774        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 775
 776        This attribute includes both path and query parts of the target URI
 777        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 778        """
 779        return self.data.path.decode("utf-8", "surrogateescape")
 780
 781    @path.setter
 782    def path(self, val: str | bytes) -> None:
 783        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 784
 785    @property
 786    def url(self) -> str:
 787        """
 788        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 789
 790        Settings this property updates these attributes as well.
 791        """
 792        if self.first_line_format == "authority":
 793            return f"{self.host}:{self.port}"
 794        return url.unparse(self.scheme, self.host, self.port, self.path)
 795
 796    @url.setter
 797    def url(self, val: str | bytes) -> None:
 798        val = always_str(val, "utf-8", "surrogateescape")
 799        self.scheme, self.host, self.port, self.path = url.parse(val)
 800
 801    @property
 802    def pretty_host(self) -> str:
 803        """
 804        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 805        This is useful in transparent mode where `Request.host` is only an IP address.
 806
 807        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 808        as the Host header could be spoofed.
 809        """
 810        authority = self.host_header
 811        if authority:
 812            return url.parse_authority(authority, check=False)[0]
 813        else:
 814            return self.host
 815
 816    @property
 817    def pretty_url(self) -> str:
 818        """
 819        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 820        """
 821        if self.first_line_format == "authority":
 822            return self.authority
 823
 824        host_header = self.host_header
 825        if not host_header:
 826            return self.url
 827
 828        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 829        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 830
 831        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)
 832
 833    def _get_query(self):
 834        query = urllib.parse.urlparse(self.url).query
 835        return tuple(url.decode(query))
 836
 837    def _set_query(self, query_data):
 838        query = url.encode(query_data)
 839        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 840        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 841
 842    @property
 843    def query(self) -> multidict.MultiDictView[str, str]:
 844        """
 845        The request query as a mutable mapping view on the request's path.
 846        For the most part, this behaves like a dictionary.
 847        Modifications to the MultiDictView update `Request.path`, and vice versa.
 848        """
 849        return multidict.MultiDictView(self._get_query, self._set_query)
 850
 851    @query.setter
 852    def query(self, value):
 853        self._set_query(value)
 854
 855    def _get_cookies(self):
 856        h = self.headers.get_all("Cookie")
 857        return tuple(cookies.parse_cookie_headers(h))
 858
 859    def _set_cookies(self, value):
 860        self.headers["cookie"] = cookies.format_cookie_header(value)
 861
 862    @property
 863    def cookies(self) -> multidict.MultiDictView[str, str]:
 864        """
 865        The request cookies.
 866        For the most part, this behaves like a dictionary.
 867        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 868        """
 869        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 870
 871    @cookies.setter
 872    def cookies(self, value):
 873        self._set_cookies(value)
 874
 875    @property
 876    def path_components(self) -> tuple[str, ...]:
 877        """
 878        The URL's path components as a tuple of strings.
 879        Components are unquoted.
 880        """
 881        path = urllib.parse.urlparse(self.url).path
 882        # This needs to be a tuple so that it's immutable.
 883        # Otherwise, this would fail silently:
 884        #   request.path_components.append("foo")
 885        return tuple(url.unquote(i) for i in path.split("/") if i)
 886
 887    @path_components.setter
 888    def path_components(self, components: Iterable[str]):
 889        components = map(lambda x: url.quote(x, safe=""), components)
 890        path = "/" + "/".join(components)
 891        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 892        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 893
 894    def anticache(self) -> None:
 895        """
 896        Modifies this request to remove headers that might produce a cached response.
 897        """
 898        delheaders = (
 899            "if-modified-since",
 900            "if-none-match",
 901        )
 902        for i in delheaders:
 903            self.headers.pop(i, None)
 904
 905    def anticomp(self) -> None:
 906        """
 907        Modify the Accept-Encoding header to only accept uncompressed responses.
 908        """
 909        self.headers["accept-encoding"] = "identity"
 910
 911    def constrain_encoding(self) -> None:
 912        """
 913        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 914        """
 915        accept_encoding = self.headers.get("accept-encoding")
 916        if accept_encoding:
 917            self.headers["accept-encoding"] = ", ".join(
 918                e
 919                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 920                if e in accept_encoding
 921            )
 922
 923    def _get_urlencoded_form(self):
 924        is_valid_content_type = (
 925            "application/x-www-form-urlencoded"
 926            in self.headers.get("content-type", "").lower()
 927        )
 928        if is_valid_content_type:
 929            return tuple(url.decode(self.get_text(strict=False)))
 930        return ()
 931
 932    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 933        """
 934        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 935        This will overwrite the existing content if there is one.
 936        """
 937        self.headers["content-type"] = "application/x-www-form-urlencoded"
 938        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 939
 940    @property
 941    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 942        """
 943        The URL-encoded form data.
 944
 945        If the content-type indicates non-form data or the form could not be parsed, this is set to
 946        an empty `MultiDictView`.
 947
 948        Modifications to the MultiDictView update `Request.content`, and vice versa.
 949        """
 950        return multidict.MultiDictView(
 951            self._get_urlencoded_form, self._set_urlencoded_form
 952        )
 953
 954    @urlencoded_form.setter
 955    def urlencoded_form(self, value):
 956        self._set_urlencoded_form(value)
 957
 958    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 959        is_valid_content_type = (
 960            "multipart/form-data" in self.headers.get("content-type", "").lower()
 961        )
 962        if is_valid_content_type and self.content is not None:
 963            try:
 964                return multipart.decode_multipart(
 965                    self.headers.get("content-type"), self.content
 966                )
 967            except ValueError:
 968                pass
 969        return []
 970
 971    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 972        ct = self.headers.get("content-type", "")
 973        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 974        if not is_valid_content_type:
 975            """
 976            Generate a random boundary here.
 977
 978            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 979            on generating the boundary.
 980            """
 981            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 982            self.headers["content-type"] = ct = (
 983                f"multipart/form-data; boundary={boundary}"
 984            )
 985        self.content = multipart.encode_multipart(ct, value)
 986
 987    @property
 988    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 989        """
 990        The multipart form data.
 991
 992        If the content-type indicates non-form data or the form could not be parsed, this is set to
 993        an empty `MultiDictView`.
 994
 995        Modifications to the MultiDictView update `Request.content`, and vice versa.
 996        """
 997        return multidict.MultiDictView(
 998            self._get_multipart_form, self._set_multipart_form
 999        )
1000
1001    @multipart_form.setter
1002    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1003        self._set_multipart_form(value)
1004
1005
1006class Response(Message):
1007    """
1008    An HTTP response.
1009    """
1010
1011    data: ResponseData
1012
1013    def __init__(
1014        self,
1015        http_version: bytes,
1016        status_code: int,
1017        reason: bytes,
1018        headers: Headers | tuple[tuple[bytes, bytes], ...],
1019        content: bytes | None,
1020        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1021        timestamp_start: float,
1022        timestamp_end: float | None,
1023    ):
1024        # auto-convert invalid types to retain compatibility with older code.
1025        if isinstance(http_version, str):
1026            http_version = http_version.encode("ascii", "strict")
1027        if isinstance(reason, str):
1028            reason = reason.encode("ascii", "strict")
1029
1030        if isinstance(content, str):
1031            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1032        if not isinstance(headers, Headers):
1033            headers = Headers(headers)
1034        if trailers is not None and not isinstance(trailers, Headers):
1035            trailers = Headers(trailers)
1036
1037        self.data = ResponseData(
1038            http_version=http_version,
1039            status_code=status_code,
1040            reason=reason,
1041            headers=headers,
1042            content=content,
1043            trailers=trailers,
1044            timestamp_start=timestamp_start,
1045            timestamp_end=timestamp_end,
1046        )
1047
1048    def __repr__(self) -> str:
1049        if self.raw_content:
1050            ct = self.headers.get("content-type", "unknown content type")
1051            size = human.pretty_size(len(self.raw_content))
1052            details = f"{ct}, {size}"
1053        else:
1054            details = "no content"
1055        return f"Response({self.status_code}, {details})"
1056
1057    @classmethod
1058    def make(
1059        cls,
1060        status_code: int = 200,
1061        content: bytes | str = b"",
1062        headers: (
1063            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1064        ) = (),
1065    ) -> "Response":
1066        """
1067        Simplified API for creating response objects.
1068        """
1069        if isinstance(headers, Headers):
1070            headers = headers
1071        elif isinstance(headers, dict):
1072            headers = Headers(
1073                (
1074                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1075                    always_bytes(v, "utf-8", "surrogateescape"),
1076                )
1077                for k, v in headers.items()
1078            )
1079        elif isinstance(headers, Iterable):
1080            headers = Headers(headers)  # type: ignore
1081        else:
1082            raise TypeError(
1083                "Expected headers to be an iterable or dict, but is {}.".format(
1084                    type(headers).__name__
1085                )
1086            )
1087
1088        resp = cls(
1089            b"HTTP/1.1",
1090            status_code,
1091            status_codes.RESPONSES.get(status_code, "").encode(),
1092            headers,
1093            None,
1094            None,
1095            time.time(),
1096            time.time(),
1097        )
1098
1099        # Assign this manually to update the content-length header.
1100        if isinstance(content, bytes):
1101            resp.content = content
1102        elif isinstance(content, str):
1103            resp.text = content
1104        else:
1105            raise TypeError(
1106                f"Expected content to be str or bytes, but is {type(content).__name__}."
1107            )
1108
1109        return resp
1110
1111    @property
1112    def status_code(self) -> int:
1113        """
1114        HTTP Status Code, e.g. ``200``.
1115        """
1116        return self.data.status_code
1117
1118    @status_code.setter
1119    def status_code(self, status_code: int) -> None:
1120        self.data.status_code = status_code
1121
1122    @property
1123    def reason(self) -> str:
1124        """
1125        HTTP reason phrase, for example "Not Found".
1126
1127        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1128        """
1129        # Encoding: http://stackoverflow.com/a/16674906/934719
1130        return self.data.reason.decode("ISO-8859-1")
1131
1132    @reason.setter
1133    def reason(self, reason: str | bytes) -> None:
1134        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1135
1136    def _get_cookies(self):
1137        h = self.headers.get_all("set-cookie")
1138        all_cookies = cookies.parse_set_cookie_headers(h)
1139        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1140
1141    def _set_cookies(self, value):
1142        cookie_headers = []
1143        for k, v in value:
1144            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1145            cookie_headers.append(header)
1146        self.headers.set_all("set-cookie", cookie_headers)
1147
1148    @property
1149    def cookies(
1150        self,
1151    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1152        """
1153        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1154        name strings, and values are `(cookie value, attributes)` tuples. Within
1155        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1156        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1157
1158        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1159        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1160        """
1161        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1162
1163    @cookies.setter
1164    def cookies(self, value):
1165        self._set_cookies(value)
1166
1167    def refresh(self, now=None):
1168        """
1169        This fairly complex and heuristic function refreshes a server
1170        response for replay.
1171
1172         - It adjusts date, expires, and last-modified headers.
1173         - It adjusts cookie expiration.
1174        """
1175        if not now:
1176            now = time.time()
1177        delta = now - self.timestamp_start
1178        refresh_headers = [
1179            "date",
1180            "expires",
1181            "last-modified",
1182        ]
1183        for i in refresh_headers:
1184            if i in self.headers:
1185                d = parsedate_tz(self.headers[i])
1186                if d:
1187                    new = mktime_tz(d) + delta
1188                    try:
1189                        self.headers[i] = formatdate(new, usegmt=True)
1190                    except OSError:  # pragma: no cover
1191                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1192        c = []
1193        for set_cookie_header in self.headers.get_all("set-cookie"):
1194            try:
1195                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1196            except ValueError:
1197                refreshed = set_cookie_header
1198            c.append(refreshed)
1199        if c:
1200            self.headers.set_all("set-cookie", c)
1201
1202
1203class HTTPFlow(flow.Flow):
1204    """
1205    An HTTPFlow is a collection of objects representing a single HTTP
1206    transaction.
1207    """
1208
1209    request: Request
1210    """The client's HTTP request."""
1211    response: Response | None = None
1212    """The server's HTTP response."""
1213    error: flow.Error | None = None
1214    """
1215    A connection or protocol error affecting this flow.
1216
1217    Note that it's possible for a Flow to have both a response and an error
1218    object. This might happen, for instance, when a response was received
1219    from the server, but there was an error sending it back to the client.
1220    """
1221
1222    websocket: WebSocketData | None = None
1223    """
1224    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1225    """
1226
1227    def get_state(self) -> serializable.State:
1228        return {
1229            **super().get_state(),
1230            "request": self.request.get_state(),
1231            "response": self.response.get_state() if self.response else None,
1232            "websocket": self.websocket.get_state() if self.websocket else None,
1233        }
1234
1235    def set_state(self, state: serializable.State) -> None:
1236        self.request = Request.from_state(state.pop("request"))
1237        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1238        self.websocket = (
1239            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1240        )
1241        super().set_state(state)
1242
1243    def __repr__(self):
1244        s = "<HTTPFlow"
1245        for a in (
1246            "request",
1247            "response",
1248            "websocket",
1249            "error",
1250            "client_conn",
1251            "server_conn",
1252        ):
1253            if getattr(self, a, False):
1254                s += f"\r\n  {a} = {{flow.{a}}}"
1255        s += ">"
1256        return s.format(flow=self)
1257
1258    @property
1259    def timestamp_start(self) -> float:
1260        """*Read-only:* An alias for `Request.timestamp_start`."""
1261        return self.request.timestamp_start
1262
1263    @property
1264    def mode(self) -> str:  # pragma: no cover
1265        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1266        return getattr(self, "_mode", "regular")
1267
1268    @mode.setter
1269    def mode(self, val: str) -> None:  # pragma: no cover
1270        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1271        self._mode = val
1272
1273    def copy(self):
1274        f = super().copy()
1275        if self.request:
1276            f.request = self.request.copy()
1277        if self.response:
1278            f.response = self.response.copy()
1279        return f
1280
1281
1282__all__ = [
1283    "HTTPFlow",
1284    "Message",
1285    "Request",
1286    "Response",
1287    "Headers",
1288]
class HTTPFlow(mitmproxy.flow.Flow):
1204class HTTPFlow(flow.Flow):
1205    """
1206    An HTTPFlow is a collection of objects representing a single HTTP
1207    transaction.
1208    """
1209
1210    request: Request
1211    """The client's HTTP request."""
1212    response: Response | None = None
1213    """The server's HTTP response."""
1214    error: flow.Error | None = None
1215    """
1216    A connection or protocol error affecting this flow.
1217
1218    Note that it's possible for a Flow to have both a response and an error
1219    object. This might happen, for instance, when a response was received
1220    from the server, but there was an error sending it back to the client.
1221    """
1222
1223    websocket: WebSocketData | None = None
1224    """
1225    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1226    """
1227
1228    def get_state(self) -> serializable.State:
1229        return {
1230            **super().get_state(),
1231            "request": self.request.get_state(),
1232            "response": self.response.get_state() if self.response else None,
1233            "websocket": self.websocket.get_state() if self.websocket else None,
1234        }
1235
1236    def set_state(self, state: serializable.State) -> None:
1237        self.request = Request.from_state(state.pop("request"))
1238        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1239        self.websocket = (
1240            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1241        )
1242        super().set_state(state)
1243
1244    def __repr__(self):
1245        s = "<HTTPFlow"
1246        for a in (
1247            "request",
1248            "response",
1249            "websocket",
1250            "error",
1251            "client_conn",
1252            "server_conn",
1253        ):
1254            if getattr(self, a, False):
1255                s += f"\r\n  {a} = {{flow.{a}}}"
1256        s += ">"
1257        return s.format(flow=self)
1258
1259    @property
1260    def timestamp_start(self) -> float:
1261        """*Read-only:* An alias for `Request.timestamp_start`."""
1262        return self.request.timestamp_start
1263
1264    @property
1265    def mode(self) -> str:  # pragma: no cover
1266        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1267        return getattr(self, "_mode", "regular")
1268
1269    @mode.setter
1270    def mode(self, val: str) -> None:  # pragma: no cover
1271        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1272        self._mode = val
1273
1274    def copy(self):
1275        f = super().copy()
1276        if self.request:
1277            f.request = self.request.copy()
1278        if self.response:
1279            f.response = self.response.copy()
1280        return f

An HTTPFlow is a collection of objects representing a single HTTP transaction.

request: Request

The client's HTTP request.

response: Response | None = None

The server's HTTP response.

error: mitmproxy.flow.Error | None = None

A connection or protocol error affecting this flow.

Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.

websocket: mitmproxy.websocket.WebSocketData | None = None

If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.

timestamp_start: float
1259    @property
1260    def timestamp_start(self) -> float:
1261        """*Read-only:* An alias for `Request.timestamp_start`."""
1262        return self.request.timestamp_start

Read-only: An alias for Request.timestamp_start.

mode: str
1264    @property
1265    def mode(self) -> str:  # pragma: no cover
1266        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1267        return getattr(self, "_mode", "regular")
def copy(self):
1274    def copy(self):
1275        f = super().copy()
1276        if self.request:
1277            f.request = self.request.copy()
1278        if self.response:
1279            f.response = self.response.copy()
1280        return f

Make a copy of this flow.

type: ClassVar[str] = 'http'

The flow type, for example http, tcp, or dns.

class Message(mitmproxy.coretypes.serializable.Serializable):
234class Message(serializable.Serializable):
235    """Base class for `Request` and `Response`."""
236
237    @classmethod
238    def from_state(cls, state):
239        return cls(**state)
240
241    def get_state(self):
242        return self.data.get_state()
243
244    def set_state(self, state):
245        self.data.set_state(state)
246
247    data: MessageData
248    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
249    """
250    This attribute controls if the message body should be streamed.
251
252    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
253    This makes it possible to perform string replacements on the entire body.
254    If `True`, the message body will not be buffered on the proxy
255    but immediately forwarded instead.
256    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
257    Please note that packet boundaries generally should not be relied upon.
258
259    This attribute must be set in the `requestheaders` or `responseheaders` hook.
260    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
261    """
262
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")
269
270    @http_version.setter
271    def http_version(self, http_version: str | bytes) -> None:
272        self.data.http_version = strutils.always_bytes(
273            http_version, "utf-8", "surrogateescape"
274        )
275
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
279
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
283
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
287
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
291
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers
298
299    @headers.setter
300    def headers(self, h: Headers) -> None:
301        self.data.headers = h
302
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
309
310    @trailers.setter
311    def trailers(self, h: Headers | None) -> None:
312        self.data.trailers = h
313
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320
321        *See also:* `Message.content`, `Message.text`
322        """
323        return self.data.content
324
325    @raw_content.setter
326    def raw_content(self, content: bytes | None) -> None:
327        self.data.content = content
328
329    @property
330    def content(self) -> bytes | None:
331        """
332        The uncompressed HTTP message body as bytes.
333
334        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
335
336        *See also:* `Message.raw_content`, `Message.text`
337        """
338        return self.get_content()
339
340    @content.setter
341    def content(self, value: bytes | None) -> None:
342        self.set_content(value)
343
344    @property
345    def text(self) -> str | None:
346        """
347        The uncompressed and decoded HTTP message body as text.
348
349        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
350
351        *See also:* `Message.raw_content`, `Message.content`
352        """
353        return self.get_text()
354
355    @text.setter
356    def text(self, value: str | None) -> None:
357        self.set_text(value)
358
359    def set_content(self, value: bytes | None) -> None:
360        if value is None:
361            self.raw_content = None
362            return
363        if not isinstance(value, bytes):
364            raise TypeError(
365                f"Message content must be bytes, not {type(value).__name__}. "
366                "Please use .text if you want to assign a str."
367            )
368        ce = self.headers.get("content-encoding")
369        try:
370            self.raw_content = encoding.encode(value, ce or "identity")
371        except ValueError:
372            # So we have an invalid content-encoding?
373            # Let's remove it!
374            del self.headers["content-encoding"]
375            self.raw_content = value
376
377        if "transfer-encoding" in self.headers:
378            # https://httpwg.org/specs/rfc7230.html#header.content-length
379            # don't set content-length if a transfer-encoding is provided
380            pass
381        else:
382            self.headers["content-length"] = str(len(self.raw_content))
383
384    def get_content(self, strict: bool = True) -> bytes | None:
385        """
386        Similar to `Message.content`, but does not raise if `strict` is `False`.
387        Instead, the compressed message body is returned as-is.
388        """
389        if self.raw_content is None:
390            return None
391        ce = self.headers.get("content-encoding")
392        if ce:
393            try:
394                content = encoding.decode(self.raw_content, ce)
395                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
396                if isinstance(content, str):
397                    raise ValueError(f"Invalid Content-Encoding: {ce}")
398                return content
399            except ValueError:
400                if strict:
401                    raise
402                return self.raw_content
403        else:
404            return self.raw_content
405
406    def set_text(self, text: str | None) -> None:
407        if text is None:
408            self.content = None
409            return
410        enc = infer_content_encoding(self.headers.get("content-type", ""))
411
412        try:
413            self.content = cast(bytes, encoding.encode(text, enc))
414        except ValueError:
415            # Fall back to UTF-8 and update the content-type header.
416            ct = parse_content_type(self.headers.get("content-type", "")) or (
417                "text",
418                "plain",
419                {},
420            )
421            ct[2]["charset"] = "utf-8"
422            self.headers["content-type"] = assemble_content_type(*ct)
423            enc = "utf8"
424            self.content = text.encode(enc, "surrogateescape")
425
426    def get_text(self, strict: bool = True) -> str | None:
427        """
428        Similar to `Message.text`, but does not raise if `strict` is `False`.
429        Instead, the message body is returned as surrogate-escaped UTF-8.
430        """
431        content = self.get_content(strict)
432        if content is None:
433            return None
434        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
435        try:
436            return cast(str, encoding.decode(content, enc))
437        except ValueError:
438            if strict:
439                raise
440            return content.decode("utf8", "surrogateescape")
441
442    @property
443    def timestamp_start(self) -> float:
444        """
445        *Timestamp:* Headers received.
446        """
447        return self.data.timestamp_start
448
449    @timestamp_start.setter
450    def timestamp_start(self, timestamp_start: float) -> None:
451        self.data.timestamp_start = timestamp_start
452
453    @property
454    def timestamp_end(self) -> float | None:
455        """
456        *Timestamp:* Last byte received.
457        """
458        return self.data.timestamp_end
459
460    @timestamp_end.setter
461    def timestamp_end(self, timestamp_end: float | None):
462        self.data.timestamp_end = timestamp_end
463
464    def decode(self, strict: bool = True) -> None:
465        """
466        Decodes body based on the current Content-Encoding header, then
467        removes the header. If there is no Content-Encoding header, no
468        action is taken.
469
470        *Raises:*
471         - `ValueError`, when the content-encoding is invalid and strict is True.
472        """
473        decoded = self.get_content(strict)
474        self.headers.pop("content-encoding", None)
475        self.content = decoded
476
477    def encode(self, encoding: str) -> None:
478        """
479        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
480        Any existing content-encodings are overwritten, the content is not decoded beforehand.
481
482        *Raises:*
483         - `ValueError`, when the specified content-encoding is invalid.
484        """
485        self.headers["content-encoding"] = encoding
486        self.content = self.raw_content
487        if "content-encoding" not in self.headers:
488            raise ValueError(f"Invalid content encoding {repr(encoding)}")
489
490    def json(self, **kwargs: Any) -> Any:
491        """
492        Returns the JSON encoded content of the response, if any.
493        `**kwargs` are optional arguments that will be
494        passed to `json.loads()`.
495
496        Will raise if the content can not be decoded and then parsed as JSON.
497
498        *Raises:*
499         - `json.decoder.JSONDecodeError` if content is not valid JSON.
500         - `TypeError` if the content is not available, for example because the response
501            has been streamed.
502        """
503        content = self.get_content(strict=False)
504        if content is None:
505            raise TypeError("Message content is not available.")
506        else:
507            return json.loads(content, **kwargs)

Base class for Request and Response.

stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False

This attribute controls if the message body should be streamed.

If False, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If True, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.

This attribute must be set in the requestheaders or responseheaders hook. Setting it in request or response is already too late, mitmproxy has buffered the message body already.

http_version: str
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")

HTTP version string, for example HTTP/1.1.

is_http10: bool
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
is_http11: bool
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
is_http2: bool
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
is_http3: bool
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
headers: Headers
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers

The HTTP headers.

trailers: Headers | None
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
raw_content: bytes | None
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320
321        *See also:* `Message.content`, `Message.text`
322        """
323        return self.data.content

The raw (potentially compressed) HTTP message body.

In contrast to Message.content and Message.text, accessing this property never raises.

See also: Message.content, Message.text

content: bytes | None
329    @property
330    def content(self) -> bytes | None:
331        """
332        The uncompressed HTTP message body as bytes.
333
334        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
335
336        *See also:* `Message.raw_content`, `Message.text`
337        """
338        return self.get_content()

The uncompressed HTTP message body as bytes.

Accessing this attribute may raise a ValueError when the HTTP content-encoding is invalid.

See also: Message.raw_content, Message.text

text: str | None
344    @property
345    def text(self) -> str | None:
346        """
347        The uncompressed and decoded HTTP message body as text.
348
349        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
350
351        *See also:* `Message.raw_content`, `Message.content`
352        """
353        return self.get_text()

The uncompressed and decoded HTTP message body as text.

Accessing this attribute may raise a ValueError when either content-encoding or charset is invalid.

See also: Message.raw_content, Message.content

def set_content(self, value: bytes | None) -> None:
359    def set_content(self, value: bytes | None) -> None:
360        if value is None:
361            self.raw_content = None
362            return
363        if not isinstance(value, bytes):
364            raise TypeError(
365                f"Message content must be bytes, not {type(value).__name__}. "
366                "Please use .text if you want to assign a str."
367            )
368        ce = self.headers.get("content-encoding")
369        try:
370            self.raw_content = encoding.encode(value, ce or "identity")
371        except ValueError:
372            # So we have an invalid content-encoding?
373            # Let's remove it!
374            del self.headers["content-encoding"]
375            self.raw_content = value
376
377        if "transfer-encoding" in self.headers:
378            # https://httpwg.org/specs/rfc7230.html#header.content-length
379            # don't set content-length if a transfer-encoding is provided
380            pass
381        else:
382            self.headers["content-length"] = str(len(self.raw_content))
def get_content(self, strict: bool = True) -> bytes | None:
384    def get_content(self, strict: bool = True) -> bytes | None:
385        """
386        Similar to `Message.content`, but does not raise if `strict` is `False`.
387        Instead, the compressed message body is returned as-is.
388        """
389        if self.raw_content is None:
390            return None
391        ce = self.headers.get("content-encoding")
392        if ce:
393            try:
394                content = encoding.decode(self.raw_content, ce)
395                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
396                if isinstance(content, str):
397                    raise ValueError(f"Invalid Content-Encoding: {ce}")
398                return content
399            except ValueError:
400                if strict:
401                    raise
402                return self.raw_content
403        else:
404            return self.raw_content

Similar to Message.content, but does not raise if strict is False. Instead, the compressed message body is returned as-is.

def set_text(self, text: str | None) -> None:
406    def set_text(self, text: str | None) -> None:
407        if text is None:
408            self.content = None
409            return
410        enc = infer_content_encoding(self.headers.get("content-type", ""))
411
412        try:
413            self.content = cast(bytes, encoding.encode(text, enc))
414        except ValueError:
415            # Fall back to UTF-8 and update the content-type header.
416            ct = parse_content_type(self.headers.get("content-type", "")) or (
417                "text",
418                "plain",
419                {},
420            )
421            ct[2]["charset"] = "utf-8"
422            self.headers["content-type"] = assemble_content_type(*ct)
423            enc = "utf8"
424            self.content = text.encode(enc, "surrogateescape")
def get_text(self, strict: bool = True) -> str | None:
426    def get_text(self, strict: bool = True) -> str | None:
427        """
428        Similar to `Message.text`, but does not raise if `strict` is `False`.
429        Instead, the message body is returned as surrogate-escaped UTF-8.
430        """
431        content = self.get_content(strict)
432        if content is None:
433            return None
434        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
435        try:
436            return cast(str, encoding.decode(content, enc))
437        except ValueError:
438            if strict:
439                raise
440            return content.decode("utf8", "surrogateescape")

Similar to Message.text, but does not raise if strict is False. Instead, the message body is returned as surrogate-escaped UTF-8.

timestamp_start: float
442    @property
443    def timestamp_start(self) -> float:
444        """
445        *Timestamp:* Headers received.
446        """
447        return self.data.timestamp_start

Timestamp: Headers received.

timestamp_end: float | None
453    @property
454    def timestamp_end(self) -> float | None:
455        """
456        *Timestamp:* Last byte received.
457        """
458        return self.data.timestamp_end

Timestamp: Last byte received.

def decode(self, strict: bool = True) -> None:
464    def decode(self, strict: bool = True) -> None:
465        """
466        Decodes body based on the current Content-Encoding header, then
467        removes the header. If there is no Content-Encoding header, no
468        action is taken.
469
470        *Raises:*
471         - `ValueError`, when the content-encoding is invalid and strict is True.
472        """
473        decoded = self.get_content(strict)
474        self.headers.pop("content-encoding", None)
475        self.content = decoded

Decodes body based on the current Content-Encoding header, then removes the header. If there is no Content-Encoding header, no action is taken.

Raises:

  • ValueError, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
477    def encode(self, encoding: str) -> None:
478        """
479        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
480        Any existing content-encodings are overwritten, the content is not decoded beforehand.
481
482        *Raises:*
483         - `ValueError`, when the specified content-encoding is invalid.
484        """
485        self.headers["content-encoding"] = encoding
486        self.content = self.raw_content
487        if "content-encoding" not in self.headers:
488            raise ValueError(f"Invalid content encoding {repr(encoding)}")

Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.

Raises:

  • ValueError, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
490    def json(self, **kwargs: Any) -> Any:
491        """
492        Returns the JSON encoded content of the response, if any.
493        `**kwargs` are optional arguments that will be
494        passed to `json.loads()`.
495
496        Will raise if the content can not be decoded and then parsed as JSON.
497
498        *Raises:*
499         - `json.decoder.JSONDecodeError` if content is not valid JSON.
500         - `TypeError` if the content is not available, for example because the response
501            has been streamed.
502        """
503        content = self.get_content(strict=False)
504        if content is None:
505            raise TypeError("Message content is not available.")
506        else:
507            return json.loads(content, **kwargs)

Returns the JSON encoded content of the response, if any. **kwargs are optional arguments that will be passed to json.loads().

Will raise if the content can not be decoded and then parsed as JSON.

Raises:

  • json.decoder.JSONDecodeError if content is not valid JSON.
  • TypeError if the content is not available, for example because the response has been streamed.
class Request(Message):
 510class Request(Message):
 511    """
 512    An HTTP request.
 513    """
 514
 515    data: RequestData
 516
 517    def __init__(
 518        self,
 519        host: str,
 520        port: int,
 521        method: bytes,
 522        scheme: bytes,
 523        authority: bytes,
 524        path: bytes,
 525        http_version: bytes,
 526        headers: Headers | tuple[tuple[bytes, bytes], ...],
 527        content: bytes | None,
 528        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 529        timestamp_start: float,
 530        timestamp_end: float | None,
 531    ):
 532        # auto-convert invalid types to retain compatibility with older code.
 533        if isinstance(host, bytes):
 534            host = host.decode("idna", "strict")
 535        if isinstance(method, str):
 536            method = method.encode("ascii", "strict")
 537        if isinstance(scheme, str):
 538            scheme = scheme.encode("ascii", "strict")
 539        if isinstance(authority, str):
 540            authority = authority.encode("ascii", "strict")
 541        if isinstance(path, str):
 542            path = path.encode("ascii", "strict")
 543        if isinstance(http_version, str):
 544            http_version = http_version.encode("ascii", "strict")
 545
 546        if isinstance(content, str):
 547            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 548        if not isinstance(headers, Headers):
 549            headers = Headers(headers)
 550        if trailers is not None and not isinstance(trailers, Headers):
 551            trailers = Headers(trailers)
 552
 553        self.data = RequestData(
 554            host=host,
 555            port=port,
 556            method=method,
 557            scheme=scheme,
 558            authority=authority,
 559            path=path,
 560            http_version=http_version,
 561            headers=headers,
 562            content=content,
 563            trailers=trailers,
 564            timestamp_start=timestamp_start,
 565            timestamp_end=timestamp_end,
 566        )
 567
 568    def __repr__(self) -> str:
 569        if self.host and self.port:
 570            hostport = f"{self.host}:{self.port}"
 571        else:
 572            hostport = ""
 573        path = self.path or ""
 574        return f"Request({self.method} {hostport}{path})"
 575
 576    @classmethod
 577    def make(
 578        cls,
 579        method: str,
 580        url: str,
 581        content: bytes | str = "",
 582        headers: (
 583            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 584        ) = (),
 585    ) -> "Request":
 586        """
 587        Simplified API for creating request objects.
 588        """
 589        # Headers can be list or dict, we differentiate here.
 590        if isinstance(headers, Headers):
 591            pass
 592        elif isinstance(headers, dict):
 593            headers = Headers(
 594                (
 595                    always_bytes(k, "utf-8", "surrogateescape"),
 596                    always_bytes(v, "utf-8", "surrogateescape"),
 597                )
 598                for k, v in headers.items()
 599            )
 600        elif isinstance(headers, Iterable):
 601            headers = Headers(headers)  # type: ignore
 602        else:
 603            raise TypeError(
 604                "Expected headers to be an iterable or dict, but is {}.".format(
 605                    type(headers).__name__
 606                )
 607            )
 608
 609        req = cls(
 610            "",
 611            0,
 612            method.encode("utf-8", "surrogateescape"),
 613            b"",
 614            b"",
 615            b"",
 616            b"HTTP/1.1",
 617            headers,
 618            b"",
 619            None,
 620            time.time(),
 621            time.time(),
 622        )
 623
 624        req.url = url
 625        # Assign this manually to update the content-length header.
 626        if isinstance(content, bytes):
 627            req.content = content
 628        elif isinstance(content, str):
 629            req.text = content
 630        else:
 631            raise TypeError(
 632                f"Expected content to be str or bytes, but is {type(content).__name__}."
 633            )
 634
 635        return req
 636
 637    @property
 638    def first_line_format(self) -> str:
 639        """
 640        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 641
 642        origin-form and asterisk-form are subsumed as "relative".
 643        """
 644        if self.method == "CONNECT":
 645            return "authority"
 646        elif self.authority:
 647            return "absolute"
 648        else:
 649            return "relative"
 650
 651    @property
 652    def method(self) -> str:
 653        """
 654        HTTP request method, e.g. "GET".
 655        """
 656        return self.data.method.decode("utf-8", "surrogateescape").upper()
 657
 658    @method.setter
 659    def method(self, val: str | bytes) -> None:
 660        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 661
 662    @property
 663    def scheme(self) -> str:
 664        """
 665        HTTP request scheme, which should be "http" or "https".
 666        """
 667        return self.data.scheme.decode("utf-8", "surrogateescape")
 668
 669    @scheme.setter
 670    def scheme(self, val: str | bytes) -> None:
 671        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 672
 673    @property
 674    def authority(self) -> str:
 675        """
 676        HTTP request authority.
 677
 678        For HTTP/1, this is the authority portion of the request target
 679        (in either absolute-form or authority-form).
 680        For origin-form and asterisk-form requests, this property is set to an empty string.
 681
 682        For HTTP/2, this is the :authority pseudo header.
 683
 684        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 685        """
 686        try:
 687            return self.data.authority.decode("idna")
 688        except UnicodeError:
 689            return self.data.authority.decode("utf8", "surrogateescape")
 690
 691    @authority.setter
 692    def authority(self, val: str | bytes) -> None:
 693        if isinstance(val, str):
 694            try:
 695                val = val.encode("idna", "strict")
 696            except UnicodeError:
 697                val = val.encode("utf8", "surrogateescape")  # type: ignore
 698        self.data.authority = val
 699
 700    @property
 701    def host(self) -> str:
 702        """
 703        Target server for this request. This may be parsed from the raw request
 704        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 705        or inferred from the proxy mode (e.g. an IP in transparent mode).
 706
 707        Setting the host attribute also updates the host header and authority information, if present.
 708
 709        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 710        """
 711        return self.data.host
 712
 713    @host.setter
 714    def host(self, val: str | bytes) -> None:
 715        self.data.host = always_str(val, "idna", "strict")
 716        self._update_host_and_authority()
 717
 718    @property
 719    def host_header(self) -> str | None:
 720        """
 721        The request's host/authority header.
 722
 723        This property maps to either ``request.headers["Host"]`` or
 724        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 725
 726        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 727        """
 728        if self.is_http2 or self.is_http3:
 729            return self.authority or self.data.headers.get("Host", None)
 730        else:
 731            return self.data.headers.get("Host", None)
 732
 733    @host_header.setter
 734    def host_header(self, val: None | str | bytes) -> None:
 735        if val is None:
 736            if self.is_http2 or self.is_http3:
 737                self.data.authority = b""
 738            self.headers.pop("Host", None)
 739        else:
 740            if self.is_http2 or self.is_http3:
 741                self.authority = val  # type: ignore
 742            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 743                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 744                self.headers["Host"] = val
 745
 746    @property
 747    def port(self) -> int:
 748        """
 749        Target port.
 750        """
 751        return self.data.port
 752
 753    @port.setter
 754    def port(self, port: int) -> None:
 755        if not isinstance(port, int):
 756            raise ValueError(f"Port must be an integer, not {port!r}.")
 757
 758        self.data.port = port
 759        self._update_host_and_authority()
 760
 761    def _update_host_and_authority(self) -> None:
 762        val = url.hostport(self.scheme, self.host, self.port)
 763
 764        # Update host header
 765        if "Host" in self.data.headers:
 766            self.data.headers["Host"] = val
 767        # Update authority
 768        if self.data.authority:
 769            self.authority = val
 770
 771    @property
 772    def path(self) -> str:
 773        """
 774        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 775        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 776
 777        This attribute includes both path and query parts of the target URI
 778        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 779        """
 780        return self.data.path.decode("utf-8", "surrogateescape")
 781
 782    @path.setter
 783    def path(self, val: str | bytes) -> None:
 784        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 785
 786    @property
 787    def url(self) -> str:
 788        """
 789        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 790
 791        Settings this property updates these attributes as well.
 792        """
 793        if self.first_line_format == "authority":
 794            return f"{self.host}:{self.port}"
 795        return url.unparse(self.scheme, self.host, self.port, self.path)
 796
 797    @url.setter
 798    def url(self, val: str | bytes) -> None:
 799        val = always_str(val, "utf-8", "surrogateescape")
 800        self.scheme, self.host, self.port, self.path = url.parse(val)
 801
 802    @property
 803    def pretty_host(self) -> str:
 804        """
 805        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 806        This is useful in transparent mode where `Request.host` is only an IP address.
 807
 808        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 809        as the Host header could be spoofed.
 810        """
 811        authority = self.host_header
 812        if authority:
 813            return url.parse_authority(authority, check=False)[0]
 814        else:
 815            return self.host
 816
 817    @property
 818    def pretty_url(self) -> str:
 819        """
 820        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 821        """
 822        if self.first_line_format == "authority":
 823            return self.authority
 824
 825        host_header = self.host_header
 826        if not host_header:
 827            return self.url
 828
 829        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 830        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 831
 832        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)
 833
 834    def _get_query(self):
 835        query = urllib.parse.urlparse(self.url).query
 836        return tuple(url.decode(query))
 837
 838    def _set_query(self, query_data):
 839        query = url.encode(query_data)
 840        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 841        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 842
 843    @property
 844    def query(self) -> multidict.MultiDictView[str, str]:
 845        """
 846        The request query as a mutable mapping view on the request's path.
 847        For the most part, this behaves like a dictionary.
 848        Modifications to the MultiDictView update `Request.path`, and vice versa.
 849        """
 850        return multidict.MultiDictView(self._get_query, self._set_query)
 851
 852    @query.setter
 853    def query(self, value):
 854        self._set_query(value)
 855
 856    def _get_cookies(self):
 857        h = self.headers.get_all("Cookie")
 858        return tuple(cookies.parse_cookie_headers(h))
 859
 860    def _set_cookies(self, value):
 861        self.headers["cookie"] = cookies.format_cookie_header(value)
 862
 863    @property
 864    def cookies(self) -> multidict.MultiDictView[str, str]:
 865        """
 866        The request cookies.
 867        For the most part, this behaves like a dictionary.
 868        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 869        """
 870        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 871
 872    @cookies.setter
 873    def cookies(self, value):
 874        self._set_cookies(value)
 875
 876    @property
 877    def path_components(self) -> tuple[str, ...]:
 878        """
 879        The URL's path components as a tuple of strings.
 880        Components are unquoted.
 881        """
 882        path = urllib.parse.urlparse(self.url).path
 883        # This needs to be a tuple so that it's immutable.
 884        # Otherwise, this would fail silently:
 885        #   request.path_components.append("foo")
 886        return tuple(url.unquote(i) for i in path.split("/") if i)
 887
 888    @path_components.setter
 889    def path_components(self, components: Iterable[str]):
 890        components = map(lambda x: url.quote(x, safe=""), components)
 891        path = "/" + "/".join(components)
 892        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 893        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 894
 895    def anticache(self) -> None:
 896        """
 897        Modifies this request to remove headers that might produce a cached response.
 898        """
 899        delheaders = (
 900            "if-modified-since",
 901            "if-none-match",
 902        )
 903        for i in delheaders:
 904            self.headers.pop(i, None)
 905
 906    def anticomp(self) -> None:
 907        """
 908        Modify the Accept-Encoding header to only accept uncompressed responses.
 909        """
 910        self.headers["accept-encoding"] = "identity"
 911
 912    def constrain_encoding(self) -> None:
 913        """
 914        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 915        """
 916        accept_encoding = self.headers.get("accept-encoding")
 917        if accept_encoding:
 918            self.headers["accept-encoding"] = ", ".join(
 919                e
 920                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 921                if e in accept_encoding
 922            )
 923
 924    def _get_urlencoded_form(self):
 925        is_valid_content_type = (
 926            "application/x-www-form-urlencoded"
 927            in self.headers.get("content-type", "").lower()
 928        )
 929        if is_valid_content_type:
 930            return tuple(url.decode(self.get_text(strict=False)))
 931        return ()
 932
 933    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 934        """
 935        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 936        This will overwrite the existing content if there is one.
 937        """
 938        self.headers["content-type"] = "application/x-www-form-urlencoded"
 939        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 940
 941    @property
 942    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 943        """
 944        The URL-encoded form data.
 945
 946        If the content-type indicates non-form data or the form could not be parsed, this is set to
 947        an empty `MultiDictView`.
 948
 949        Modifications to the MultiDictView update `Request.content`, and vice versa.
 950        """
 951        return multidict.MultiDictView(
 952            self._get_urlencoded_form, self._set_urlencoded_form
 953        )
 954
 955    @urlencoded_form.setter
 956    def urlencoded_form(self, value):
 957        self._set_urlencoded_form(value)
 958
 959    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 960        is_valid_content_type = (
 961            "multipart/form-data" in self.headers.get("content-type", "").lower()
 962        )
 963        if is_valid_content_type and self.content is not None:
 964            try:
 965                return multipart.decode_multipart(
 966                    self.headers.get("content-type"), self.content
 967                )
 968            except ValueError:
 969                pass
 970        return []
 971
 972    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 973        ct = self.headers.get("content-type", "")
 974        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 975        if not is_valid_content_type:
 976            """
 977            Generate a random boundary here.
 978
 979            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 980            on generating the boundary.
 981            """
 982            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 983            self.headers["content-type"] = ct = (
 984                f"multipart/form-data; boundary={boundary}"
 985            )
 986        self.content = multipart.encode_multipart(ct, value)
 987
 988    @property
 989    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 990        """
 991        The multipart form data.
 992
 993        If the content-type indicates non-form data or the form could not be parsed, this is set to
 994        an empty `MultiDictView`.
 995
 996        Modifications to the MultiDictView update `Request.content`, and vice versa.
 997        """
 998        return multidict.MultiDictView(
 999            self._get_multipart_form, self._set_multipart_form
1000        )
1001
1002    @multipart_form.setter
1003    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1004        self._set_multipart_form(value)

An HTTP request.

Request( host: str, port: int, method: bytes, scheme: bytes, authority: bytes, path: bytes, http_version: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, timestamp_start: float, timestamp_end: float | None)
517    def __init__(
518        self,
519        host: str,
520        port: int,
521        method: bytes,
522        scheme: bytes,
523        authority: bytes,
524        path: bytes,
525        http_version: bytes,
526        headers: Headers | tuple[tuple[bytes, bytes], ...],
527        content: bytes | None,
528        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
529        timestamp_start: float,
530        timestamp_end: float | None,
531    ):
532        # auto-convert invalid types to retain compatibility with older code.
533        if isinstance(host, bytes):
534            host = host.decode("idna", "strict")
535        if isinstance(method, str):
536            method = method.encode("ascii", "strict")
537        if isinstance(scheme, str):
538            scheme = scheme.encode("ascii", "strict")
539        if isinstance(authority, str):
540            authority = authority.encode("ascii", "strict")
541        if isinstance(path, str):
542            path = path.encode("ascii", "strict")
543        if isinstance(http_version, str):
544            http_version = http_version.encode("ascii", "strict")
545
546        if isinstance(content, str):
547            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
548        if not isinstance(headers, Headers):
549            headers = Headers(headers)
550        if trailers is not None and not isinstance(trailers, Headers):
551            trailers = Headers(trailers)
552
553        self.data = RequestData(
554            host=host,
555            port=port,
556            method=method,
557            scheme=scheme,
558            authority=authority,
559            path=path,
560            http_version=http_version,
561            headers=headers,
562            content=content,
563            trailers=trailers,
564            timestamp_start=timestamp_start,
565            timestamp_end=timestamp_end,
566        )
@classmethod
def make( cls, method: str, url: str, content: bytes | str = '', headers: Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Request:
576    @classmethod
577    def make(
578        cls,
579        method: str,
580        url: str,
581        content: bytes | str = "",
582        headers: (
583            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
584        ) = (),
585    ) -> "Request":
586        """
587        Simplified API for creating request objects.
588        """
589        # Headers can be list or dict, we differentiate here.
590        if isinstance(headers, Headers):
591            pass
592        elif isinstance(headers, dict):
593            headers = Headers(
594                (
595                    always_bytes(k, "utf-8", "surrogateescape"),
596                    always_bytes(v, "utf-8", "surrogateescape"),
597                )
598                for k, v in headers.items()
599            )
600        elif isinstance(headers, Iterable):
601            headers = Headers(headers)  # type: ignore
602        else:
603            raise TypeError(
604                "Expected headers to be an iterable or dict, but is {}.".format(
605                    type(headers).__name__
606                )
607            )
608
609        req = cls(
610            "",
611            0,
612            method.encode("utf-8", "surrogateescape"),
613            b"",
614            b"",
615            b"",
616            b"HTTP/1.1",
617            headers,
618            b"",
619            None,
620            time.time(),
621            time.time(),
622        )
623
624        req.url = url
625        # Assign this manually to update the content-length header.
626        if isinstance(content, bytes):
627            req.content = content
628        elif isinstance(content, str):
629            req.text = content
630        else:
631            raise TypeError(
632                f"Expected content to be str or bytes, but is {type(content).__name__}."
633            )
634
635        return req

Simplified API for creating request objects.

first_line_format: str
637    @property
638    def first_line_format(self) -> str:
639        """
640        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
641
642        origin-form and asterisk-form are subsumed as "relative".
643        """
644        if self.method == "CONNECT":
645            return "authority"
646        elif self.authority:
647            return "absolute"
648        else:
649            return "relative"

Read-only: HTTP request form as defined in RFC 7230.

origin-form and asterisk-form are subsumed as "relative".

method: str
651    @property
652    def method(self) -> str:
653        """
654        HTTP request method, e.g. "GET".
655        """
656        return self.data.method.decode("utf-8", "surrogateescape").upper()

HTTP request method, e.g. "GET".

scheme: str
662    @property
663    def scheme(self) -> str:
664        """
665        HTTP request scheme, which should be "http" or "https".
666        """
667        return self.data.scheme.decode("utf-8", "surrogateescape")

HTTP request scheme, which should be "http" or "https".

authority: str
673    @property
674    def authority(self) -> str:
675        """
676        HTTP request authority.
677
678        For HTTP/1, this is the authority portion of the request target
679        (in either absolute-form or authority-form).
680        For origin-form and asterisk-form requests, this property is set to an empty string.
681
682        For HTTP/2, this is the :authority pseudo header.
683
684        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
685        """
686        try:
687            return self.data.authority.decode("idna")
688        except UnicodeError:
689            return self.data.authority.decode("utf8", "surrogateescape")

HTTP request authority.

For HTTP/1, this is the authority portion of the request target (in either absolute-form or authority-form). For origin-form and asterisk-form requests, this property is set to an empty string.

For HTTP/2, this is the :authority pseudo header.

See also: Request.host, Request.host_header, Request.pretty_host

host: str
700    @property
701    def host(self) -> str:
702        """
703        Target server for this request. This may be parsed from the raw request
704        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
705        or inferred from the proxy mode (e.g. an IP in transparent mode).
706
707        Setting the host attribute also updates the host header and authority information, if present.
708
709        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
710        """
711        return self.data.host

Target server for this request. This may be parsed from the raw request (e.g. from a GET http://example.com/ HTTP/1.1 request line) or inferred from the proxy mode (e.g. an IP in transparent mode).

Setting the host attribute also updates the host header and authority information, if present.

See also: Request.authority, Request.host_header, Request.pretty_host

host_header: str | None
718    @property
719    def host_header(self) -> str | None:
720        """
721        The request's host/authority header.
722
723        This property maps to either ``request.headers["Host"]`` or
724        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
725
726        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
727        """
728        if self.is_http2 or self.is_http3:
729            return self.authority or self.data.headers.get("Host", None)
730        else:
731            return self.data.headers.get("Host", None)

The request's host/authority header.

This property maps to either request.headers["Host"] or request.authority, depending on whether it's HTTP/1.x or HTTP/2.0.

See also: Request.authority,Request.host, Request.pretty_host

port: int
746    @property
747    def port(self) -> int:
748        """
749        Target port.
750        """
751        return self.data.port

Target port.

path: str
771    @property
772    def path(self) -> str:
773        """
774        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
775        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
776
777        This attribute includes both path and query parts of the target URI
778        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
779        """
780        return self.data.path.decode("utf-8", "surrogateescape")

HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".

This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).

url: str
786    @property
787    def url(self) -> str:
788        """
789        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
790
791        Settings this property updates these attributes as well.
792        """
793        if self.first_line_format == "authority":
794            return f"{self.host}:{self.port}"
795        return url.unparse(self.scheme, self.host, self.port, self.path)

The full URL string, constructed from Request.scheme, Request.host, Request.port and Request.path.

Settings this property updates these attributes as well.

pretty_host: str
802    @property
803    def pretty_host(self) -> str:
804        """
805        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
806        This is useful in transparent mode where `Request.host` is only an IP address.
807
808        *Warning:* When working in adversarial environments, this may not reflect the actual destination
809        as the Host header could be spoofed.
810        """
811        authority = self.host_header
812        if authority:
813            return url.parse_authority(authority, check=False)[0]
814        else:
815            return self.host

Read-only: Like Request.host, but using Request.host_header header as an additional (preferred) data source. This is useful in transparent mode where Request.host is only an IP address.

Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.

pretty_url: str
817    @property
818    def pretty_url(self) -> str:
819        """
820        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
821        """
822        if self.first_line_format == "authority":
823            return self.authority
824
825        host_header = self.host_header
826        if not host_header:
827            return self.url
828
829        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
830        pretty_port = pretty_port or url.default_port(self.scheme) or 443
831
832        return url.unparse(self.scheme, pretty_host, pretty_port, self.path)

Read-only: Like Request.url, but using Request.pretty_host instead of Request.host.

query: mitmproxy.coretypes.multidict.MultiDictView[str, str]
843    @property
844    def query(self) -> multidict.MultiDictView[str, str]:
845        """
846        The request query as a mutable mapping view on the request's path.
847        For the most part, this behaves like a dictionary.
848        Modifications to the MultiDictView update `Request.path`, and vice versa.
849        """
850        return multidict.MultiDictView(self._get_query, self._set_query)

The request query as a mutable mapping view on the request's path. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.path, and vice versa.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, str]
863    @property
864    def cookies(self) -> multidict.MultiDictView[str, str]:
865        """
866        The request cookies.
867        For the most part, this behaves like a dictionary.
868        Modifications to the MultiDictView update `Request.headers`, and vice versa.
869        """
870        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The request cookies. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.headers, and vice versa.

path_components: tuple[str, ...]
876    @property
877    def path_components(self) -> tuple[str, ...]:
878        """
879        The URL's path components as a tuple of strings.
880        Components are unquoted.
881        """
882        path = urllib.parse.urlparse(self.url).path
883        # This needs to be a tuple so that it's immutable.
884        # Otherwise, this would fail silently:
885        #   request.path_components.append("foo")
886        return tuple(url.unquote(i) for i in path.split("/") if i)

The URL's path components as a tuple of strings. Components are unquoted.

def anticache(self) -> None:
895    def anticache(self) -> None:
896        """
897        Modifies this request to remove headers that might produce a cached response.
898        """
899        delheaders = (
900            "if-modified-since",
901            "if-none-match",
902        )
903        for i in delheaders:
904            self.headers.pop(i, None)

Modifies this request to remove headers that might produce a cached response.

def anticomp(self) -> None:
906    def anticomp(self) -> None:
907        """
908        Modify the Accept-Encoding header to only accept uncompressed responses.
909        """
910        self.headers["accept-encoding"] = "identity"

Modify the Accept-Encoding header to only accept uncompressed responses.

def constrain_encoding(self) -> None:
912    def constrain_encoding(self) -> None:
913        """
914        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
915        """
916        accept_encoding = self.headers.get("accept-encoding")
917        if accept_encoding:
918            self.headers["accept-encoding"] = ", ".join(
919                e
920                for e in {"gzip", "identity", "deflate", "br", "zstd"}
921                if e in accept_encoding
922            )

Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

urlencoded_form: mitmproxy.coretypes.multidict.MultiDictView[str, str]
941    @property
942    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
943        """
944        The URL-encoded form data.
945
946        If the content-type indicates non-form data or the form could not be parsed, this is set to
947        an empty `MultiDictView`.
948
949        Modifications to the MultiDictView update `Request.content`, and vice versa.
950        """
951        return multidict.MultiDictView(
952            self._get_urlencoded_form, self._set_urlencoded_form
953        )

The URL-encoded form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

multipart_form: mitmproxy.coretypes.multidict.MultiDictView[bytes, bytes]
 988    @property
 989    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 990        """
 991        The multipart form data.
 992
 993        If the content-type indicates non-form data or the form could not be parsed, this is set to
 994        an empty `MultiDictView`.
 995
 996        Modifications to the MultiDictView update `Request.content`, and vice versa.
 997        """
 998        return multidict.MultiDictView(
 999            self._get_multipart_form, self._set_multipart_form
1000        )

The multipart form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

class Response(Message):
1007class Response(Message):
1008    """
1009    An HTTP response.
1010    """
1011
1012    data: ResponseData
1013
1014    def __init__(
1015        self,
1016        http_version: bytes,
1017        status_code: int,
1018        reason: bytes,
1019        headers: Headers | tuple[tuple[bytes, bytes], ...],
1020        content: bytes | None,
1021        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1022        timestamp_start: float,
1023        timestamp_end: float | None,
1024    ):
1025        # auto-convert invalid types to retain compatibility with older code.
1026        if isinstance(http_version, str):
1027            http_version = http_version.encode("ascii", "strict")
1028        if isinstance(reason, str):
1029            reason = reason.encode("ascii", "strict")
1030
1031        if isinstance(content, str):
1032            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1033        if not isinstance(headers, Headers):
1034            headers = Headers(headers)
1035        if trailers is not None and not isinstance(trailers, Headers):
1036            trailers = Headers(trailers)
1037
1038        self.data = ResponseData(
1039            http_version=http_version,
1040            status_code=status_code,
1041            reason=reason,
1042            headers=headers,
1043            content=content,
1044            trailers=trailers,
1045            timestamp_start=timestamp_start,
1046            timestamp_end=timestamp_end,
1047        )
1048
1049    def __repr__(self) -> str:
1050        if self.raw_content:
1051            ct = self.headers.get("content-type", "unknown content type")
1052            size = human.pretty_size(len(self.raw_content))
1053            details = f"{ct}, {size}"
1054        else:
1055            details = "no content"
1056        return f"Response({self.status_code}, {details})"
1057
1058    @classmethod
1059    def make(
1060        cls,
1061        status_code: int = 200,
1062        content: bytes | str = b"",
1063        headers: (
1064            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1065        ) = (),
1066    ) -> "Response":
1067        """
1068        Simplified API for creating response objects.
1069        """
1070        if isinstance(headers, Headers):
1071            headers = headers
1072        elif isinstance(headers, dict):
1073            headers = Headers(
1074                (
1075                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1076                    always_bytes(v, "utf-8", "surrogateescape"),
1077                )
1078                for k, v in headers.items()
1079            )
1080        elif isinstance(headers, Iterable):
1081            headers = Headers(headers)  # type: ignore
1082        else:
1083            raise TypeError(
1084                "Expected headers to be an iterable or dict, but is {}.".format(
1085                    type(headers).__name__
1086                )
1087            )
1088
1089        resp = cls(
1090            b"HTTP/1.1",
1091            status_code,
1092            status_codes.RESPONSES.get(status_code, "").encode(),
1093            headers,
1094            None,
1095            None,
1096            time.time(),
1097            time.time(),
1098        )
1099
1100        # Assign this manually to update the content-length header.
1101        if isinstance(content, bytes):
1102            resp.content = content
1103        elif isinstance(content, str):
1104            resp.text = content
1105        else:
1106            raise TypeError(
1107                f"Expected content to be str or bytes, but is {type(content).__name__}."
1108            )
1109
1110        return resp
1111
1112    @property
1113    def status_code(self) -> int:
1114        """
1115        HTTP Status Code, e.g. ``200``.
1116        """
1117        return self.data.status_code
1118
1119    @status_code.setter
1120    def status_code(self, status_code: int) -> None:
1121        self.data.status_code = status_code
1122
1123    @property
1124    def reason(self) -> str:
1125        """
1126        HTTP reason phrase, for example "Not Found".
1127
1128        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1129        """
1130        # Encoding: http://stackoverflow.com/a/16674906/934719
1131        return self.data.reason.decode("ISO-8859-1")
1132
1133    @reason.setter
1134    def reason(self, reason: str | bytes) -> None:
1135        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1136
1137    def _get_cookies(self):
1138        h = self.headers.get_all("set-cookie")
1139        all_cookies = cookies.parse_set_cookie_headers(h)
1140        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1141
1142    def _set_cookies(self, value):
1143        cookie_headers = []
1144        for k, v in value:
1145            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1146            cookie_headers.append(header)
1147        self.headers.set_all("set-cookie", cookie_headers)
1148
1149    @property
1150    def cookies(
1151        self,
1152    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1153        """
1154        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1155        name strings, and values are `(cookie value, attributes)` tuples. Within
1156        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1157        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1158
1159        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1160        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1161        """
1162        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1163
1164    @cookies.setter
1165    def cookies(self, value):
1166        self._set_cookies(value)
1167
1168    def refresh(self, now=None):
1169        """
1170        This fairly complex and heuristic function refreshes a server
1171        response for replay.
1172
1173         - It adjusts date, expires, and last-modified headers.
1174         - It adjusts cookie expiration.
1175        """
1176        if not now:
1177            now = time.time()
1178        delta = now - self.timestamp_start
1179        refresh_headers = [
1180            "date",
1181            "expires",
1182            "last-modified",
1183        ]
1184        for i in refresh_headers:
1185            if i in self.headers:
1186                d = parsedate_tz(self.headers[i])
1187                if d:
1188                    new = mktime_tz(d) + delta
1189                    try:
1190                        self.headers[i] = formatdate(new, usegmt=True)
1191                    except OSError:  # pragma: no cover
1192                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1193        c = []
1194        for set_cookie_header in self.headers.get_all("set-cookie"):
1195            try:
1196                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1197            except ValueError:
1198                refreshed = set_cookie_header
1199            c.append(refreshed)
1200        if c:
1201            self.headers.set_all("set-cookie", c)

An HTTP response.

Response( http_version: bytes, status_code: int, reason: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: None | Headers | tuple[tuple[bytes, bytes], ...], timestamp_start: float, timestamp_end: float | None)
1014    def __init__(
1015        self,
1016        http_version: bytes,
1017        status_code: int,
1018        reason: bytes,
1019        headers: Headers | tuple[tuple[bytes, bytes], ...],
1020        content: bytes | None,
1021        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1022        timestamp_start: float,
1023        timestamp_end: float | None,
1024    ):
1025        # auto-convert invalid types to retain compatibility with older code.
1026        if isinstance(http_version, str):
1027            http_version = http_version.encode("ascii", "strict")
1028        if isinstance(reason, str):
1029            reason = reason.encode("ascii", "strict")
1030
1031        if isinstance(content, str):
1032            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1033        if not isinstance(headers, Headers):
1034            headers = Headers(headers)
1035        if trailers is not None and not isinstance(trailers, Headers):
1036            trailers = Headers(trailers)
1037
1038        self.data = ResponseData(
1039            http_version=http_version,
1040            status_code=status_code,
1041            reason=reason,
1042            headers=headers,
1043            content=content,
1044            trailers=trailers,
1045            timestamp_start=timestamp_start,
1046            timestamp_end=timestamp_end,
1047        )
@classmethod
def make( cls, status_code: int = 200, content: bytes | str = b'', headers: Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Response:
1058    @classmethod
1059    def make(
1060        cls,
1061        status_code: int = 200,
1062        content: bytes | str = b"",
1063        headers: (
1064            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1065        ) = (),
1066    ) -> "Response":
1067        """
1068        Simplified API for creating response objects.
1069        """
1070        if isinstance(headers, Headers):
1071            headers = headers
1072        elif isinstance(headers, dict):
1073            headers = Headers(
1074                (
1075                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1076                    always_bytes(v, "utf-8", "surrogateescape"),
1077                )
1078                for k, v in headers.items()
1079            )
1080        elif isinstance(headers, Iterable):
1081            headers = Headers(headers)  # type: ignore
1082        else:
1083            raise TypeError(
1084                "Expected headers to be an iterable or dict, but is {}.".format(
1085                    type(headers).__name__
1086                )
1087            )
1088
1089        resp = cls(
1090            b"HTTP/1.1",
1091            status_code,
1092            status_codes.RESPONSES.get(status_code, "").encode(),
1093            headers,
1094            None,
1095            None,
1096            time.time(),
1097            time.time(),
1098        )
1099
1100        # Assign this manually to update the content-length header.
1101        if isinstance(content, bytes):
1102            resp.content = content
1103        elif isinstance(content, str):
1104            resp.text = content
1105        else:
1106            raise TypeError(
1107                f"Expected content to be str or bytes, but is {type(content).__name__}."
1108            )
1109
1110        return resp

Simplified API for creating response objects.

status_code: int
1112    @property
1113    def status_code(self) -> int:
1114        """
1115        HTTP Status Code, e.g. ``200``.
1116        """
1117        return self.data.status_code

HTTP Status Code, e.g. 200.

reason: str
1123    @property
1124    def reason(self) -> str:
1125        """
1126        HTTP reason phrase, for example "Not Found".
1127
1128        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1129        """
1130        # Encoding: http://stackoverflow.com/a/16674906/934719
1131        return self.data.reason.decode("ISO-8859-1")

HTTP reason phrase, for example "Not Found".

HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, tuple[str, mitmproxy.coretypes.multidict.MultiDict[str, str | None]]]
1149    @property
1150    def cookies(
1151        self,
1152    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1153        """
1154        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1155        name strings, and values are `(cookie value, attributes)` tuples. Within
1156        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1157        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1158
1159        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1160        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1161        """
1162        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The response cookies. A possibly empty MultiDictView, where the keys are cookie name strings, and values are (cookie value, attributes) tuples. Within attributes, unary attributes (e.g. HTTPOnly) are indicated by a None value. Modifications to the MultiDictView update Response.headers, and vice versa.

Warning: Changes to attributes will not be picked up unless you also reassign the (cookie value, attributes) tuple directly in the MultiDictView.

def refresh(self, now=None):
1168    def refresh(self, now=None):
1169        """
1170        This fairly complex and heuristic function refreshes a server
1171        response for replay.
1172
1173         - It adjusts date, expires, and last-modified headers.
1174         - It adjusts cookie expiration.
1175        """
1176        if not now:
1177            now = time.time()
1178        delta = now - self.timestamp_start
1179        refresh_headers = [
1180            "date",
1181            "expires",
1182            "last-modified",
1183        ]
1184        for i in refresh_headers:
1185            if i in self.headers:
1186                d = parsedate_tz(self.headers[i])
1187                if d:
1188                    new = mktime_tz(d) + delta
1189                    try:
1190                        self.headers[i] = formatdate(new, usegmt=True)
1191                    except OSError:  # pragma: no cover
1192                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1193        c = []
1194        for set_cookie_header in self.headers.get_all("set-cookie"):
1195            try:
1196                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1197            except ValueError:
1198                refreshed = set_cookie_header
1199            c.append(refreshed)
1200        if c:
1201            self.headers.set_all("set-cookie", c)

This fairly complex and heuristic function refreshes a server response for replay.

  • It adjusts date, expires, and last-modified headers.
  • It adjusts cookie expiration.
class Headers(mitmproxy.coretypes.multidict._MultiDict[~KT, ~VT], mitmproxy.coretypes.serializable.Serializable):
 50class Headers(multidict.MultiDict):  # type: ignore
 51    """
 52    Header class which allows both convenient access to individual headers as well as
 53    direct access to the underlying raw data. Provides a full dictionary interface.
 54
 55    Create headers with keyword arguments:
 56    >>> h = Headers(host="example.com", content_type="application/xml")
 57
 58    Headers mostly behave like a normal dict:
 59    >>> h["Host"]
 60    "example.com"
 61
 62    Headers are case insensitive:
 63    >>> h["host"]
 64    "example.com"
 65
 66    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
 67    >>> h = Headers([
 68        (b"Host",b"example.com"),
 69        (b"Accept",b"text/html"),
 70        (b"accept",b"application/xml")
 71    ])
 72
 73    Multiple headers are folded into a single header as per RFC 7230:
 74    >>> h["Accept"]
 75    "text/html, application/xml"
 76
 77    Setting a header removes all existing headers with the same name:
 78    >>> h["Accept"] = "application/text"
 79    >>> h["Accept"]
 80    "application/text"
 81
 82    `bytes(h)` returns an HTTP/1 header block:
 83    >>> print(bytes(h))
 84    Host: example.com
 85    Accept: application/text
 86
 87    For full control, the raw header fields can be accessed:
 88    >>> h.fields
 89
 90    Caveats:
 91     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
 92    """
 93
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )
119
120    fields: tuple[tuple[bytes, bytes], ...]
121
122    @staticmethod
123    def _reduce_values(values) -> str:
124        # Headers can be folded
125        return ", ".join(values)
126
127    @staticmethod
128    def _kconv(key) -> str:
129        # Headers are case-insensitive
130        return key.lower()
131
132    def __bytes__(self) -> bytes:
133        if self.fields:
134            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
135        else:
136            return b""
137
138    def __delitem__(self, key: str | bytes) -> None:
139        key = _always_bytes(key)
140        super().__delitem__(key)
141
142    def __iter__(self) -> Iterator[str]:
143        for x in super().__iter__():
144            yield _native(x)
145
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]
158
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)
167
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)
172
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.

Create headers with keyword arguments:

>>> h = Headers(host="example.com", content_type="application/xml")

Headers mostly behave like a normal dict:

>>> h["Host"]
"example.com"

Headers are case insensitive:

>>> h["host"]
"example.com"

Headers can also be created from a list of raw (header_name, header_value) byte tuples:

>>> h = Headers([
    (b"Host",b"example.com"),
    (b"Accept",b"text/html"),
    (b"accept",b"application/xml")
])

Multiple headers are folded into a single header as per RFC 7230:

>>> h["Accept"]
"text/html, application/xml"

Setting a header removes all existing headers with the same name:

>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"

bytes(h) returns an HTTP/1 header block:

>>> print(bytes(h))
Host: example.com
Accept: application/text

For full control, the raw header fields can be accessed:

>>> h.fields

Caveats:

Headers(fields: Iterable[tuple[bytes, bytes]] = (), **headers)
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )

Args:

  • fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
  • **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.

If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.

fields: tuple[tuple[bytes, bytes], ...]

The underlying raw datastructure.

def get_all(self, name: str | bytes) -> list[str]:
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]

Like Headers.get, but does not fold multiple headers into a single one. This is useful for Set-Cookie and Cookie headers, which do not support folding.

See also:

def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)

Explicitly set multiple headers for the given key. See Headers.get_all.

def insert(self, index: int, key: str | bytes, value: str | bytes):
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)

Insert an additional value for the given key at the specified position.

def items(self, multi=False):
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Get all (key, value) tuples.

If multi is True, all (key, value) pairs will be returned. If False, only one tuple per key is returned.