Edit on GitHub

mitmproxy.http

   1import binascii
   2import json
   3import os
   4import time
   5import urllib.parse
   6import warnings
   7from collections.abc import Callable
   8from collections.abc import Iterable
   9from collections.abc import Iterator
  10from collections.abc import Mapping
  11from collections.abc import Sequence
  12from dataclasses import dataclass
  13from dataclasses import fields
  14from email.utils import formatdate
  15from email.utils import mktime_tz
  16from email.utils import parsedate_tz
  17from typing import Any
  18from typing import cast
  19
  20from mitmproxy import flow
  21from mitmproxy.coretypes import multidict
  22from mitmproxy.coretypes import serializable
  23from mitmproxy.net import encoding
  24from mitmproxy.net.http import cookies
  25from mitmproxy.net.http import multipart
  26from mitmproxy.net.http import status_codes
  27from mitmproxy.net.http import url
  28from mitmproxy.net.http.headers import assemble_content_type
  29from mitmproxy.net.http.headers import infer_content_encoding
  30from mitmproxy.net.http.headers import parse_content_type
  31from mitmproxy.utils import human
  32from mitmproxy.utils import strutils
  33from mitmproxy.utils import typecheck
  34from mitmproxy.utils.strutils import always_bytes
  35from mitmproxy.utils.strutils import always_str
  36from mitmproxy.websocket import WebSocketData
  37
  38
  39# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
  40def _native(x: bytes) -> str:
  41    return x.decode("utf-8", "surrogateescape")
  42
  43
  44def _always_bytes(x: str | bytes) -> bytes:
  45    return strutils.always_bytes(x, "utf-8", "surrogateescape")
  46
  47
  48# This cannot be easily typed with mypy yet, so we just specify MultiDict without concrete types.
  49class Headers(multidict.MultiDict):  # type: ignore
  50    """
  51    Header class which allows both convenient access to individual headers as well as
  52    direct access to the underlying raw data. Provides a full dictionary interface.
  53
  54    Create headers with keyword arguments:
  55    >>> h = Headers(host="example.com", content_type="application/xml")
  56
  57    Headers mostly behave like a normal dict:
  58    >>> h["Host"]
  59    "example.com"
  60
  61    Headers are case insensitive:
  62    >>> h["host"]
  63    "example.com"
  64
  65    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
  66    >>> h = Headers([
  67        (b"Host",b"example.com"),
  68        (b"Accept",b"text/html"),
  69        (b"accept",b"application/xml")
  70    ])
  71
  72    Multiple headers are folded into a single header as per RFC 7230:
  73    >>> h["Accept"]
  74    "text/html, application/xml"
  75
  76    Setting a header removes all existing headers with the same name:
  77    >>> h["Accept"] = "application/text"
  78    >>> h["Accept"]
  79    "application/text"
  80
  81    `bytes(h)` returns an HTTP/1 header block:
  82    >>> print(bytes(h))
  83    Host: example.com
  84    Accept: application/text
  85
  86    For full control, the raw header fields can be accessed:
  87    >>> h.fields
  88
  89    Caveats:
  90     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
  91    """
  92
  93    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
  94        """
  95        *Args:*
  96         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
  97           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
  98         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
  99           For convenience, underscores in header names will be transformed to dashes -
 100           this behaviour does not extend to other methods.
 101
 102        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
 103        the behavior is undefined.
 104        """
 105        super().__init__(fields)
 106
 107        for key, value in self.fields:
 108            if not isinstance(key, bytes) or not isinstance(value, bytes):
 109                raise TypeError("Header fields must be bytes.")
 110
 111        # content_type -> content-type
 112        self.update(
 113            {
 114                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
 115                for name, value in headers.items()
 116            }
 117        )
 118
 119    fields: tuple[tuple[bytes, bytes], ...]
 120
 121    @staticmethod
 122    def _reduce_values(values) -> str:
 123        # Headers can be folded
 124        return ", ".join(values)
 125
 126    @staticmethod
 127    def _kconv(key) -> str:
 128        # Headers are case-insensitive
 129        return key.lower()
 130
 131    def __bytes__(self) -> bytes:
 132        if self.fields:
 133            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
 134        else:
 135            return b""
 136
 137    def __delitem__(self, key: str | bytes) -> None:
 138        key = _always_bytes(key)
 139        super().__delitem__(key)
 140
 141    def __iter__(self) -> Iterator[str]:
 142        for x in super().__iter__():
 143            yield _native(x)
 144
 145    def get_all(self, name: str | bytes) -> list[str]:
 146        """
 147        Like `Headers.get`, but does not fold multiple headers into a single one.
 148        This is useful for Set-Cookie and Cookie headers, which do not support folding.
 149
 150        *See also:*
 151         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
 152         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
 153         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
 154        """
 155        name = _always_bytes(name)
 156        return [_native(x) for x in super().get_all(name)]
 157
 158    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
 159        """
 160        Explicitly set multiple headers for the given key.
 161        See `Headers.get_all`.
 162        """
 163        name = _always_bytes(name)
 164        values = [_always_bytes(x) for x in values]
 165        return super().set_all(name, values)
 166
 167    def insert(self, index: int, key: str | bytes, value: str | bytes):
 168        key = _always_bytes(key)
 169        value = _always_bytes(value)
 170        super().insert(index, key, value)
 171
 172    def items(self, multi=False):
 173        if multi:
 174            return ((_native(k), _native(v)) for k, v in self.fields)
 175        else:
 176            return super().items()
 177
 178
 179@dataclass
 180class MessageData(serializable.Serializable):
 181    http_version: bytes
 182    headers: Headers
 183    content: bytes | None
 184    trailers: Headers | None
 185    timestamp_start: float
 186    timestamp_end: float | None
 187
 188    # noinspection PyUnreachableCode
 189    if __debug__:
 190
 191        def __post_init__(self):
 192            for field in fields(self):
 193                val = getattr(self, field.name)
 194                typecheck.check_option_type(field.name, val, field.type)
 195
 196    def set_state(self, state):
 197        for k, v in state.items():
 198            if k in ("headers", "trailers") and v is not None:
 199                v = Headers.from_state(v)
 200            setattr(self, k, v)
 201
 202    def get_state(self):
 203        state = vars(self).copy()
 204        state["headers"] = state["headers"].get_state()
 205        if state["trailers"] is not None:
 206            state["trailers"] = state["trailers"].get_state()
 207        return state
 208
 209    @classmethod
 210    def from_state(cls, state):
 211        state["headers"] = Headers.from_state(state["headers"])
 212        if state["trailers"] is not None:
 213            state["trailers"] = Headers.from_state(state["trailers"])
 214        return cls(**state)
 215
 216
 217@dataclass
 218class RequestData(MessageData):
 219    host: str
 220    port: int
 221    method: bytes
 222    scheme: bytes
 223    authority: bytes
 224    path: bytes
 225
 226
 227@dataclass
 228class ResponseData(MessageData):
 229    status_code: int
 230    reason: bytes
 231
 232
 233class Message(serializable.Serializable):
 234    """Base class for `Request` and `Response`."""
 235
 236    @classmethod
 237    def from_state(cls, state):
 238        return cls(**state)
 239
 240    def get_state(self):
 241        return self.data.get_state()
 242
 243    def set_state(self, state):
 244        self.data.set_state(state)
 245
 246    data: MessageData
 247    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
 248    """
 249    This attribute controls if the message body should be streamed.
 250
 251    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
 252    This makes it possible to perform string replacements on the entire body.
 253    If `True`, the message body will not be buffered on the proxy
 254    but immediately forwarded instead.
 255    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
 256    Please note that packet boundaries generally should not be relied upon.
 257
 258    This attribute must be set in the `requestheaders` or `responseheaders` hook.
 259    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
 260    """
 261
 262    @property
 263    def http_version(self) -> str:
 264        """
 265        HTTP version string, for example `HTTP/1.1`.
 266        """
 267        return self.data.http_version.decode("utf-8", "surrogateescape")
 268
 269    @http_version.setter
 270    def http_version(self, http_version: str | bytes) -> None:
 271        self.data.http_version = strutils.always_bytes(
 272            http_version, "utf-8", "surrogateescape"
 273        )
 274
 275    @property
 276    def is_http10(self) -> bool:
 277        return self.data.http_version == b"HTTP/1.0"
 278
 279    @property
 280    def is_http11(self) -> bool:
 281        return self.data.http_version == b"HTTP/1.1"
 282
 283    @property
 284    def is_http2(self) -> bool:
 285        return self.data.http_version == b"HTTP/2.0"
 286
 287    @property
 288    def is_http3(self) -> bool:
 289        return self.data.http_version == b"HTTP/3"
 290
 291    @property
 292    def headers(self) -> Headers:
 293        """
 294        The HTTP headers.
 295        """
 296        return self.data.headers
 297
 298    @headers.setter
 299    def headers(self, h: Headers) -> None:
 300        self.data.headers = h
 301
 302    @property
 303    def trailers(self) -> Headers | None:
 304        """
 305        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
 306        """
 307        return self.data.trailers
 308
 309    @trailers.setter
 310    def trailers(self, h: Headers | None) -> None:
 311        self.data.trailers = h
 312
 313    @property
 314    def raw_content(self) -> bytes | None:
 315        """
 316        The raw (potentially compressed) HTTP message body.
 317
 318        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
 319        `raw_content` may be `None` if the content is missing, for example due to body streaming
 320        (see `Message.stream`). In contrast, `b""` signals a present but empty message body.
 321
 322        *See also:* `Message.content`, `Message.text`
 323        """
 324        return self.data.content
 325
 326    @raw_content.setter
 327    def raw_content(self, content: bytes | None) -> None:
 328        self.data.content = content
 329
 330    @property
 331    def content(self) -> bytes | None:
 332        """
 333        The uncompressed HTTP message body as bytes.
 334
 335        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
 336
 337        *See also:* `Message.raw_content`, `Message.text`
 338        """
 339        return self.get_content()
 340
 341    @content.setter
 342    def content(self, value: bytes | None) -> None:
 343        self.set_content(value)
 344
 345    @property
 346    def text(self) -> str | None:
 347        """
 348        The uncompressed and decoded HTTP message body as text.
 349
 350        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
 351
 352        *See also:* `Message.raw_content`, `Message.content`
 353        """
 354        return self.get_text()
 355
 356    @text.setter
 357    def text(self, value: str | None) -> None:
 358        self.set_text(value)
 359
 360    def set_content(self, value: bytes | None) -> None:
 361        if value is None:
 362            self.raw_content = None
 363            return
 364        if not isinstance(value, bytes):
 365            raise TypeError(
 366                f"Message content must be bytes, not {type(value).__name__}. "
 367                "Please use .text if you want to assign a str."
 368            )
 369        ce = self.headers.get("content-encoding")
 370        try:
 371            self.raw_content = encoding.encode(value, ce or "identity")
 372        except ValueError:
 373            # So we have an invalid content-encoding?
 374            # Let's remove it!
 375            del self.headers["content-encoding"]
 376            self.raw_content = value
 377
 378        if "transfer-encoding" in self.headers:
 379            # https://httpwg.org/specs/rfc7230.html#header.content-length
 380            # don't set content-length if a transfer-encoding is provided
 381            pass
 382        else:
 383            self.headers["content-length"] = str(len(self.raw_content))
 384
 385    def get_content(self, strict: bool = True) -> bytes | None:
 386        """
 387        Similar to `Message.content`, but does not raise if `strict` is `False`.
 388        Instead, the compressed message body is returned as-is.
 389        """
 390        if self.raw_content is None:
 391            return None
 392        ce = self.headers.get("content-encoding")
 393        if ce:
 394            try:
 395                content = encoding.decode(self.raw_content, ce)
 396                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
 397                if isinstance(content, str):
 398                    raise ValueError(f"Invalid Content-Encoding: {ce}")
 399                return content
 400            except ValueError:
 401                if strict:
 402                    raise
 403                return self.raw_content
 404        else:
 405            return self.raw_content
 406
 407    def set_text(self, text: str | None) -> None:
 408        if text is None:
 409            self.content = None
 410            return
 411        enc = infer_content_encoding(self.headers.get("content-type", ""))
 412
 413        try:
 414            self.content = cast(bytes, encoding.encode(text, enc))
 415        except ValueError:
 416            # Fall back to UTF-8 and update the content-type header.
 417            ct = parse_content_type(self.headers.get("content-type", "")) or (
 418                "text",
 419                "plain",
 420                {},
 421            )
 422            ct[2]["charset"] = "utf-8"
 423            self.headers["content-type"] = assemble_content_type(*ct)
 424            enc = "utf8"
 425            self.content = text.encode(enc, "surrogateescape")
 426
 427    def get_text(self, strict: bool = True) -> str | None:
 428        """
 429        Similar to `Message.text`, but does not raise if `strict` is `False`.
 430        Instead, the message body is returned as surrogate-escaped UTF-8.
 431        """
 432        content = self.get_content(strict)
 433        if content is None:
 434            return None
 435        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
 436        try:
 437            return cast(str, encoding.decode(content, enc))
 438        except ValueError:
 439            if strict:
 440                raise
 441            return content.decode("utf8", "surrogateescape")
 442
 443    @property
 444    def timestamp_start(self) -> float:
 445        """
 446        *Timestamp:* Headers received.
 447        """
 448        return self.data.timestamp_start
 449
 450    @timestamp_start.setter
 451    def timestamp_start(self, timestamp_start: float) -> None:
 452        self.data.timestamp_start = timestamp_start
 453
 454    @property
 455    def timestamp_end(self) -> float | None:
 456        """
 457        *Timestamp:* Last byte received.
 458        """
 459        return self.data.timestamp_end
 460
 461    @timestamp_end.setter
 462    def timestamp_end(self, timestamp_end: float | None):
 463        self.data.timestamp_end = timestamp_end
 464
 465    def decode(self, strict: bool = True) -> None:
 466        """
 467        Decodes body based on the current Content-Encoding header, then
 468        removes the header.
 469
 470        If the message body is missing or empty, no action is taken.
 471
 472        *Raises:*
 473         - `ValueError`, when the content-encoding is invalid and strict is True.
 474        """
 475        if not self.raw_content:
 476            # The body is missing (for example, because of body streaming or because it's a response
 477            # to a HEAD request), so we can't correctly update content-length.
 478            return
 479        decoded = self.get_content(strict)
 480        self.headers.pop("content-encoding", None)
 481        self.content = decoded
 482
 483    def encode(self, encoding: str) -> None:
 484        """
 485        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
 486        Any existing content-encodings are overwritten, the content is not decoded beforehand.
 487
 488        *Raises:*
 489         - `ValueError`, when the specified content-encoding is invalid.
 490        """
 491        self.headers["content-encoding"] = encoding
 492        self.content = self.raw_content
 493        if "content-encoding" not in self.headers:
 494            raise ValueError(f"Invalid content encoding {repr(encoding)}")
 495
 496    def json(self, **kwargs: Any) -> Any:
 497        """
 498        Returns the JSON encoded content of the response, if any.
 499        `**kwargs` are optional arguments that will be
 500        passed to `json.loads()`.
 501
 502        Will raise if the content can not be decoded and then parsed as JSON.
 503
 504        *Raises:*
 505         - `json.decoder.JSONDecodeError` if content is not valid JSON.
 506         - `TypeError` if the content is not available, for example because the response
 507            has been streamed.
 508        """
 509        content = self.get_content(strict=False)
 510        if content is None:
 511            raise TypeError("Message content is not available.")
 512        else:
 513            return json.loads(content, **kwargs)
 514
 515
 516class Request(Message):
 517    """
 518    An HTTP request.
 519    """
 520
 521    data: RequestData
 522
 523    def __init__(
 524        self,
 525        host: str,
 526        port: int,
 527        method: bytes,
 528        scheme: bytes,
 529        authority: bytes,
 530        path: bytes,
 531        http_version: bytes,
 532        headers: Headers | tuple[tuple[bytes, bytes], ...],
 533        content: bytes | None,
 534        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 535        timestamp_start: float,
 536        timestamp_end: float | None,
 537    ):
 538        # auto-convert invalid types to retain compatibility with older code.
 539        if isinstance(host, bytes):
 540            host = host.decode("idna", "strict")
 541        if isinstance(method, str):
 542            method = method.encode("ascii", "strict")
 543        if isinstance(scheme, str):
 544            scheme = scheme.encode("ascii", "strict")
 545        if isinstance(authority, str):
 546            authority = authority.encode("ascii", "strict")
 547        if isinstance(path, str):
 548            path = path.encode("ascii", "strict")
 549        if isinstance(http_version, str):
 550            http_version = http_version.encode("ascii", "strict")
 551
 552        if isinstance(content, str):
 553            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 554        if not isinstance(headers, Headers):
 555            headers = Headers(headers)
 556        if trailers is not None and not isinstance(trailers, Headers):
 557            trailers = Headers(trailers)
 558
 559        self.data = RequestData(
 560            host=host,
 561            port=port,
 562            method=method,
 563            scheme=scheme,
 564            authority=authority,
 565            path=path,
 566            http_version=http_version,
 567            headers=headers,
 568            content=content,
 569            trailers=trailers,
 570            timestamp_start=timestamp_start,
 571            timestamp_end=timestamp_end,
 572        )
 573
 574    def __repr__(self) -> str:
 575        if self.host and self.port:
 576            hostport = f"{self.host}:{self.port}"
 577        else:
 578            hostport = ""
 579        path = self.path or ""
 580        return f"Request({self.method} {hostport}{path})"
 581
 582    @classmethod
 583    def make(
 584        cls,
 585        method: str,
 586        url: str,
 587        content: bytes | str = "",
 588        headers: (
 589            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 590        ) = (),
 591    ) -> "Request":
 592        """
 593        Simplified API for creating request objects.
 594        """
 595        # Headers can be list or dict, we differentiate here.
 596        if isinstance(headers, Headers):
 597            pass
 598        elif isinstance(headers, dict):
 599            headers = Headers(
 600                (
 601                    always_bytes(k, "utf-8", "surrogateescape"),
 602                    always_bytes(v, "utf-8", "surrogateescape"),
 603                )
 604                for k, v in headers.items()
 605            )
 606        elif isinstance(headers, Iterable):
 607            headers = Headers(headers)  # type: ignore
 608        else:
 609            raise TypeError(
 610                "Expected headers to be an iterable or dict, but is {}.".format(
 611                    type(headers).__name__
 612                )
 613            )
 614
 615        req = cls(
 616            "",
 617            0,
 618            method.encode("utf-8", "surrogateescape"),
 619            b"",
 620            b"",
 621            b"",
 622            b"HTTP/1.1",
 623            headers,
 624            b"",
 625            None,
 626            time.time(),
 627            time.time(),
 628        )
 629
 630        req.url = url
 631        # Assign this manually to update the content-length header.
 632        if isinstance(content, bytes):
 633            req.content = content
 634        elif isinstance(content, str):
 635            req.text = content
 636        else:
 637            raise TypeError(
 638                f"Expected content to be str or bytes, but is {type(content).__name__}."
 639            )
 640
 641        return req
 642
 643    @property
 644    def first_line_format(self) -> str:
 645        """
 646        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 647
 648        origin-form and asterisk-form are subsumed as "relative".
 649        """
 650        if self.method == "CONNECT":
 651            return "authority"
 652        elif self.authority:
 653            return "absolute"
 654        else:
 655            return "relative"
 656
 657    @property
 658    def method(self) -> str:
 659        """
 660        HTTP request method, e.g. "GET".
 661        """
 662        return self.data.method.decode("utf-8", "surrogateescape").upper()
 663
 664    @method.setter
 665    def method(self, val: str | bytes) -> None:
 666        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 667
 668    @property
 669    def scheme(self) -> str:
 670        """
 671        HTTP request scheme, which should be "http" or "https".
 672        """
 673        return self.data.scheme.decode("utf-8", "surrogateescape")
 674
 675    @scheme.setter
 676    def scheme(self, val: str | bytes) -> None:
 677        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 678
 679    @property
 680    def authority(self) -> str:
 681        """
 682        HTTP request authority.
 683
 684        For HTTP/1, this is the authority portion of the request target
 685        (in either absolute-form or authority-form).
 686        For origin-form and asterisk-form requests, this property is set to an empty string.
 687
 688        For HTTP/2, this is the :authority pseudo header.
 689
 690        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 691        """
 692        try:
 693            return self.data.authority.decode("idna")
 694        except UnicodeError:
 695            return self.data.authority.decode("utf8", "surrogateescape")
 696
 697    @authority.setter
 698    def authority(self, val: str | bytes) -> None:
 699        if isinstance(val, str):
 700            try:
 701                val = val.encode("idna", "strict")
 702            except UnicodeError:
 703                val = val.encode("utf8", "surrogateescape")  # type: ignore
 704        self.data.authority = val
 705
 706    @property
 707    def host(self) -> str:
 708        """
 709        Target server for this request. This may be parsed from the raw request
 710        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 711        or inferred from the proxy mode (e.g. an IP in transparent mode).
 712
 713        Setting the host attribute also updates the host header and authority information, if present.
 714
 715        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 716        """
 717        return self.data.host
 718
 719    @host.setter
 720    def host(self, val: str | bytes) -> None:
 721        self.data.host = always_str(val, "idna", "strict")
 722        self._update_host_and_authority()
 723
 724    @property
 725    def host_header(self) -> str | None:
 726        """
 727        The request's host/authority header.
 728
 729        This property maps to either ``request.headers["Host"]`` or
 730        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 731
 732        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 733        """
 734        if self.is_http2 or self.is_http3:
 735            return self.authority or self.data.headers.get("Host", None)
 736        else:
 737            return self.data.headers.get("Host", None)
 738
 739    @host_header.setter
 740    def host_header(self, val: None | str | bytes) -> None:
 741        if val is None:
 742            if self.is_http2 or self.is_http3:
 743                self.data.authority = b""
 744            self.headers.pop("Host", None)
 745        else:
 746            if self.is_http2 or self.is_http3:
 747                self.authority = val  # type: ignore
 748            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 749                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 750                self.headers["Host"] = val
 751
 752    @property
 753    def port(self) -> int:
 754        """
 755        Target port.
 756        """
 757        return self.data.port
 758
 759    @port.setter
 760    def port(self, port: int) -> None:
 761        if not isinstance(port, int):
 762            raise ValueError(f"Port must be an integer, not {port!r}.")
 763
 764        self.data.port = port
 765        self._update_host_and_authority()
 766
 767    def _update_host_and_authority(self) -> None:
 768        val = url.hostport(self.scheme, self.host, self.port)
 769
 770        # Update host header
 771        if "Host" in self.data.headers:
 772            self.data.headers["Host"] = val
 773        # Update authority
 774        if self.data.authority:
 775            self.authority = val
 776
 777    @property
 778    def path(self) -> str:
 779        """
 780        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 781        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 782
 783        This attribute includes both path and query parts of the target URI
 784        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 785        """
 786        return self.data.path.decode("utf-8", "surrogateescape")
 787
 788    @path.setter
 789    def path(self, val: str | bytes) -> None:
 790        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 791
 792    @property
 793    def url(self) -> str:
 794        """
 795        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 796
 797        Settings this property updates these attributes as well.
 798        """
 799        if self.first_line_format == "authority":
 800            return f"{self.host}:{self.port}"
 801        path = self.path if self.path != "*" else ""
 802        return url.unparse(self.scheme, self.host, self.port, path)
 803
 804    @url.setter
 805    def url(self, val: str | bytes) -> None:
 806        val = always_str(val, "utf-8", "surrogateescape")
 807        self.scheme, self.host, self.port, self.path = url.parse(val)  # type: ignore
 808
 809    @property
 810    def pretty_host(self) -> str:
 811        """
 812        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 813        This is useful in transparent mode where `Request.host` is only an IP address.
 814
 815        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 816        as the Host header could be spoofed.
 817        """
 818        authority = self.host_header
 819        if authority:
 820            return url.parse_authority(authority, check=False)[0]
 821        else:
 822            return self.host
 823
 824    @property
 825    def pretty_url(self) -> str:
 826        """
 827        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 828        """
 829        if self.first_line_format == "authority":
 830            return self.authority
 831
 832        host_header = self.host_header
 833        if not host_header:
 834            return self.url
 835
 836        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 837        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 838        path = self.path if self.path != "*" else ""
 839
 840        return url.unparse(self.scheme, pretty_host, pretty_port, path)
 841
 842    def _get_query(self):
 843        query = urllib.parse.urlparse(self.url).query
 844        return tuple(url.decode(query))
 845
 846    def _set_query(self, query_data):
 847        query = url.encode(query_data)
 848        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 849        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 850
 851    @property
 852    def query(self) -> multidict.MultiDictView[str, str]:
 853        """
 854        The request query as a mutable mapping view on the request's path.
 855        For the most part, this behaves like a dictionary.
 856        Modifications to the MultiDictView update `Request.path`, and vice versa.
 857        """
 858        return multidict.MultiDictView(self._get_query, self._set_query)
 859
 860    @query.setter
 861    def query(self, value):
 862        self._set_query(value)
 863
 864    def _get_cookies(self):
 865        h = self.headers.get_all("Cookie")
 866        return tuple(cookies.parse_cookie_headers(h))
 867
 868    def _set_cookies(self, value):
 869        self.headers["cookie"] = cookies.format_cookie_header(value)
 870
 871    @property
 872    def cookies(self) -> multidict.MultiDictView[str, str]:
 873        """
 874        The request cookies.
 875        For the most part, this behaves like a dictionary.
 876        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 877        """
 878        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 879
 880    @cookies.setter
 881    def cookies(self, value):
 882        self._set_cookies(value)
 883
 884    @property
 885    def path_components(self) -> tuple[str, ...]:
 886        """
 887        The URL's path components as a tuple of strings.
 888        Components are unquoted.
 889        """
 890        path = urllib.parse.urlparse(self.url).path
 891        # This needs to be a tuple so that it's immutable.
 892        # Otherwise, this would fail silently:
 893        #   request.path_components.append("foo")
 894        return tuple(url.unquote(i) for i in path.split("/") if i)
 895
 896    @path_components.setter
 897    def path_components(self, components: Iterable[str]):
 898        components = map(lambda x: url.quote(x, safe=""), components)
 899        path = "/" + "/".join(components)
 900        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 901        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 902
 903    def anticache(self) -> None:
 904        """
 905        Modifies this request to remove headers that might produce a cached response.
 906        """
 907        delheaders = (
 908            "if-modified-since",
 909            "if-none-match",
 910        )
 911        for i in delheaders:
 912            self.headers.pop(i, None)
 913
 914    def anticomp(self) -> None:
 915        """
 916        Modify the Accept-Encoding header to only accept uncompressed responses.
 917        """
 918        self.headers["accept-encoding"] = "identity"
 919
 920    def constrain_encoding(self) -> None:
 921        """
 922        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 923        """
 924        accept_encoding = self.headers.get("accept-encoding")
 925        if accept_encoding:
 926            self.headers["accept-encoding"] = ", ".join(
 927                e
 928                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 929                if e in accept_encoding
 930            )
 931
 932    def _get_urlencoded_form(self):
 933        is_valid_content_type = (
 934            "application/x-www-form-urlencoded"
 935            in self.headers.get("content-type", "").lower()
 936        )
 937        if is_valid_content_type:
 938            return tuple(url.decode(self.get_text(strict=False)))
 939        return ()
 940
 941    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 942        """
 943        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 944        This will overwrite the existing content if there is one.
 945        """
 946        self.headers["content-type"] = "application/x-www-form-urlencoded"
 947        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 948
 949    @property
 950    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 951        """
 952        The URL-encoded form data.
 953
 954        If the content-type indicates non-form data or the form could not be parsed, this is set to
 955        an empty `MultiDictView`.
 956
 957        Modifications to the MultiDictView update `Request.content`, and vice versa.
 958        """
 959        return multidict.MultiDictView(
 960            self._get_urlencoded_form, self._set_urlencoded_form
 961        )
 962
 963    @urlencoded_form.setter
 964    def urlencoded_form(self, value):
 965        self._set_urlencoded_form(value)
 966
 967    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 968        is_valid_content_type = (
 969            "multipart/form-data" in self.headers.get("content-type", "").lower()
 970        )
 971        if is_valid_content_type and self.content is not None:
 972            try:
 973                return multipart.decode_multipart(
 974                    self.headers.get("content-type"), self.content
 975                )
 976            except ValueError:
 977                pass
 978        return []
 979
 980    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 981        ct = self.headers.get("content-type", "")
 982        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 983        if not is_valid_content_type:
 984            """
 985            Generate a random boundary here.
 986
 987            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 988            on generating the boundary.
 989            """
 990            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 991            self.headers["content-type"] = ct = (
 992                f"multipart/form-data; boundary={boundary}"
 993            )
 994        self.content = multipart.encode_multipart(ct, value)
 995
 996    @property
 997    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 998        """
 999        The multipart form data.
1000
1001        If the content-type indicates non-form data or the form could not be parsed, this is set to
1002        an empty `MultiDictView`.
1003
1004        Modifications to the MultiDictView update `Request.content`, and vice versa.
1005        """
1006        return multidict.MultiDictView(
1007            self._get_multipart_form, self._set_multipart_form
1008        )
1009
1010    @multipart_form.setter
1011    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1012        self._set_multipart_form(value)
1013
1014
1015class Response(Message):
1016    """
1017    An HTTP response.
1018    """
1019
1020    data: ResponseData
1021
1022    def __init__(
1023        self,
1024        http_version: bytes,
1025        status_code: int,
1026        reason: bytes,
1027        headers: Headers | tuple[tuple[bytes, bytes], ...],
1028        content: bytes | None,
1029        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1030        timestamp_start: float,
1031        timestamp_end: float | None,
1032    ):
1033        # auto-convert invalid types to retain compatibility with older code.
1034        if isinstance(http_version, str):
1035            http_version = http_version.encode("ascii", "strict")
1036        if isinstance(reason, str):
1037            reason = reason.encode("ascii", "strict")
1038
1039        if isinstance(content, str):
1040            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1041        if not isinstance(headers, Headers):
1042            headers = Headers(headers)
1043        if trailers is not None and not isinstance(trailers, Headers):
1044            trailers = Headers(trailers)
1045
1046        self.data = ResponseData(
1047            http_version=http_version,
1048            status_code=status_code,
1049            reason=reason,
1050            headers=headers,
1051            content=content,
1052            trailers=trailers,
1053            timestamp_start=timestamp_start,
1054            timestamp_end=timestamp_end,
1055        )
1056
1057    def __repr__(self) -> str:
1058        if self.raw_content:
1059            ct = self.headers.get("content-type", "unknown content type")
1060            size = human.pretty_size(len(self.raw_content))
1061            details = f"{ct}, {size}"
1062        else:
1063            details = "no content"
1064        return f"Response({self.status_code}, {details})"
1065
1066    @classmethod
1067    def make(
1068        cls,
1069        status_code: int = 200,
1070        content: bytes | str = b"",
1071        headers: (
1072            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1073        ) = (),
1074    ) -> "Response":
1075        """
1076        Simplified API for creating response objects.
1077        """
1078        if isinstance(headers, Headers):
1079            headers = headers
1080        elif isinstance(headers, dict):
1081            headers = Headers(
1082                (
1083                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1084                    always_bytes(v, "utf-8", "surrogateescape"),
1085                )
1086                for k, v in headers.items()
1087            )
1088        elif isinstance(headers, Iterable):
1089            headers = Headers(headers)  # type: ignore
1090        else:
1091            raise TypeError(
1092                "Expected headers to be an iterable or dict, but is {}.".format(
1093                    type(headers).__name__
1094                )
1095            )
1096
1097        resp = cls(
1098            b"HTTP/1.1",
1099            status_code,
1100            status_codes.RESPONSES.get(status_code, "").encode(),
1101            headers,
1102            None,
1103            None,
1104            time.time(),
1105            time.time(),
1106        )
1107
1108        # Assign this manually to update the content-length header.
1109        if isinstance(content, bytes):
1110            resp.content = content
1111        elif isinstance(content, str):
1112            resp.text = content
1113        else:
1114            raise TypeError(
1115                f"Expected content to be str or bytes, but is {type(content).__name__}."
1116            )
1117
1118        return resp
1119
1120    @property
1121    def status_code(self) -> int:
1122        """
1123        HTTP Status Code, e.g. ``200``.
1124        """
1125        return self.data.status_code
1126
1127    @status_code.setter
1128    def status_code(self, status_code: int) -> None:
1129        self.data.status_code = status_code
1130
1131    @property
1132    def reason(self) -> str:
1133        """
1134        HTTP reason phrase, for example "Not Found".
1135
1136        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1137        """
1138        # Encoding: http://stackoverflow.com/a/16674906/934719
1139        return self.data.reason.decode("ISO-8859-1")
1140
1141    @reason.setter
1142    def reason(self, reason: str | bytes) -> None:
1143        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1144
1145    def _get_cookies(self):
1146        h = self.headers.get_all("set-cookie")
1147        all_cookies = cookies.parse_set_cookie_headers(h)
1148        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1149
1150    def _set_cookies(self, value):
1151        cookie_headers = []
1152        for k, v in value:
1153            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1154            cookie_headers.append(header)
1155        self.headers.set_all("set-cookie", cookie_headers)
1156
1157    @property
1158    def cookies(
1159        self,
1160    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1161        """
1162        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1163        name strings, and values are `(cookie value, attributes)` tuples. Within
1164        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1165        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1166
1167        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1168        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1169        """
1170        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1171
1172    @cookies.setter
1173    def cookies(self, value):
1174        self._set_cookies(value)
1175
1176    def refresh(self, now=None):
1177        """
1178        This fairly complex and heuristic function refreshes a server
1179        response for replay.
1180
1181         - It adjusts date, expires, and last-modified headers.
1182         - It adjusts cookie expiration.
1183        """
1184        if not now:
1185            now = time.time()
1186        delta = now - self.timestamp_start
1187        refresh_headers = [
1188            "date",
1189            "expires",
1190            "last-modified",
1191        ]
1192        for i in refresh_headers:
1193            if i in self.headers:
1194                d = parsedate_tz(self.headers[i])
1195                if d:
1196                    new = mktime_tz(d) + delta
1197                    try:
1198                        self.headers[i] = formatdate(new, usegmt=True)
1199                    except OSError:  # pragma: no cover
1200                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1201        c = []
1202        for set_cookie_header in self.headers.get_all("set-cookie"):
1203            try:
1204                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1205            except ValueError:
1206                refreshed = set_cookie_header
1207            c.append(refreshed)
1208        if c:
1209            self.headers.set_all("set-cookie", c)
1210
1211
1212class HTTPFlow(flow.Flow):
1213    """
1214    An HTTPFlow is a collection of objects representing a single HTTP
1215    transaction.
1216    """
1217
1218    request: Request
1219    """The client's HTTP request."""
1220    response: Response | None = None
1221    """The server's HTTP response."""
1222    error: flow.Error | None = None
1223    """
1224    A connection or protocol error affecting this flow.
1225
1226    Note that it's possible for a Flow to have both a response and an error
1227    object. This might happen, for instance, when a response was received
1228    from the server, but there was an error sending it back to the client.
1229    """
1230
1231    websocket: WebSocketData | None = None
1232    """
1233    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1234    """
1235
1236    def get_state(self) -> serializable.State:
1237        return {
1238            **super().get_state(),
1239            "request": self.request.get_state(),
1240            "response": self.response.get_state() if self.response else None,
1241            "websocket": self.websocket.get_state() if self.websocket else None,
1242        }
1243
1244    def set_state(self, state: serializable.State) -> None:
1245        self.request = Request.from_state(state.pop("request"))
1246        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1247        self.websocket = (
1248            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1249        )
1250        super().set_state(state)
1251
1252    def __repr__(self):
1253        s = "<HTTPFlow"
1254        for a in (
1255            "request",
1256            "response",
1257            "websocket",
1258            "error",
1259            "client_conn",
1260            "server_conn",
1261        ):
1262            if getattr(self, a, False):
1263                s += f"\r\n  {a} = {{flow.{a}}}"
1264        s += ">"
1265        return s.format(flow=self)
1266
1267    @property
1268    def timestamp_start(self) -> float:
1269        """*Read-only:* An alias for `Request.timestamp_start`."""
1270        return self.request.timestamp_start
1271
1272    @property
1273    def mode(self) -> str:  # pragma: no cover
1274        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1275        return getattr(self, "_mode", "regular")
1276
1277    @mode.setter
1278    def mode(self, val: str) -> None:  # pragma: no cover
1279        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1280        self._mode = val
1281
1282    def copy(self):
1283        f = super().copy()
1284        if self.request:
1285            f.request = self.request.copy()
1286        if self.response:
1287            f.response = self.response.copy()
1288        return f
1289
1290
1291__all__ = [
1292    "HTTPFlow",
1293    "Message",
1294    "Request",
1295    "Response",
1296    "Headers",
1297]
class HTTPFlow(mitmproxy.flow.Flow):
1213class HTTPFlow(flow.Flow):
1214    """
1215    An HTTPFlow is a collection of objects representing a single HTTP
1216    transaction.
1217    """
1218
1219    request: Request
1220    """The client's HTTP request."""
1221    response: Response | None = None
1222    """The server's HTTP response."""
1223    error: flow.Error | None = None
1224    """
1225    A connection or protocol error affecting this flow.
1226
1227    Note that it's possible for a Flow to have both a response and an error
1228    object. This might happen, for instance, when a response was received
1229    from the server, but there was an error sending it back to the client.
1230    """
1231
1232    websocket: WebSocketData | None = None
1233    """
1234    If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
1235    """
1236
1237    def get_state(self) -> serializable.State:
1238        return {
1239            **super().get_state(),
1240            "request": self.request.get_state(),
1241            "response": self.response.get_state() if self.response else None,
1242            "websocket": self.websocket.get_state() if self.websocket else None,
1243        }
1244
1245    def set_state(self, state: serializable.State) -> None:
1246        self.request = Request.from_state(state.pop("request"))
1247        self.response = Response.from_state(r) if (r := state.pop("response")) else None
1248        self.websocket = (
1249            WebSocketData.from_state(w) if (w := state.pop("websocket")) else None
1250        )
1251        super().set_state(state)
1252
1253    def __repr__(self):
1254        s = "<HTTPFlow"
1255        for a in (
1256            "request",
1257            "response",
1258            "websocket",
1259            "error",
1260            "client_conn",
1261            "server_conn",
1262        ):
1263            if getattr(self, a, False):
1264                s += f"\r\n  {a} = {{flow.{a}}}"
1265        s += ">"
1266        return s.format(flow=self)
1267
1268    @property
1269    def timestamp_start(self) -> float:
1270        """*Read-only:* An alias for `Request.timestamp_start`."""
1271        return self.request.timestamp_start
1272
1273    @property
1274    def mode(self) -> str:  # pragma: no cover
1275        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1276        return getattr(self, "_mode", "regular")
1277
1278    @mode.setter
1279    def mode(self, val: str) -> None:  # pragma: no cover
1280        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1281        self._mode = val
1282
1283    def copy(self):
1284        f = super().copy()
1285        if self.request:
1286            f.request = self.request.copy()
1287        if self.response:
1288            f.response = self.response.copy()
1289        return f

An HTTPFlow is a collection of objects representing a single HTTP transaction.

request: Request

The client's HTTP request.

response: Response | None = None

The server's HTTP response.

error: mitmproxy.flow.Error | None = None

A connection or protocol error affecting this flow.

Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client.

websocket: mitmproxy.websocket.WebSocketData | None = None

If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.

timestamp_start: float
1268    @property
1269    def timestamp_start(self) -> float:
1270        """*Read-only:* An alias for `Request.timestamp_start`."""
1271        return self.request.timestamp_start

Read-only: An alias for Request.timestamp_start.

mode: str
1273    @property
1274    def mode(self) -> str:  # pragma: no cover
1275        warnings.warn("HTTPFlow.mode is deprecated.", DeprecationWarning, stacklevel=2)
1276        return getattr(self, "_mode", "regular")
def copy(self):
1283    def copy(self):
1284        f = super().copy()
1285        if self.request:
1286            f.request = self.request.copy()
1287        if self.response:
1288            f.response = self.response.copy()
1289        return f

Make a copy of this flow.

type: ClassVar[str] = 'http'

The flow type, for example http, tcp, or dns.

class Message(mitmproxy.coretypes.serializable.Serializable):
234class Message(serializable.Serializable):
235    """Base class for `Request` and `Response`."""
236
237    @classmethod
238    def from_state(cls, state):
239        return cls(**state)
240
241    def get_state(self):
242        return self.data.get_state()
243
244    def set_state(self, state):
245        self.data.set_state(state)
246
247    data: MessageData
248    stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False
249    """
250    This attribute controls if the message body should be streamed.
251
252    If `False`, mitmproxy will buffer the entire body before forwarding it to the destination.
253    This makes it possible to perform string replacements on the entire body.
254    If `True`, the message body will not be buffered on the proxy
255    but immediately forwarded instead.
256    Alternatively, a transformation function can be specified, which will be called for each chunk of data.
257    Please note that packet boundaries generally should not be relied upon.
258
259    This attribute must be set in the `requestheaders` or `responseheaders` hook.
260    Setting it in `request` or  `response` is already too late, mitmproxy has buffered the message body already.
261    """
262
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")
269
270    @http_version.setter
271    def http_version(self, http_version: str | bytes) -> None:
272        self.data.http_version = strutils.always_bytes(
273            http_version, "utf-8", "surrogateescape"
274        )
275
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
279
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
283
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
287
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
291
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers
298
299    @headers.setter
300    def headers(self, h: Headers) -> None:
301        self.data.headers = h
302
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
309
310    @trailers.setter
311    def trailers(self, h: Headers | None) -> None:
312        self.data.trailers = h
313
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320        `raw_content` may be `None` if the content is missing, for example due to body streaming
321        (see `Message.stream`). In contrast, `b""` signals a present but empty message body.
322
323        *See also:* `Message.content`, `Message.text`
324        """
325        return self.data.content
326
327    @raw_content.setter
328    def raw_content(self, content: bytes | None) -> None:
329        self.data.content = content
330
331    @property
332    def content(self) -> bytes | None:
333        """
334        The uncompressed HTTP message body as bytes.
335
336        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
337
338        *See also:* `Message.raw_content`, `Message.text`
339        """
340        return self.get_content()
341
342    @content.setter
343    def content(self, value: bytes | None) -> None:
344        self.set_content(value)
345
346    @property
347    def text(self) -> str | None:
348        """
349        The uncompressed and decoded HTTP message body as text.
350
351        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
352
353        *See also:* `Message.raw_content`, `Message.content`
354        """
355        return self.get_text()
356
357    @text.setter
358    def text(self, value: str | None) -> None:
359        self.set_text(value)
360
361    def set_content(self, value: bytes | None) -> None:
362        if value is None:
363            self.raw_content = None
364            return
365        if not isinstance(value, bytes):
366            raise TypeError(
367                f"Message content must be bytes, not {type(value).__name__}. "
368                "Please use .text if you want to assign a str."
369            )
370        ce = self.headers.get("content-encoding")
371        try:
372            self.raw_content = encoding.encode(value, ce or "identity")
373        except ValueError:
374            # So we have an invalid content-encoding?
375            # Let's remove it!
376            del self.headers["content-encoding"]
377            self.raw_content = value
378
379        if "transfer-encoding" in self.headers:
380            # https://httpwg.org/specs/rfc7230.html#header.content-length
381            # don't set content-length if a transfer-encoding is provided
382            pass
383        else:
384            self.headers["content-length"] = str(len(self.raw_content))
385
386    def get_content(self, strict: bool = True) -> bytes | None:
387        """
388        Similar to `Message.content`, but does not raise if `strict` is `False`.
389        Instead, the compressed message body is returned as-is.
390        """
391        if self.raw_content is None:
392            return None
393        ce = self.headers.get("content-encoding")
394        if ce:
395            try:
396                content = encoding.decode(self.raw_content, ce)
397                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
398                if isinstance(content, str):
399                    raise ValueError(f"Invalid Content-Encoding: {ce}")
400                return content
401            except ValueError:
402                if strict:
403                    raise
404                return self.raw_content
405        else:
406            return self.raw_content
407
408    def set_text(self, text: str | None) -> None:
409        if text is None:
410            self.content = None
411            return
412        enc = infer_content_encoding(self.headers.get("content-type", ""))
413
414        try:
415            self.content = cast(bytes, encoding.encode(text, enc))
416        except ValueError:
417            # Fall back to UTF-8 and update the content-type header.
418            ct = parse_content_type(self.headers.get("content-type", "")) or (
419                "text",
420                "plain",
421                {},
422            )
423            ct[2]["charset"] = "utf-8"
424            self.headers["content-type"] = assemble_content_type(*ct)
425            enc = "utf8"
426            self.content = text.encode(enc, "surrogateescape")
427
428    def get_text(self, strict: bool = True) -> str | None:
429        """
430        Similar to `Message.text`, but does not raise if `strict` is `False`.
431        Instead, the message body is returned as surrogate-escaped UTF-8.
432        """
433        content = self.get_content(strict)
434        if content is None:
435            return None
436        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
437        try:
438            return cast(str, encoding.decode(content, enc))
439        except ValueError:
440            if strict:
441                raise
442            return content.decode("utf8", "surrogateescape")
443
444    @property
445    def timestamp_start(self) -> float:
446        """
447        *Timestamp:* Headers received.
448        """
449        return self.data.timestamp_start
450
451    @timestamp_start.setter
452    def timestamp_start(self, timestamp_start: float) -> None:
453        self.data.timestamp_start = timestamp_start
454
455    @property
456    def timestamp_end(self) -> float | None:
457        """
458        *Timestamp:* Last byte received.
459        """
460        return self.data.timestamp_end
461
462    @timestamp_end.setter
463    def timestamp_end(self, timestamp_end: float | None):
464        self.data.timestamp_end = timestamp_end
465
466    def decode(self, strict: bool = True) -> None:
467        """
468        Decodes body based on the current Content-Encoding header, then
469        removes the header.
470
471        If the message body is missing or empty, no action is taken.
472
473        *Raises:*
474         - `ValueError`, when the content-encoding is invalid and strict is True.
475        """
476        if not self.raw_content:
477            # The body is missing (for example, because of body streaming or because it's a response
478            # to a HEAD request), so we can't correctly update content-length.
479            return
480        decoded = self.get_content(strict)
481        self.headers.pop("content-encoding", None)
482        self.content = decoded
483
484    def encode(self, encoding: str) -> None:
485        """
486        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
487        Any existing content-encodings are overwritten, the content is not decoded beforehand.
488
489        *Raises:*
490         - `ValueError`, when the specified content-encoding is invalid.
491        """
492        self.headers["content-encoding"] = encoding
493        self.content = self.raw_content
494        if "content-encoding" not in self.headers:
495            raise ValueError(f"Invalid content encoding {repr(encoding)}")
496
497    def json(self, **kwargs: Any) -> Any:
498        """
499        Returns the JSON encoded content of the response, if any.
500        `**kwargs` are optional arguments that will be
501        passed to `json.loads()`.
502
503        Will raise if the content can not be decoded and then parsed as JSON.
504
505        *Raises:*
506         - `json.decoder.JSONDecodeError` if content is not valid JSON.
507         - `TypeError` if the content is not available, for example because the response
508            has been streamed.
509        """
510        content = self.get_content(strict=False)
511        if content is None:
512            raise TypeError("Message content is not available.")
513        else:
514            return json.loads(content, **kwargs)

Base class for Request and Response.

stream: Callable[[bytes], Iterable[bytes] | bytes] | bool = False

This attribute controls if the message body should be streamed.

If False, mitmproxy will buffer the entire body before forwarding it to the destination. This makes it possible to perform string replacements on the entire body. If True, the message body will not be buffered on the proxy but immediately forwarded instead. Alternatively, a transformation function can be specified, which will be called for each chunk of data. Please note that packet boundaries generally should not be relied upon.

This attribute must be set in the requestheaders or responseheaders hook. Setting it in request or response is already too late, mitmproxy has buffered the message body already.

http_version: str
263    @property
264    def http_version(self) -> str:
265        """
266        HTTP version string, for example `HTTP/1.1`.
267        """
268        return self.data.http_version.decode("utf-8", "surrogateescape")

HTTP version string, for example HTTP/1.1.

is_http10: bool
276    @property
277    def is_http10(self) -> bool:
278        return self.data.http_version == b"HTTP/1.0"
is_http11: bool
280    @property
281    def is_http11(self) -> bool:
282        return self.data.http_version == b"HTTP/1.1"
is_http2: bool
284    @property
285    def is_http2(self) -> bool:
286        return self.data.http_version == b"HTTP/2.0"
is_http3: bool
288    @property
289    def is_http3(self) -> bool:
290        return self.data.http_version == b"HTTP/3"
headers: Headers
292    @property
293    def headers(self) -> Headers:
294        """
295        The HTTP headers.
296        """
297        return self.data.headers

The HTTP headers.

trailers: Headers | None
303    @property
304    def trailers(self) -> Headers | None:
305        """
306        The [HTTP trailers](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer).
307        """
308        return self.data.trailers
raw_content: bytes | None
314    @property
315    def raw_content(self) -> bytes | None:
316        """
317        The raw (potentially compressed) HTTP message body.
318
319        In contrast to `Message.content` and `Message.text`, accessing this property never raises.
320        `raw_content` may be `None` if the content is missing, for example due to body streaming
321        (see `Message.stream`). In contrast, `b""` signals a present but empty message body.
322
323        *See also:* `Message.content`, `Message.text`
324        """
325        return self.data.content

The raw (potentially compressed) HTTP message body.

In contrast to Message.content and Message.text, accessing this property never raises. raw_content may be None if the content is missing, for example due to body streaming (see Message.stream). In contrast, b"" signals a present but empty message body.

See also: Message.content, Message.text

content: bytes | None
331    @property
332    def content(self) -> bytes | None:
333        """
334        The uncompressed HTTP message body as bytes.
335
336        Accessing this attribute may raise a `ValueError` when the HTTP content-encoding is invalid.
337
338        *See also:* `Message.raw_content`, `Message.text`
339        """
340        return self.get_content()

The uncompressed HTTP message body as bytes.

Accessing this attribute may raise a ValueError when the HTTP content-encoding is invalid.

See also: Message.raw_content, Message.text

text: str | None
346    @property
347    def text(self) -> str | None:
348        """
349        The uncompressed and decoded HTTP message body as text.
350
351        Accessing this attribute may raise a `ValueError` when either content-encoding or charset is invalid.
352
353        *See also:* `Message.raw_content`, `Message.content`
354        """
355        return self.get_text()

The uncompressed and decoded HTTP message body as text.

Accessing this attribute may raise a ValueError when either content-encoding or charset is invalid.

See also: Message.raw_content, Message.content

def set_content(self, value: bytes | None) -> None:
361    def set_content(self, value: bytes | None) -> None:
362        if value is None:
363            self.raw_content = None
364            return
365        if not isinstance(value, bytes):
366            raise TypeError(
367                f"Message content must be bytes, not {type(value).__name__}. "
368                "Please use .text if you want to assign a str."
369            )
370        ce = self.headers.get("content-encoding")
371        try:
372            self.raw_content = encoding.encode(value, ce or "identity")
373        except ValueError:
374            # So we have an invalid content-encoding?
375            # Let's remove it!
376            del self.headers["content-encoding"]
377            self.raw_content = value
378
379        if "transfer-encoding" in self.headers:
380            # https://httpwg.org/specs/rfc7230.html#header.content-length
381            # don't set content-length if a transfer-encoding is provided
382            pass
383        else:
384            self.headers["content-length"] = str(len(self.raw_content))
def get_content(self, strict: bool = True) -> bytes | None:
386    def get_content(self, strict: bool = True) -> bytes | None:
387        """
388        Similar to `Message.content`, but does not raise if `strict` is `False`.
389        Instead, the compressed message body is returned as-is.
390        """
391        if self.raw_content is None:
392            return None
393        ce = self.headers.get("content-encoding")
394        if ce:
395            try:
396                content = encoding.decode(self.raw_content, ce)
397                # A client may illegally specify a byte -> str encoding here (e.g. utf8)
398                if isinstance(content, str):
399                    raise ValueError(f"Invalid Content-Encoding: {ce}")
400                return content
401            except ValueError:
402                if strict:
403                    raise
404                return self.raw_content
405        else:
406            return self.raw_content

Similar to Message.content, but does not raise if strict is False. Instead, the compressed message body is returned as-is.

def set_text(self, text: str | None) -> None:
408    def set_text(self, text: str | None) -> None:
409        if text is None:
410            self.content = None
411            return
412        enc = infer_content_encoding(self.headers.get("content-type", ""))
413
414        try:
415            self.content = cast(bytes, encoding.encode(text, enc))
416        except ValueError:
417            # Fall back to UTF-8 and update the content-type header.
418            ct = parse_content_type(self.headers.get("content-type", "")) or (
419                "text",
420                "plain",
421                {},
422            )
423            ct[2]["charset"] = "utf-8"
424            self.headers["content-type"] = assemble_content_type(*ct)
425            enc = "utf8"
426            self.content = text.encode(enc, "surrogateescape")
def get_text(self, strict: bool = True) -> str | None:
428    def get_text(self, strict: bool = True) -> str | None:
429        """
430        Similar to `Message.text`, but does not raise if `strict` is `False`.
431        Instead, the message body is returned as surrogate-escaped UTF-8.
432        """
433        content = self.get_content(strict)
434        if content is None:
435            return None
436        enc = infer_content_encoding(self.headers.get("content-type", ""), content)
437        try:
438            return cast(str, encoding.decode(content, enc))
439        except ValueError:
440            if strict:
441                raise
442            return content.decode("utf8", "surrogateescape")

Similar to Message.text, but does not raise if strict is False. Instead, the message body is returned as surrogate-escaped UTF-8.

timestamp_start: float
444    @property
445    def timestamp_start(self) -> float:
446        """
447        *Timestamp:* Headers received.
448        """
449        return self.data.timestamp_start

Timestamp: Headers received.

timestamp_end: float | None
455    @property
456    def timestamp_end(self) -> float | None:
457        """
458        *Timestamp:* Last byte received.
459        """
460        return self.data.timestamp_end

Timestamp: Last byte received.

def decode(self, strict: bool = True) -> None:
466    def decode(self, strict: bool = True) -> None:
467        """
468        Decodes body based on the current Content-Encoding header, then
469        removes the header.
470
471        If the message body is missing or empty, no action is taken.
472
473        *Raises:*
474         - `ValueError`, when the content-encoding is invalid and strict is True.
475        """
476        if not self.raw_content:
477            # The body is missing (for example, because of body streaming or because it's a response
478            # to a HEAD request), so we can't correctly update content-length.
479            return
480        decoded = self.get_content(strict)
481        self.headers.pop("content-encoding", None)
482        self.content = decoded

Decodes body based on the current Content-Encoding header, then removes the header.

If the message body is missing or empty, no action is taken.

Raises:

  • ValueError, when the content-encoding is invalid and strict is True.
def encode(self, encoding: str) -> None:
484    def encode(self, encoding: str) -> None:
485        """
486        Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd".
487        Any existing content-encodings are overwritten, the content is not decoded beforehand.
488
489        *Raises:*
490         - `ValueError`, when the specified content-encoding is invalid.
491        """
492        self.headers["content-encoding"] = encoding
493        self.content = self.raw_content
494        if "content-encoding" not in self.headers:
495            raise ValueError(f"Invalid content encoding {repr(encoding)}")

Encodes body with the given encoding, where e is "gzip", "deflate", "identity", "br", or "zstd". Any existing content-encodings are overwritten, the content is not decoded beforehand.

Raises:

  • ValueError, when the specified content-encoding is invalid.
def json(self, **kwargs: Any) -> Any:
497    def json(self, **kwargs: Any) -> Any:
498        """
499        Returns the JSON encoded content of the response, if any.
500        `**kwargs` are optional arguments that will be
501        passed to `json.loads()`.
502
503        Will raise if the content can not be decoded and then parsed as JSON.
504
505        *Raises:*
506         - `json.decoder.JSONDecodeError` if content is not valid JSON.
507         - `TypeError` if the content is not available, for example because the response
508            has been streamed.
509        """
510        content = self.get_content(strict=False)
511        if content is None:
512            raise TypeError("Message content is not available.")
513        else:
514            return json.loads(content, **kwargs)

Returns the JSON encoded content of the response, if any. **kwargs are optional arguments that will be passed to json.loads().

Will raise if the content can not be decoded and then parsed as JSON.

Raises:

  • json.decoder.JSONDecodeError if content is not valid JSON.
  • TypeError if the content is not available, for example because the response has been streamed.
class Request(Message):
 517class Request(Message):
 518    """
 519    An HTTP request.
 520    """
 521
 522    data: RequestData
 523
 524    def __init__(
 525        self,
 526        host: str,
 527        port: int,
 528        method: bytes,
 529        scheme: bytes,
 530        authority: bytes,
 531        path: bytes,
 532        http_version: bytes,
 533        headers: Headers | tuple[tuple[bytes, bytes], ...],
 534        content: bytes | None,
 535        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
 536        timestamp_start: float,
 537        timestamp_end: float | None,
 538    ):
 539        # auto-convert invalid types to retain compatibility with older code.
 540        if isinstance(host, bytes):
 541            host = host.decode("idna", "strict")
 542        if isinstance(method, str):
 543            method = method.encode("ascii", "strict")
 544        if isinstance(scheme, str):
 545            scheme = scheme.encode("ascii", "strict")
 546        if isinstance(authority, str):
 547            authority = authority.encode("ascii", "strict")
 548        if isinstance(path, str):
 549            path = path.encode("ascii", "strict")
 550        if isinstance(http_version, str):
 551            http_version = http_version.encode("ascii", "strict")
 552
 553        if isinstance(content, str):
 554            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
 555        if not isinstance(headers, Headers):
 556            headers = Headers(headers)
 557        if trailers is not None and not isinstance(trailers, Headers):
 558            trailers = Headers(trailers)
 559
 560        self.data = RequestData(
 561            host=host,
 562            port=port,
 563            method=method,
 564            scheme=scheme,
 565            authority=authority,
 566            path=path,
 567            http_version=http_version,
 568            headers=headers,
 569            content=content,
 570            trailers=trailers,
 571            timestamp_start=timestamp_start,
 572            timestamp_end=timestamp_end,
 573        )
 574
 575    def __repr__(self) -> str:
 576        if self.host and self.port:
 577            hostport = f"{self.host}:{self.port}"
 578        else:
 579            hostport = ""
 580        path = self.path or ""
 581        return f"Request({self.method} {hostport}{path})"
 582
 583    @classmethod
 584    def make(
 585        cls,
 586        method: str,
 587        url: str,
 588        content: bytes | str = "",
 589        headers: (
 590            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
 591        ) = (),
 592    ) -> "Request":
 593        """
 594        Simplified API for creating request objects.
 595        """
 596        # Headers can be list or dict, we differentiate here.
 597        if isinstance(headers, Headers):
 598            pass
 599        elif isinstance(headers, dict):
 600            headers = Headers(
 601                (
 602                    always_bytes(k, "utf-8", "surrogateescape"),
 603                    always_bytes(v, "utf-8", "surrogateescape"),
 604                )
 605                for k, v in headers.items()
 606            )
 607        elif isinstance(headers, Iterable):
 608            headers = Headers(headers)  # type: ignore
 609        else:
 610            raise TypeError(
 611                "Expected headers to be an iterable or dict, but is {}.".format(
 612                    type(headers).__name__
 613                )
 614            )
 615
 616        req = cls(
 617            "",
 618            0,
 619            method.encode("utf-8", "surrogateescape"),
 620            b"",
 621            b"",
 622            b"",
 623            b"HTTP/1.1",
 624            headers,
 625            b"",
 626            None,
 627            time.time(),
 628            time.time(),
 629        )
 630
 631        req.url = url
 632        # Assign this manually to update the content-length header.
 633        if isinstance(content, bytes):
 634            req.content = content
 635        elif isinstance(content, str):
 636            req.text = content
 637        else:
 638            raise TypeError(
 639                f"Expected content to be str or bytes, but is {type(content).__name__}."
 640            )
 641
 642        return req
 643
 644    @property
 645    def first_line_format(self) -> str:
 646        """
 647        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
 648
 649        origin-form and asterisk-form are subsumed as "relative".
 650        """
 651        if self.method == "CONNECT":
 652            return "authority"
 653        elif self.authority:
 654            return "absolute"
 655        else:
 656            return "relative"
 657
 658    @property
 659    def method(self) -> str:
 660        """
 661        HTTP request method, e.g. "GET".
 662        """
 663        return self.data.method.decode("utf-8", "surrogateescape").upper()
 664
 665    @method.setter
 666    def method(self, val: str | bytes) -> None:
 667        self.data.method = always_bytes(val, "utf-8", "surrogateescape")
 668
 669    @property
 670    def scheme(self) -> str:
 671        """
 672        HTTP request scheme, which should be "http" or "https".
 673        """
 674        return self.data.scheme.decode("utf-8", "surrogateescape")
 675
 676    @scheme.setter
 677    def scheme(self, val: str | bytes) -> None:
 678        self.data.scheme = always_bytes(val, "utf-8", "surrogateescape")
 679
 680    @property
 681    def authority(self) -> str:
 682        """
 683        HTTP request authority.
 684
 685        For HTTP/1, this is the authority portion of the request target
 686        (in either absolute-form or authority-form).
 687        For origin-form and asterisk-form requests, this property is set to an empty string.
 688
 689        For HTTP/2, this is the :authority pseudo header.
 690
 691        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
 692        """
 693        try:
 694            return self.data.authority.decode("idna")
 695        except UnicodeError:
 696            return self.data.authority.decode("utf8", "surrogateescape")
 697
 698    @authority.setter
 699    def authority(self, val: str | bytes) -> None:
 700        if isinstance(val, str):
 701            try:
 702                val = val.encode("idna", "strict")
 703            except UnicodeError:
 704                val = val.encode("utf8", "surrogateescape")  # type: ignore
 705        self.data.authority = val
 706
 707    @property
 708    def host(self) -> str:
 709        """
 710        Target server for this request. This may be parsed from the raw request
 711        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
 712        or inferred from the proxy mode (e.g. an IP in transparent mode).
 713
 714        Setting the host attribute also updates the host header and authority information, if present.
 715
 716        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
 717        """
 718        return self.data.host
 719
 720    @host.setter
 721    def host(self, val: str | bytes) -> None:
 722        self.data.host = always_str(val, "idna", "strict")
 723        self._update_host_and_authority()
 724
 725    @property
 726    def host_header(self) -> str | None:
 727        """
 728        The request's host/authority header.
 729
 730        This property maps to either ``request.headers["Host"]`` or
 731        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
 732
 733        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
 734        """
 735        if self.is_http2 or self.is_http3:
 736            return self.authority or self.data.headers.get("Host", None)
 737        else:
 738            return self.data.headers.get("Host", None)
 739
 740    @host_header.setter
 741    def host_header(self, val: None | str | bytes) -> None:
 742        if val is None:
 743            if self.is_http2 or self.is_http3:
 744                self.data.authority = b""
 745            self.headers.pop("Host", None)
 746        else:
 747            if self.is_http2 or self.is_http3:
 748                self.authority = val  # type: ignore
 749            if not (self.is_http2 or self.is_http3) or "Host" in self.headers:
 750                # For h2, we only overwrite, but not create, as :authority is the h2 host header.
 751                self.headers["Host"] = val
 752
 753    @property
 754    def port(self) -> int:
 755        """
 756        Target port.
 757        """
 758        return self.data.port
 759
 760    @port.setter
 761    def port(self, port: int) -> None:
 762        if not isinstance(port, int):
 763            raise ValueError(f"Port must be an integer, not {port!r}.")
 764
 765        self.data.port = port
 766        self._update_host_and_authority()
 767
 768    def _update_host_and_authority(self) -> None:
 769        val = url.hostport(self.scheme, self.host, self.port)
 770
 771        # Update host header
 772        if "Host" in self.data.headers:
 773            self.data.headers["Host"] = val
 774        # Update authority
 775        if self.data.authority:
 776            self.authority = val
 777
 778    @property
 779    def path(self) -> str:
 780        """
 781        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
 782        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
 783
 784        This attribute includes both path and query parts of the target URI
 785        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
 786        """
 787        return self.data.path.decode("utf-8", "surrogateescape")
 788
 789    @path.setter
 790    def path(self, val: str | bytes) -> None:
 791        self.data.path = always_bytes(val, "utf-8", "surrogateescape")
 792
 793    @property
 794    def url(self) -> str:
 795        """
 796        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
 797
 798        Settings this property updates these attributes as well.
 799        """
 800        if self.first_line_format == "authority":
 801            return f"{self.host}:{self.port}"
 802        path = self.path if self.path != "*" else ""
 803        return url.unparse(self.scheme, self.host, self.port, path)
 804
 805    @url.setter
 806    def url(self, val: str | bytes) -> None:
 807        val = always_str(val, "utf-8", "surrogateescape")
 808        self.scheme, self.host, self.port, self.path = url.parse(val)  # type: ignore
 809
 810    @property
 811    def pretty_host(self) -> str:
 812        """
 813        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
 814        This is useful in transparent mode where `Request.host` is only an IP address.
 815
 816        *Warning:* When working in adversarial environments, this may not reflect the actual destination
 817        as the Host header could be spoofed.
 818        """
 819        authority = self.host_header
 820        if authority:
 821            return url.parse_authority(authority, check=False)[0]
 822        else:
 823            return self.host
 824
 825    @property
 826    def pretty_url(self) -> str:
 827        """
 828        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
 829        """
 830        if self.first_line_format == "authority":
 831            return self.authority
 832
 833        host_header = self.host_header
 834        if not host_header:
 835            return self.url
 836
 837        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
 838        pretty_port = pretty_port or url.default_port(self.scheme) or 443
 839        path = self.path if self.path != "*" else ""
 840
 841        return url.unparse(self.scheme, pretty_host, pretty_port, path)
 842
 843    def _get_query(self):
 844        query = urllib.parse.urlparse(self.url).query
 845        return tuple(url.decode(query))
 846
 847    def _set_query(self, query_data):
 848        query = url.encode(query_data)
 849        _, _, path, params, _, fragment = urllib.parse.urlparse(self.url)
 850        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 851
 852    @property
 853    def query(self) -> multidict.MultiDictView[str, str]:
 854        """
 855        The request query as a mutable mapping view on the request's path.
 856        For the most part, this behaves like a dictionary.
 857        Modifications to the MultiDictView update `Request.path`, and vice versa.
 858        """
 859        return multidict.MultiDictView(self._get_query, self._set_query)
 860
 861    @query.setter
 862    def query(self, value):
 863        self._set_query(value)
 864
 865    def _get_cookies(self):
 866        h = self.headers.get_all("Cookie")
 867        return tuple(cookies.parse_cookie_headers(h))
 868
 869    def _set_cookies(self, value):
 870        self.headers["cookie"] = cookies.format_cookie_header(value)
 871
 872    @property
 873    def cookies(self) -> multidict.MultiDictView[str, str]:
 874        """
 875        The request cookies.
 876        For the most part, this behaves like a dictionary.
 877        Modifications to the MultiDictView update `Request.headers`, and vice versa.
 878        """
 879        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
 880
 881    @cookies.setter
 882    def cookies(self, value):
 883        self._set_cookies(value)
 884
 885    @property
 886    def path_components(self) -> tuple[str, ...]:
 887        """
 888        The URL's path components as a tuple of strings.
 889        Components are unquoted.
 890        """
 891        path = urllib.parse.urlparse(self.url).path
 892        # This needs to be a tuple so that it's immutable.
 893        # Otherwise, this would fail silently:
 894        #   request.path_components.append("foo")
 895        return tuple(url.unquote(i) for i in path.split("/") if i)
 896
 897    @path_components.setter
 898    def path_components(self, components: Iterable[str]):
 899        components = map(lambda x: url.quote(x, safe=""), components)
 900        path = "/" + "/".join(components)
 901        _, _, _, params, query, fragment = urllib.parse.urlparse(self.url)
 902        self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment])
 903
 904    def anticache(self) -> None:
 905        """
 906        Modifies this request to remove headers that might produce a cached response.
 907        """
 908        delheaders = (
 909            "if-modified-since",
 910            "if-none-match",
 911        )
 912        for i in delheaders:
 913            self.headers.pop(i, None)
 914
 915    def anticomp(self) -> None:
 916        """
 917        Modify the Accept-Encoding header to only accept uncompressed responses.
 918        """
 919        self.headers["accept-encoding"] = "identity"
 920
 921    def constrain_encoding(self) -> None:
 922        """
 923        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
 924        """
 925        accept_encoding = self.headers.get("accept-encoding")
 926        if accept_encoding:
 927            self.headers["accept-encoding"] = ", ".join(
 928                e
 929                for e in {"gzip", "identity", "deflate", "br", "zstd"}
 930                if e in accept_encoding
 931            )
 932
 933    def _get_urlencoded_form(self):
 934        is_valid_content_type = (
 935            "application/x-www-form-urlencoded"
 936            in self.headers.get("content-type", "").lower()
 937        )
 938        if is_valid_content_type:
 939            return tuple(url.decode(self.get_text(strict=False)))
 940        return ()
 941
 942    def _set_urlencoded_form(self, form_data: Sequence[tuple[str, str]]) -> None:
 943        """
 944        Sets the body to the URL-encoded form data, and adds the appropriate content-type header.
 945        This will overwrite the existing content if there is one.
 946        """
 947        self.headers["content-type"] = "application/x-www-form-urlencoded"
 948        self.content = url.encode(form_data, self.get_text(strict=False)).encode()
 949
 950    @property
 951    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
 952        """
 953        The URL-encoded form data.
 954
 955        If the content-type indicates non-form data or the form could not be parsed, this is set to
 956        an empty `MultiDictView`.
 957
 958        Modifications to the MultiDictView update `Request.content`, and vice versa.
 959        """
 960        return multidict.MultiDictView(
 961            self._get_urlencoded_form, self._set_urlencoded_form
 962        )
 963
 964    @urlencoded_form.setter
 965    def urlencoded_form(self, value):
 966        self._set_urlencoded_form(value)
 967
 968    def _get_multipart_form(self) -> list[tuple[bytes, bytes]]:
 969        is_valid_content_type = (
 970            "multipart/form-data" in self.headers.get("content-type", "").lower()
 971        )
 972        if is_valid_content_type and self.content is not None:
 973            try:
 974                return multipart.decode_multipart(
 975                    self.headers.get("content-type"), self.content
 976                )
 977            except ValueError:
 978                pass
 979        return []
 980
 981    def _set_multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
 982        ct = self.headers.get("content-type", "")
 983        is_valid_content_type = ct.lower().startswith("multipart/form-data")
 984        if not is_valid_content_type:
 985            """
 986            Generate a random boundary here.
 987
 988            See <https://datatracker.ietf.org/doc/html/rfc2046#section-5.1.1> for specifications
 989            on generating the boundary.
 990            """
 991            boundary = "-" * 20 + binascii.hexlify(os.urandom(16)).decode()
 992            self.headers["content-type"] = ct = (
 993                f"multipart/form-data; boundary={boundary}"
 994            )
 995        self.content = multipart.encode_multipart(ct, value)
 996
 997    @property
 998    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 999        """
1000        The multipart form data.
1001
1002        If the content-type indicates non-form data or the form could not be parsed, this is set to
1003        an empty `MultiDictView`.
1004
1005        Modifications to the MultiDictView update `Request.content`, and vice versa.
1006        """
1007        return multidict.MultiDictView(
1008            self._get_multipart_form, self._set_multipart_form
1009        )
1010
1011    @multipart_form.setter
1012    def multipart_form(self, value: list[tuple[bytes, bytes]]) -> None:
1013        self._set_multipart_form(value)

An HTTP request.

Request( host: str, port: int, method: bytes, scheme: bytes, authority: bytes, path: bytes, http_version: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, timestamp_start: float, timestamp_end: float | None)
524    def __init__(
525        self,
526        host: str,
527        port: int,
528        method: bytes,
529        scheme: bytes,
530        authority: bytes,
531        path: bytes,
532        http_version: bytes,
533        headers: Headers | tuple[tuple[bytes, bytes], ...],
534        content: bytes | None,
535        trailers: Headers | tuple[tuple[bytes, bytes], ...] | None,
536        timestamp_start: float,
537        timestamp_end: float | None,
538    ):
539        # auto-convert invalid types to retain compatibility with older code.
540        if isinstance(host, bytes):
541            host = host.decode("idna", "strict")
542        if isinstance(method, str):
543            method = method.encode("ascii", "strict")
544        if isinstance(scheme, str):
545            scheme = scheme.encode("ascii", "strict")
546        if isinstance(authority, str):
547            authority = authority.encode("ascii", "strict")
548        if isinstance(path, str):
549            path = path.encode("ascii", "strict")
550        if isinstance(http_version, str):
551            http_version = http_version.encode("ascii", "strict")
552
553        if isinstance(content, str):
554            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
555        if not isinstance(headers, Headers):
556            headers = Headers(headers)
557        if trailers is not None and not isinstance(trailers, Headers):
558            trailers = Headers(trailers)
559
560        self.data = RequestData(
561            host=host,
562            port=port,
563            method=method,
564            scheme=scheme,
565            authority=authority,
566            path=path,
567            http_version=http_version,
568            headers=headers,
569            content=content,
570            trailers=trailers,
571            timestamp_start=timestamp_start,
572            timestamp_end=timestamp_end,
573        )
@classmethod
def make( cls, method: str, url: str, content: bytes | str = '', headers: Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Request:
583    @classmethod
584    def make(
585        cls,
586        method: str,
587        url: str,
588        content: bytes | str = "",
589        headers: (
590            Headers | dict[str | bytes, str | bytes] | Iterable[tuple[bytes, bytes]]
591        ) = (),
592    ) -> "Request":
593        """
594        Simplified API for creating request objects.
595        """
596        # Headers can be list or dict, we differentiate here.
597        if isinstance(headers, Headers):
598            pass
599        elif isinstance(headers, dict):
600            headers = Headers(
601                (
602                    always_bytes(k, "utf-8", "surrogateescape"),
603                    always_bytes(v, "utf-8", "surrogateescape"),
604                )
605                for k, v in headers.items()
606            )
607        elif isinstance(headers, Iterable):
608            headers = Headers(headers)  # type: ignore
609        else:
610            raise TypeError(
611                "Expected headers to be an iterable or dict, but is {}.".format(
612                    type(headers).__name__
613                )
614            )
615
616        req = cls(
617            "",
618            0,
619            method.encode("utf-8", "surrogateescape"),
620            b"",
621            b"",
622            b"",
623            b"HTTP/1.1",
624            headers,
625            b"",
626            None,
627            time.time(),
628            time.time(),
629        )
630
631        req.url = url
632        # Assign this manually to update the content-length header.
633        if isinstance(content, bytes):
634            req.content = content
635        elif isinstance(content, str):
636            req.text = content
637        else:
638            raise TypeError(
639                f"Expected content to be str or bytes, but is {type(content).__name__}."
640            )
641
642        return req

Simplified API for creating request objects.

first_line_format: str
644    @property
645    def first_line_format(self) -> str:
646        """
647        *Read-only:* HTTP request form as defined in [RFC 7230](https://tools.ietf.org/html/rfc7230#section-5.3).
648
649        origin-form and asterisk-form are subsumed as "relative".
650        """
651        if self.method == "CONNECT":
652            return "authority"
653        elif self.authority:
654            return "absolute"
655        else:
656            return "relative"

Read-only: HTTP request form as defined in RFC 7230.

origin-form and asterisk-form are subsumed as "relative".

method: str
658    @property
659    def method(self) -> str:
660        """
661        HTTP request method, e.g. "GET".
662        """
663        return self.data.method.decode("utf-8", "surrogateescape").upper()

HTTP request method, e.g. "GET".

scheme: str
669    @property
670    def scheme(self) -> str:
671        """
672        HTTP request scheme, which should be "http" or "https".
673        """
674        return self.data.scheme.decode("utf-8", "surrogateescape")

HTTP request scheme, which should be "http" or "https".

authority: str
680    @property
681    def authority(self) -> str:
682        """
683        HTTP request authority.
684
685        For HTTP/1, this is the authority portion of the request target
686        (in either absolute-form or authority-form).
687        For origin-form and asterisk-form requests, this property is set to an empty string.
688
689        For HTTP/2, this is the :authority pseudo header.
690
691        *See also:* `Request.host`, `Request.host_header`, `Request.pretty_host`
692        """
693        try:
694            return self.data.authority.decode("idna")
695        except UnicodeError:
696            return self.data.authority.decode("utf8", "surrogateescape")

HTTP request authority.

For HTTP/1, this is the authority portion of the request target (in either absolute-form or authority-form). For origin-form and asterisk-form requests, this property is set to an empty string.

For HTTP/2, this is the :authority pseudo header.

See also: Request.host, Request.host_header, Request.pretty_host

host: str
707    @property
708    def host(self) -> str:
709        """
710        Target server for this request. This may be parsed from the raw request
711        (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line)
712        or inferred from the proxy mode (e.g. an IP in transparent mode).
713
714        Setting the host attribute also updates the host header and authority information, if present.
715
716        *See also:* `Request.authority`, `Request.host_header`, `Request.pretty_host`
717        """
718        return self.data.host

Target server for this request. This may be parsed from the raw request (e.g. from a GET http://example.com/ HTTP/1.1 request line) or inferred from the proxy mode (e.g. an IP in transparent mode).

Setting the host attribute also updates the host header and authority information, if present.

See also: Request.authority, Request.host_header, Request.pretty_host

host_header: str | None
725    @property
726    def host_header(self) -> str | None:
727        """
728        The request's host/authority header.
729
730        This property maps to either ``request.headers["Host"]`` or
731        ``request.authority``, depending on whether it's HTTP/1.x or HTTP/2.0.
732
733        *See also:* `Request.authority`,`Request.host`, `Request.pretty_host`
734        """
735        if self.is_http2 or self.is_http3:
736            return self.authority or self.data.headers.get("Host", None)
737        else:
738            return self.data.headers.get("Host", None)

The request's host/authority header.

This property maps to either request.headers["Host"] or request.authority, depending on whether it's HTTP/1.x or HTTP/2.0.

See also: Request.authority,Request.host, Request.pretty_host

port: int
753    @property
754    def port(self) -> int:
755        """
756        Target port.
757        """
758        return self.data.port

Target port.

path: str
778    @property
779    def path(self) -> str:
780        """
781        HTTP request path, e.g. "/index.html" or "/index.html?a=b".
782        Usually starts with a slash, except for OPTIONS requests, which may just be "*".
783
784        This attribute includes both path and query parts of the target URI
785        (see Sections 3.3 and 3.4 of [RFC3986](https://datatracker.ietf.org/doc/html/rfc3986)).
786        """
787        return self.data.path.decode("utf-8", "surrogateescape")

HTTP request path, e.g. "/index.html" or "/index.html?a=b". Usually starts with a slash, except for OPTIONS requests, which may just be "*".

This attribute includes both path and query parts of the target URI (see Sections 3.3 and 3.4 of RFC3986).

url: str
793    @property
794    def url(self) -> str:
795        """
796        The full URL string, constructed from `Request.scheme`, `Request.host`, `Request.port` and `Request.path`.
797
798        Settings this property updates these attributes as well.
799        """
800        if self.first_line_format == "authority":
801            return f"{self.host}:{self.port}"
802        path = self.path if self.path != "*" else ""
803        return url.unparse(self.scheme, self.host, self.port, path)

The full URL string, constructed from Request.scheme, Request.host, Request.port and Request.path.

Settings this property updates these attributes as well.

pretty_host: str
810    @property
811    def pretty_host(self) -> str:
812        """
813        *Read-only:* Like `Request.host`, but using `Request.host_header` header as an additional (preferred) data source.
814        This is useful in transparent mode where `Request.host` is only an IP address.
815
816        *Warning:* When working in adversarial environments, this may not reflect the actual destination
817        as the Host header could be spoofed.
818        """
819        authority = self.host_header
820        if authority:
821            return url.parse_authority(authority, check=False)[0]
822        else:
823            return self.host

Read-only: Like Request.host, but using Request.host_header header as an additional (preferred) data source. This is useful in transparent mode where Request.host is only an IP address.

Warning: When working in adversarial environments, this may not reflect the actual destination as the Host header could be spoofed.

pretty_url: str
825    @property
826    def pretty_url(self) -> str:
827        """
828        *Read-only:* Like `Request.url`, but using `Request.pretty_host` instead of `Request.host`.
829        """
830        if self.first_line_format == "authority":
831            return self.authority
832
833        host_header = self.host_header
834        if not host_header:
835            return self.url
836
837        pretty_host, pretty_port = url.parse_authority(host_header, check=False)
838        pretty_port = pretty_port or url.default_port(self.scheme) or 443
839        path = self.path if self.path != "*" else ""
840
841        return url.unparse(self.scheme, pretty_host, pretty_port, path)

Read-only: Like Request.url, but using Request.pretty_host instead of Request.host.

query: mitmproxy.coretypes.multidict.MultiDictView[str, str]
852    @property
853    def query(self) -> multidict.MultiDictView[str, str]:
854        """
855        The request query as a mutable mapping view on the request's path.
856        For the most part, this behaves like a dictionary.
857        Modifications to the MultiDictView update `Request.path`, and vice versa.
858        """
859        return multidict.MultiDictView(self._get_query, self._set_query)

The request query as a mutable mapping view on the request's path. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.path, and vice versa.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, str]
872    @property
873    def cookies(self) -> multidict.MultiDictView[str, str]:
874        """
875        The request cookies.
876        For the most part, this behaves like a dictionary.
877        Modifications to the MultiDictView update `Request.headers`, and vice versa.
878        """
879        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The request cookies. For the most part, this behaves like a dictionary. Modifications to the MultiDictView update Request.headers, and vice versa.

path_components: tuple[str, ...]
885    @property
886    def path_components(self) -> tuple[str, ...]:
887        """
888        The URL's path components as a tuple of strings.
889        Components are unquoted.
890        """
891        path = urllib.parse.urlparse(self.url).path
892        # This needs to be a tuple so that it's immutable.
893        # Otherwise, this would fail silently:
894        #   request.path_components.append("foo")
895        return tuple(url.unquote(i) for i in path.split("/") if i)

The URL's path components as a tuple of strings. Components are unquoted.

def anticache(self) -> None:
904    def anticache(self) -> None:
905        """
906        Modifies this request to remove headers that might produce a cached response.
907        """
908        delheaders = (
909            "if-modified-since",
910            "if-none-match",
911        )
912        for i in delheaders:
913            self.headers.pop(i, None)

Modifies this request to remove headers that might produce a cached response.

def anticomp(self) -> None:
915    def anticomp(self) -> None:
916        """
917        Modify the Accept-Encoding header to only accept uncompressed responses.
918        """
919        self.headers["accept-encoding"] = "identity"

Modify the Accept-Encoding header to only accept uncompressed responses.

def constrain_encoding(self) -> None:
921    def constrain_encoding(self) -> None:
922        """
923        Limits the permissible Accept-Encoding values, based on what we can decode appropriately.
924        """
925        accept_encoding = self.headers.get("accept-encoding")
926        if accept_encoding:
927            self.headers["accept-encoding"] = ", ".join(
928                e
929                for e in {"gzip", "identity", "deflate", "br", "zstd"}
930                if e in accept_encoding
931            )

Limits the permissible Accept-Encoding values, based on what we can decode appropriately.

urlencoded_form: mitmproxy.coretypes.multidict.MultiDictView[str, str]
950    @property
951    def urlencoded_form(self) -> multidict.MultiDictView[str, str]:
952        """
953        The URL-encoded form data.
954
955        If the content-type indicates non-form data or the form could not be parsed, this is set to
956        an empty `MultiDictView`.
957
958        Modifications to the MultiDictView update `Request.content`, and vice versa.
959        """
960        return multidict.MultiDictView(
961            self._get_urlencoded_form, self._set_urlencoded_form
962        )

The URL-encoded form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

multipart_form: mitmproxy.coretypes.multidict.MultiDictView[bytes, bytes]
 997    @property
 998    def multipart_form(self) -> multidict.MultiDictView[bytes, bytes]:
 999        """
1000        The multipart form data.
1001
1002        If the content-type indicates non-form data or the form could not be parsed, this is set to
1003        an empty `MultiDictView`.
1004
1005        Modifications to the MultiDictView update `Request.content`, and vice versa.
1006        """
1007        return multidict.MultiDictView(
1008            self._get_multipart_form, self._set_multipart_form
1009        )

The multipart form data.

If the content-type indicates non-form data or the form could not be parsed, this is set to an empty MultiDictView.

Modifications to the MultiDictView update Request.content, and vice versa.

class Response(Message):
1016class Response(Message):
1017    """
1018    An HTTP response.
1019    """
1020
1021    data: ResponseData
1022
1023    def __init__(
1024        self,
1025        http_version: bytes,
1026        status_code: int,
1027        reason: bytes,
1028        headers: Headers | tuple[tuple[bytes, bytes], ...],
1029        content: bytes | None,
1030        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1031        timestamp_start: float,
1032        timestamp_end: float | None,
1033    ):
1034        # auto-convert invalid types to retain compatibility with older code.
1035        if isinstance(http_version, str):
1036            http_version = http_version.encode("ascii", "strict")
1037        if isinstance(reason, str):
1038            reason = reason.encode("ascii", "strict")
1039
1040        if isinstance(content, str):
1041            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1042        if not isinstance(headers, Headers):
1043            headers = Headers(headers)
1044        if trailers is not None and not isinstance(trailers, Headers):
1045            trailers = Headers(trailers)
1046
1047        self.data = ResponseData(
1048            http_version=http_version,
1049            status_code=status_code,
1050            reason=reason,
1051            headers=headers,
1052            content=content,
1053            trailers=trailers,
1054            timestamp_start=timestamp_start,
1055            timestamp_end=timestamp_end,
1056        )
1057
1058    def __repr__(self) -> str:
1059        if self.raw_content:
1060            ct = self.headers.get("content-type", "unknown content type")
1061            size = human.pretty_size(len(self.raw_content))
1062            details = f"{ct}, {size}"
1063        else:
1064            details = "no content"
1065        return f"Response({self.status_code}, {details})"
1066
1067    @classmethod
1068    def make(
1069        cls,
1070        status_code: int = 200,
1071        content: bytes | str = b"",
1072        headers: (
1073            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1074        ) = (),
1075    ) -> "Response":
1076        """
1077        Simplified API for creating response objects.
1078        """
1079        if isinstance(headers, Headers):
1080            headers = headers
1081        elif isinstance(headers, dict):
1082            headers = Headers(
1083                (
1084                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1085                    always_bytes(v, "utf-8", "surrogateescape"),
1086                )
1087                for k, v in headers.items()
1088            )
1089        elif isinstance(headers, Iterable):
1090            headers = Headers(headers)  # type: ignore
1091        else:
1092            raise TypeError(
1093                "Expected headers to be an iterable or dict, but is {}.".format(
1094                    type(headers).__name__
1095                )
1096            )
1097
1098        resp = cls(
1099            b"HTTP/1.1",
1100            status_code,
1101            status_codes.RESPONSES.get(status_code, "").encode(),
1102            headers,
1103            None,
1104            None,
1105            time.time(),
1106            time.time(),
1107        )
1108
1109        # Assign this manually to update the content-length header.
1110        if isinstance(content, bytes):
1111            resp.content = content
1112        elif isinstance(content, str):
1113            resp.text = content
1114        else:
1115            raise TypeError(
1116                f"Expected content to be str or bytes, but is {type(content).__name__}."
1117            )
1118
1119        return resp
1120
1121    @property
1122    def status_code(self) -> int:
1123        """
1124        HTTP Status Code, e.g. ``200``.
1125        """
1126        return self.data.status_code
1127
1128    @status_code.setter
1129    def status_code(self, status_code: int) -> None:
1130        self.data.status_code = status_code
1131
1132    @property
1133    def reason(self) -> str:
1134        """
1135        HTTP reason phrase, for example "Not Found".
1136
1137        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1138        """
1139        # Encoding: http://stackoverflow.com/a/16674906/934719
1140        return self.data.reason.decode("ISO-8859-1")
1141
1142    @reason.setter
1143    def reason(self, reason: str | bytes) -> None:
1144        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1")
1145
1146    def _get_cookies(self):
1147        h = self.headers.get_all("set-cookie")
1148        all_cookies = cookies.parse_set_cookie_headers(h)
1149        return tuple((name, (value, attrs)) for name, value, attrs in all_cookies)
1150
1151    def _set_cookies(self, value):
1152        cookie_headers = []
1153        for k, v in value:
1154            header = cookies.format_set_cookie_header([(k, v[0], v[1])])
1155            cookie_headers.append(header)
1156        self.headers.set_all("set-cookie", cookie_headers)
1157
1158    @property
1159    def cookies(
1160        self,
1161    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1162        """
1163        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1164        name strings, and values are `(cookie value, attributes)` tuples. Within
1165        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1166        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1167
1168        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1169        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1170        """
1171        return multidict.MultiDictView(self._get_cookies, self._set_cookies)
1172
1173    @cookies.setter
1174    def cookies(self, value):
1175        self._set_cookies(value)
1176
1177    def refresh(self, now=None):
1178        """
1179        This fairly complex and heuristic function refreshes a server
1180        response for replay.
1181
1182         - It adjusts date, expires, and last-modified headers.
1183         - It adjusts cookie expiration.
1184        """
1185        if not now:
1186            now = time.time()
1187        delta = now - self.timestamp_start
1188        refresh_headers = [
1189            "date",
1190            "expires",
1191            "last-modified",
1192        ]
1193        for i in refresh_headers:
1194            if i in self.headers:
1195                d = parsedate_tz(self.headers[i])
1196                if d:
1197                    new = mktime_tz(d) + delta
1198                    try:
1199                        self.headers[i] = formatdate(new, usegmt=True)
1200                    except OSError:  # pragma: no cover
1201                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1202        c = []
1203        for set_cookie_header in self.headers.get_all("set-cookie"):
1204            try:
1205                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1206            except ValueError:
1207                refreshed = set_cookie_header
1208            c.append(refreshed)
1209        if c:
1210            self.headers.set_all("set-cookie", c)

An HTTP response.

Response( http_version: bytes, status_code: int, reason: bytes, headers: Headers | tuple[tuple[bytes, bytes], ...], content: bytes | None, trailers: None | Headers | tuple[tuple[bytes, bytes], ...], timestamp_start: float, timestamp_end: float | None)
1023    def __init__(
1024        self,
1025        http_version: bytes,
1026        status_code: int,
1027        reason: bytes,
1028        headers: Headers | tuple[tuple[bytes, bytes], ...],
1029        content: bytes | None,
1030        trailers: None | Headers | tuple[tuple[bytes, bytes], ...],
1031        timestamp_start: float,
1032        timestamp_end: float | None,
1033    ):
1034        # auto-convert invalid types to retain compatibility with older code.
1035        if isinstance(http_version, str):
1036            http_version = http_version.encode("ascii", "strict")
1037        if isinstance(reason, str):
1038            reason = reason.encode("ascii", "strict")
1039
1040        if isinstance(content, str):
1041            raise ValueError(f"Content must be bytes, not {type(content).__name__}")
1042        if not isinstance(headers, Headers):
1043            headers = Headers(headers)
1044        if trailers is not None and not isinstance(trailers, Headers):
1045            trailers = Headers(trailers)
1046
1047        self.data = ResponseData(
1048            http_version=http_version,
1049            status_code=status_code,
1050            reason=reason,
1051            headers=headers,
1052            content=content,
1053            trailers=trailers,
1054            timestamp_start=timestamp_start,
1055            timestamp_end=timestamp_end,
1056        )
@classmethod
def make( cls, status_code: int = 200, content: bytes | str = b'', headers: Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]] = ()) -> Response:
1067    @classmethod
1068    def make(
1069        cls,
1070        status_code: int = 200,
1071        content: bytes | str = b"",
1072        headers: (
1073            Headers | Mapping[str, str | bytes] | Iterable[tuple[bytes, bytes]]
1074        ) = (),
1075    ) -> "Response":
1076        """
1077        Simplified API for creating response objects.
1078        """
1079        if isinstance(headers, Headers):
1080            headers = headers
1081        elif isinstance(headers, dict):
1082            headers = Headers(
1083                (
1084                    always_bytes(k, "utf-8", "surrogateescape"),  # type: ignore
1085                    always_bytes(v, "utf-8", "surrogateescape"),
1086                )
1087                for k, v in headers.items()
1088            )
1089        elif isinstance(headers, Iterable):
1090            headers = Headers(headers)  # type: ignore
1091        else:
1092            raise TypeError(
1093                "Expected headers to be an iterable or dict, but is {}.".format(
1094                    type(headers).__name__
1095                )
1096            )
1097
1098        resp = cls(
1099            b"HTTP/1.1",
1100            status_code,
1101            status_codes.RESPONSES.get(status_code, "").encode(),
1102            headers,
1103            None,
1104            None,
1105            time.time(),
1106            time.time(),
1107        )
1108
1109        # Assign this manually to update the content-length header.
1110        if isinstance(content, bytes):
1111            resp.content = content
1112        elif isinstance(content, str):
1113            resp.text = content
1114        else:
1115            raise TypeError(
1116                f"Expected content to be str or bytes, but is {type(content).__name__}."
1117            )
1118
1119        return resp

Simplified API for creating response objects.

status_code: int
1121    @property
1122    def status_code(self) -> int:
1123        """
1124        HTTP Status Code, e.g. ``200``.
1125        """
1126        return self.data.status_code

HTTP Status Code, e.g. 200.

reason: str
1132    @property
1133    def reason(self) -> str:
1134        """
1135        HTTP reason phrase, for example "Not Found".
1136
1137        HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.
1138        """
1139        # Encoding: http://stackoverflow.com/a/16674906/934719
1140        return self.data.reason.decode("ISO-8859-1")

HTTP reason phrase, for example "Not Found".

HTTP/2 responses do not contain a reason phrase, an empty string will be returned instead.

cookies: mitmproxy.coretypes.multidict.MultiDictView[str, tuple[str, mitmproxy.coretypes.multidict.MultiDict[str, str | None]]]
1158    @property
1159    def cookies(
1160        self,
1161    ) -> multidict.MultiDictView[str, tuple[str, multidict.MultiDict[str, str | None]]]:
1162        """
1163        The response cookies. A possibly empty `MultiDictView`, where the keys are cookie
1164        name strings, and values are `(cookie value, attributes)` tuples. Within
1165        attributes, unary attributes (e.g. `HTTPOnly`) are indicated by a `None` value.
1166        Modifications to the MultiDictView update `Response.headers`, and vice versa.
1167
1168        *Warning:* Changes to `attributes` will not be picked up unless you also reassign
1169        the `(cookie value, attributes)` tuple directly in the `MultiDictView`.
1170        """
1171        return multidict.MultiDictView(self._get_cookies, self._set_cookies)

The response cookies. A possibly empty MultiDictView, where the keys are cookie name strings, and values are (cookie value, attributes) tuples. Within attributes, unary attributes (e.g. HTTPOnly) are indicated by a None value. Modifications to the MultiDictView update Response.headers, and vice versa.

Warning: Changes to attributes will not be picked up unless you also reassign the (cookie value, attributes) tuple directly in the MultiDictView.

def refresh(self, now=None):
1177    def refresh(self, now=None):
1178        """
1179        This fairly complex and heuristic function refreshes a server
1180        response for replay.
1181
1182         - It adjusts date, expires, and last-modified headers.
1183         - It adjusts cookie expiration.
1184        """
1185        if not now:
1186            now = time.time()
1187        delta = now - self.timestamp_start
1188        refresh_headers = [
1189            "date",
1190            "expires",
1191            "last-modified",
1192        ]
1193        for i in refresh_headers:
1194            if i in self.headers:
1195                d = parsedate_tz(self.headers[i])
1196                if d:
1197                    new = mktime_tz(d) + delta
1198                    try:
1199                        self.headers[i] = formatdate(new, usegmt=True)
1200                    except OSError:  # pragma: no cover
1201                        pass  # value out of bounds on Windows only (which is why we exclude it from coverage).
1202        c = []
1203        for set_cookie_header in self.headers.get_all("set-cookie"):
1204            try:
1205                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
1206            except ValueError:
1207                refreshed = set_cookie_header
1208            c.append(refreshed)
1209        if c:
1210            self.headers.set_all("set-cookie", c)

This fairly complex and heuristic function refreshes a server response for replay.

  • It adjusts date, expires, and last-modified headers.
  • It adjusts cookie expiration.
class Headers(mitmproxy.coretypes.multidict._MultiDict[~KT, ~VT], mitmproxy.coretypes.serializable.Serializable):
 50class Headers(multidict.MultiDict):  # type: ignore
 51    """
 52    Header class which allows both convenient access to individual headers as well as
 53    direct access to the underlying raw data. Provides a full dictionary interface.
 54
 55    Create headers with keyword arguments:
 56    >>> h = Headers(host="example.com", content_type="application/xml")
 57
 58    Headers mostly behave like a normal dict:
 59    >>> h["Host"]
 60    "example.com"
 61
 62    Headers are case insensitive:
 63    >>> h["host"]
 64    "example.com"
 65
 66    Headers can also be created from a list of raw (header_name, header_value) byte tuples:
 67    >>> h = Headers([
 68        (b"Host",b"example.com"),
 69        (b"Accept",b"text/html"),
 70        (b"accept",b"application/xml")
 71    ])
 72
 73    Multiple headers are folded into a single header as per RFC 7230:
 74    >>> h["Accept"]
 75    "text/html, application/xml"
 76
 77    Setting a header removes all existing headers with the same name:
 78    >>> h["Accept"] = "application/text"
 79    >>> h["Accept"]
 80    "application/text"
 81
 82    `bytes(h)` returns an HTTP/1 header block:
 83    >>> print(bytes(h))
 84    Host: example.com
 85    Accept: application/text
 86
 87    For full control, the raw header fields can be accessed:
 88    >>> h.fields
 89
 90    Caveats:
 91     - For use with the "Set-Cookie" and "Cookie" headers, either use `Response.cookies` or see `Headers.get_all`.
 92    """
 93
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )
119
120    fields: tuple[tuple[bytes, bytes], ...]
121
122    @staticmethod
123    def _reduce_values(values) -> str:
124        # Headers can be folded
125        return ", ".join(values)
126
127    @staticmethod
128    def _kconv(key) -> str:
129        # Headers are case-insensitive
130        return key.lower()
131
132    def __bytes__(self) -> bytes:
133        if self.fields:
134            return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n"
135        else:
136            return b""
137
138    def __delitem__(self, key: str | bytes) -> None:
139        key = _always_bytes(key)
140        super().__delitem__(key)
141
142    def __iter__(self) -> Iterator[str]:
143        for x in super().__iter__():
144            yield _native(x)
145
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]
158
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)
167
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)
172
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface.

Create headers with keyword arguments:

>>> h = Headers(host="example.com", content_type="application/xml")

Headers mostly behave like a normal dict:

>>> h["Host"]
"example.com"

Headers are case insensitive:

>>> h["host"]
"example.com"

Headers can also be created from a list of raw (header_name, header_value) byte tuples:

>>> h = Headers([
    (b"Host",b"example.com"),
    (b"Accept",b"text/html"),
    (b"accept",b"application/xml")
])

Multiple headers are folded into a single header as per RFC 7230:

>>> h["Accept"]
"text/html, application/xml"

Setting a header removes all existing headers with the same name:

>>> h["Accept"] = "application/text"
>>> h["Accept"]
"application/text"

bytes(h) returns an HTTP/1 header block:

>>> print(bytes(h))
Host: example.com
Accept: application/text

For full control, the raw header fields can be accessed:

>>> h.fields

Caveats:

Headers(fields: Iterable[tuple[bytes, bytes]] = (), **headers)
 94    def __init__(self, fields: Iterable[tuple[bytes, bytes]] = (), **headers):
 95        """
 96        *Args:*
 97         - *fields:* (optional) list of ``(name, value)`` header byte tuples,
 98           e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes.
 99         - *\\*\\*headers:* Additional headers to set. Will overwrite existing values from `fields`.
100           For convenience, underscores in header names will be transformed to dashes -
101           this behaviour does not extend to other methods.
102
103        If ``**headers`` contains multiple keys that have equal ``.lower()`` representations,
104        the behavior is undefined.
105        """
106        super().__init__(fields)
107
108        for key, value in self.fields:
109            if not isinstance(key, bytes) or not isinstance(value, bytes):
110                raise TypeError("Header fields must be bytes.")
111
112        # content_type -> content-type
113        self.update(
114            {
115                _always_bytes(name).replace(b"_", b"-"): _always_bytes(value)
116                for name, value in headers.items()
117            }
118        )

Args:

  • fields: (optional) list of (name, value) header byte tuples, e.g. [(b"Host", b"example.com")]. All names and values must be bytes.
  • **headers: Additional headers to set. Will overwrite existing values from fields. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods.

If **headers contains multiple keys that have equal .lower() representations, the behavior is undefined.

fields: tuple[tuple[bytes, bytes], ...]

The underlying raw datastructure.

def get_all(self, name: str | bytes) -> list[str]:
146    def get_all(self, name: str | bytes) -> list[str]:
147        """
148        Like `Headers.get`, but does not fold multiple headers into a single one.
149        This is useful for Set-Cookie and Cookie headers, which do not support folding.
150
151        *See also:*
152         - <https://tools.ietf.org/html/rfc7230#section-3.2.2>
153         - <https://datatracker.ietf.org/doc/html/rfc6265#section-5.4>
154         - <https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.5>
155        """
156        name = _always_bytes(name)
157        return [_native(x) for x in super().get_all(name)]

Like Headers.get, but does not fold multiple headers into a single one. This is useful for Set-Cookie and Cookie headers, which do not support folding.

See also:

def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
159    def set_all(self, name: str | bytes, values: Iterable[str | bytes]):
160        """
161        Explicitly set multiple headers for the given key.
162        See `Headers.get_all`.
163        """
164        name = _always_bytes(name)
165        values = [_always_bytes(x) for x in values]
166        return super().set_all(name, values)

Explicitly set multiple headers for the given key. See Headers.get_all.

def insert(self, index: int, key: str | bytes, value: str | bytes):
168    def insert(self, index: int, key: str | bytes, value: str | bytes):
169        key = _always_bytes(key)
170        value = _always_bytes(value)
171        super().insert(index, key, value)

Insert an additional value for the given key at the specified position.

def items(self, multi=False):
173    def items(self, multi=False):
174        if multi:
175            return ((_native(k), _native(v)) for k, v in self.fields)
176        else:
177            return super().items()

Get all (key, value) tuples.

If multi is True, all (key, value) pairs will be returned. If False, only one tuple per key is returned.

built with pdocpdoc logo