Edit on GitHub

mitmproxy.flow

  1import asyncio
  2import time
  3import uuid
  4from typing import Any, ClassVar, Optional
  5
  6from mitmproxy import connection
  7from mitmproxy import exceptions
  8from mitmproxy import stateobject
  9from mitmproxy import version
 10
 11
 12class Error(stateobject.StateObject):
 13    """
 14    An Error.
 15
 16    This is distinct from an protocol error response (say, a HTTP code 500),
 17    which is represented by a normal `mitmproxy.http.Response` object. This class is
 18    responsible for indicating errors that fall outside of normal protocol
 19    communications, like interrupted connections, timeouts, or protocol errors.
 20    """
 21
 22    msg: str
 23    """Message describing the error."""
 24
 25    timestamp: float
 26    """Unix timestamp of when this error happened."""
 27
 28    KILLED_MESSAGE: ClassVar[str] = "Connection killed."
 29
 30    def __init__(self, msg: str, timestamp: Optional[float] = None) -> None:
 31        """Create an error. If no timestamp is passed, the current time is used."""
 32        self.msg = msg
 33        self.timestamp = timestamp or time.time()
 34
 35    _stateobject_attributes = dict(msg=str, timestamp=float)
 36
 37    def __str__(self):
 38        return self.msg
 39
 40    def __repr__(self):
 41        return self.msg
 42
 43    @classmethod
 44    def from_state(cls, state):
 45        # the default implementation assumes an empty constructor. Override
 46        # accordingly.
 47        f = cls(None)
 48        f.set_state(state)
 49        return f
 50
 51
 52class Flow(stateobject.StateObject):
 53    """
 54    Base class for network flows. A flow is a collection of objects,
 55    for example HTTP request/response pairs or a list of TCP messages.
 56
 57    See also:
 58     - mitmproxy.http.HTTPFlow
 59     - mitmproxy.tcp.TCPFlow
 60    """
 61
 62    client_conn: connection.Client
 63    """The client that connected to mitmproxy."""
 64
 65    server_conn: connection.Server
 66    """
 67    The server mitmproxy connected to.
 68
 69    Some flows may never cause mitmproxy to initiate a server connection,
 70    for example because their response is replayed by mitmproxy itself.
 71    To simplify implementation, those flows will still have a `server_conn` attribute
 72    with a `timestamp_start` set to `None`.
 73    """
 74
 75    error: Optional[Error] = None
 76    """A connection or protocol error affecting this flow."""
 77
 78    intercepted: bool
 79    """
 80    If `True`, the flow is currently paused by mitmproxy.
 81    We're waiting for a user action to forward the flow to its destination.
 82    """
 83
 84    marked: str = ""
 85    """
 86    If this attribute is a non-empty string the flow has been marked by the user.
 87
 88    A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name.
 89
 90    For example `:grapes:` becomes `🍇` in views that support emoji rendering.
 91    Consult the [Github API Emoji List](https://api.github.com/emojis) for a list of emoji that may be used.
 92    Not all emoji, especially [emoji modifiers](https://en.wikipedia.org/wiki/Miscellaneous_Symbols_and_Pictographs#Emoji_modifiers)
 93    will render consistently.
 94
 95    The default marker for the view will be used if the Unicode emoji name can not be interpreted.
 96    """
 97
 98    is_replay: Optional[str]
 99    """
100    This attribute indicates if this flow has been replayed in either direction.
101
102     - a value of `request` indicates that the request has been artifically replayed by mitmproxy to the server.
103     - a value of `response` indicates that the response to the client's request has been set by server replay.
104    """
105
106    live: bool
107    """
108    If `True`, the flow belongs to a currently active connection.
109    If `False`, the flow may have been already completed or loaded from disk.
110    """
111
112    timestamp_created: float
113    """
114    The Unix timestamp of when this flow was created.
115
116    In contrast to `timestamp_start`, this value will not change when a flow is replayed.
117    """
118
119    def __init__(
120        self,
121        client_conn: connection.Client,
122        server_conn: connection.Server,
123        live: bool = False,
124    ) -> None:
125        self.id = str(uuid.uuid4())
126        self.client_conn = client_conn
127        self.server_conn = server_conn
128        self.live = live
129        self.timestamp_created = time.time()
130
131        self.intercepted: bool = False
132        self._resume_event: Optional[asyncio.Event] = None
133        self._backup: Optional[Flow] = None
134        self.marked: str = ""
135        self.is_replay: Optional[str] = None
136        self.metadata: dict[str, Any] = dict()
137        self.comment: str = ""
138
139    _stateobject_attributes = dict(
140        id=str,
141        error=Error,
142        client_conn=connection.Client,
143        server_conn=connection.Server,
144        intercepted=bool,
145        is_replay=str,
146        marked=str,
147        metadata=dict[str, Any],
148        comment=str,
149        timestamp_created=float,
150    )
151
152    __types: dict[str, type["Flow"]] = {}
153
154    @classmethod
155    @property
156    def type(cls) -> str:
157        """The flow type, for example `http`, `tcp`, or `dns`."""
158        return cls.__name__.removesuffix("Flow").lower()
159
160    def __init_subclass__(cls, **kwargs):
161        Flow.__types[cls.type] = cls
162
163    def get_state(self):
164        d = super().get_state()
165        d.update(version=version.FLOW_FORMAT_VERSION, type=self.type)
166        if self._backup and self._backup != d:
167            d.update(backup=self._backup)
168        return d
169
170    def set_state(self, state):
171        state = state.copy()
172        state.pop("version")
173        state.pop("type")
174        if "backup" in state:
175            self._backup = state.pop("backup")
176        super().set_state(state)
177
178    @classmethod
179    def from_state(cls, state):
180        try:
181            flow_cls = Flow.__types[state["type"]]
182        except KeyError:
183            raise ValueError(f"Unknown flow type: {state['type']}")
184        f = flow_cls(None, None)  # noqa
185        f.set_state(state)
186        return f
187
188    def copy(self):
189        """Make a copy of this flow."""
190        f = super().copy()
191        f.live = False
192        return f
193
194    def modified(self):
195        """
196        `True` if this file has been modified by a user, `False` otherwise.
197        """
198        if self._backup:
199            return self._backup != self.get_state()
200        else:
201            return False
202
203    def backup(self, force=False):
204        """
205        Save a backup of this flow, which can be restored by calling `Flow.revert()`.
206        """
207        if not self._backup:
208            self._backup = self.get_state()
209
210    def revert(self):
211        """
212        Revert to the last backed up state.
213        """
214        if self._backup:
215            self.set_state(self._backup)
216            self._backup = None
217
218    @property
219    def killable(self):
220        """*Read-only:* `True` if this flow can be killed, `False` otherwise."""
221        return self.live and not (self.error and self.error.msg == Error.KILLED_MESSAGE)
222
223    def kill(self):
224        """
225        Kill this flow. The current request/response will not be forwarded to its destination.
226        """
227        if not self.killable:
228            raise exceptions.ControlException("Flow is not killable.")
229        # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill
230        #  flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised
231        #  as killable. An alternative approach would be to introduce a `KillInjected` event similar to
232        #  `MessageInjected`, which should fix this issue.
233        self.error = Error(Error.KILLED_MESSAGE)
234        self.intercepted = False
235        self.live = False
236
237    def intercept(self):
238        """
239        Intercept this Flow. Processing will stop until resume is
240        called.
241        """
242        if self.intercepted:
243            return
244        self.intercepted = True
245        if self._resume_event is not None:
246            self._resume_event.clear()
247
248    async def wait_for_resume(self):
249        """
250        Wait until this Flow is resumed.
251        """
252        if not self.intercepted:
253            return
254        if self._resume_event is None:
255            self._resume_event = asyncio.Event()
256        await self._resume_event.wait()
257
258    def resume(self):
259        """
260        Continue with the flow – called after an intercept().
261        """
262        if not self.intercepted:
263            return
264        self.intercepted = False
265        if self._resume_event is not None:
266            self._resume_event.set()
267
268    @property
269    def timestamp_start(self) -> float:
270        """
271        *Read-only:* Start time of the flow.
272        Depending on the flow type, this property is an alias for
273        `mitmproxy.connection.Client.timestamp_start` or `mitmproxy.http.Request.timestamp_start`.
274        """
275        return self.client_conn.timestamp_start
276
277
278__all__ = [
279    "Flow",
280    "Error",
281]
class Flow(mitmproxy.stateobject.StateObject):
 53class Flow(stateobject.StateObject):
 54    """
 55    Base class for network flows. A flow is a collection of objects,
 56    for example HTTP request/response pairs or a list of TCP messages.
 57
 58    See also:
 59     - mitmproxy.http.HTTPFlow
 60     - mitmproxy.tcp.TCPFlow
 61    """
 62
 63    client_conn: connection.Client
 64    """The client that connected to mitmproxy."""
 65
 66    server_conn: connection.Server
 67    """
 68    The server mitmproxy connected to.
 69
 70    Some flows may never cause mitmproxy to initiate a server connection,
 71    for example because their response is replayed by mitmproxy itself.
 72    To simplify implementation, those flows will still have a `server_conn` attribute
 73    with a `timestamp_start` set to `None`.
 74    """
 75
 76    error: Optional[Error] = None
 77    """A connection or protocol error affecting this flow."""
 78
 79    intercepted: bool
 80    """
 81    If `True`, the flow is currently paused by mitmproxy.
 82    We're waiting for a user action to forward the flow to its destination.
 83    """
 84
 85    marked: str = ""
 86    """
 87    If this attribute is a non-empty string the flow has been marked by the user.
 88
 89    A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name.
 90
 91    For example `:grapes:` becomes `🍇` in views that support emoji rendering.
  92    Consult the [GitHub API Emoji List](https://api.github.com/emojis) for a list of emoji that may be used.
 93    Not all emoji, especially [emoji modifiers](https://en.wikipedia.org/wiki/Miscellaneous_Symbols_and_Pictographs#Emoji_modifiers)
 94    will render consistently.
 95
 96    The default marker for the view will be used if the Unicode emoji name can not be interpreted.
 97    """
 98
 99    is_replay: Optional[str]
100    """
101    This attribute indicates if this flow has been replayed in either direction.
102
 103     - a value of `request` indicates that the request has been artificially replayed by mitmproxy to the server.
104     - a value of `response` indicates that the response to the client's request has been set by server replay.
105    """
106
107    live: bool
108    """
109    If `True`, the flow belongs to a currently active connection.
110    If `False`, the flow may have been already completed or loaded from disk.
111    """
112
113    timestamp_created: float
114    """
115    The Unix timestamp of when this flow was created.
116
117    In contrast to `timestamp_start`, this value will not change when a flow is replayed.
118    """
119
120    def __init__(
121        self,
122        client_conn: connection.Client,
123        server_conn: connection.Server,
124        live: bool = False,
125    ) -> None:
126        self.id = str(uuid.uuid4())
127        self.client_conn = client_conn
128        self.server_conn = server_conn
129        self.live = live
130        self.timestamp_created = time.time()
131
132        self.intercepted: bool = False
133        self._resume_event: Optional[asyncio.Event] = None
134        self._backup: Optional[Flow] = None
135        self.marked: str = ""
136        self.is_replay: Optional[str] = None
137        self.metadata: dict[str, Any] = dict()
138        self.comment: str = ""
139
140    _stateobject_attributes = dict(
141        id=str,
142        error=Error,
143        client_conn=connection.Client,
144        server_conn=connection.Server,
145        intercepted=bool,
146        is_replay=str,
147        marked=str,
148        metadata=dict[str, Any],
149        comment=str,
150        timestamp_created=float,
151    )
152
153    __types: dict[str, type["Flow"]] = {}
154
155    @classmethod
156    @property
157    def type(cls) -> str:
158        """The flow type, for example `http`, `tcp`, or `dns`."""
159        return cls.__name__.removesuffix("Flow").lower()
160
161    def __init_subclass__(cls, **kwargs):
162        Flow.__types[cls.type] = cls
163
164    def get_state(self):
165        d = super().get_state()
166        d.update(version=version.FLOW_FORMAT_VERSION, type=self.type)
167        if self._backup and self._backup != d:
168            d.update(backup=self._backup)
169        return d
170
171    def set_state(self, state):
172        state = state.copy()
173        state.pop("version")
174        state.pop("type")
175        if "backup" in state:
176            self._backup = state.pop("backup")
177        super().set_state(state)
178
179    @classmethod
180    def from_state(cls, state):
181        try:
182            flow_cls = Flow.__types[state["type"]]
183        except KeyError:
184            raise ValueError(f"Unknown flow type: {state['type']}")
185        f = flow_cls(None, None)  # noqa
186        f.set_state(state)
187        return f
188
189    def copy(self):
190        """Make a copy of this flow."""
191        f = super().copy()
192        f.live = False
193        return f
194
195    def modified(self):
196        """
 197        `True` if this flow has been modified by a user, `False` otherwise.
198        """
199        if self._backup:
200            return self._backup != self.get_state()
201        else:
202            return False
203
204    def backup(self, force=False):
205        """
206        Save a backup of this flow, which can be restored by calling `Flow.revert()`.
207        """
208        if not self._backup:
209            self._backup = self.get_state()
210
211    def revert(self):
212        """
213        Revert to the last backed up state.
214        """
215        if self._backup:
216            self.set_state(self._backup)
217            self._backup = None
218
219    @property
220    def killable(self):
221        """*Read-only:* `True` if this flow can be killed, `False` otherwise."""
222        return self.live and not (self.error and self.error.msg == Error.KILLED_MESSAGE)
223
224    def kill(self):
225        """
226        Kill this flow. The current request/response will not be forwarded to its destination.
227        """
228        if not self.killable:
229            raise exceptions.ControlException("Flow is not killable.")
230        # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill
231        #  flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised
232        #  as killable. An alternative approach would be to introduce a `KillInjected` event similar to
233        #  `MessageInjected`, which should fix this issue.
234        self.error = Error(Error.KILLED_MESSAGE)
235        self.intercepted = False
236        self.live = False
237
238    def intercept(self):
239        """
240        Intercept this Flow. Processing will stop until resume is
241        called.
242        """
243        if self.intercepted:
244            return
245        self.intercepted = True
246        if self._resume_event is not None:
247            self._resume_event.clear()
248
249    async def wait_for_resume(self):
250        """
251        Wait until this Flow is resumed.
252        """
253        if not self.intercepted:
254            return
255        if self._resume_event is None:
256            self._resume_event = asyncio.Event()
257        await self._resume_event.wait()
258
259    def resume(self):
260        """
261        Continue with the flow – called after an intercept().
262        """
263        if not self.intercepted:
264            return
265        self.intercepted = False
266        if self._resume_event is not None:
267            self._resume_event.set()
268
269    @property
270    def timestamp_start(self) -> float:
271        """
272        *Read-only:* Start time of the flow.
273        Depending on the flow type, this property is an alias for
274        `mitmproxy.connection.Client.timestamp_start` or `mitmproxy.http.Request.timestamp_start`.
275        """
276        return self.client_conn.timestamp_start

Base class for network flows. A flow is a collection of objects, for example HTTP request/response pairs or a list of TCP messages.

See also: mitmproxy.http.HTTPFlow, mitmproxy.tcp.TCPFlow

The client that connected to mitmproxy.

The server mitmproxy connected to.

Some flows may never cause mitmproxy to initiate a server connection, for example because their response is replayed by mitmproxy itself. To simplify implementation, those flows will still have a server_conn attribute with a timestamp_start set to None.

error: Optional[mitmproxy.flow.Error] = None

A connection or protocol error affecting this flow.

intercepted: bool

If True, the flow is currently paused by mitmproxy. We're waiting for a user action to forward the flow to its destination.

marked: str = ''

If this attribute is a non-empty string the flow has been marked by the user.

A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name.

For example :grapes: becomes 🍇 in views that support emoji rendering. Consult the GitHub API Emoji List for a list of emoji that may be used. Not all emoji, especially emoji modifiers, will render consistently.

The default marker for the view will be used if the Unicode emoji name can not be interpreted.

is_replay: Optional[str]

This attribute indicates if this flow has been replayed in either direction.

  • a value of request indicates that the request has been artificially replayed by mitmproxy to the server.
  • a value of response indicates that the response to the client's request has been set by server replay.
live: bool

If True, the flow belongs to a currently active connection. If False, the flow may have been already completed or loaded from disk.

timestamp_created: float

The Unix timestamp of when this flow was created.

In contrast to timestamp_start, this value will not change when a flow is replayed.

type: str

The flow type, for example http, tcp, or dns.

def copy(self):
189    def copy(self):
190        """Make a copy of this flow."""
191        f = super().copy()
192        f.live = False
193        return f

Make a copy of this flow.

def modified(self):
195    def modified(self):
196        """
 197        `True` if this flow has been modified by a user, `False` otherwise.
198        """
199        if self._backup:
200            return self._backup != self.get_state()
201        else:
202            return False

True if this flow has been modified by a user, False otherwise.

def backup(self, force=False):
204    def backup(self, force=False):
205        """
206        Save a backup of this flow, which can be restored by calling `Flow.revert()`.
207        """
208        if not self._backup:
209            self._backup = self.get_state()

Save a backup of this flow, which can be restored by calling Flow.revert().

def revert(self):
211    def revert(self):
212        """
213        Revert to the last backed up state.
214        """
215        if self._backup:
216            self.set_state(self._backup)
217            self._backup = None

Revert to the last backed up state.

killable

Read-only: True if this flow can be killed, False otherwise.

def kill(self):
224    def kill(self):
225        """
226        Kill this flow. The current request/response will not be forwarded to its destination.
227        """
228        if not self.killable:
229            raise exceptions.ControlException("Flow is not killable.")
230        # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill
231        #  flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised
232        #  as killable. An alternative approach would be to introduce a `KillInjected` event similar to
233        #  `MessageInjected`, which should fix this issue.
234        self.error = Error(Error.KILLED_MESSAGE)
235        self.intercepted = False
236        self.live = False

Kill this flow. The current request/response will not be forwarded to its destination.

def intercept(self):
238    def intercept(self):
239        """
240        Intercept this Flow. Processing will stop until resume is
241        called.
242        """
243        if self.intercepted:
244            return
245        self.intercepted = True
246        if self._resume_event is not None:
247            self._resume_event.clear()

Intercept this Flow. Processing will stop until resume is called.

async def wait_for_resume(self):
249    async def wait_for_resume(self):
250        """
251        Wait until this Flow is resumed.
252        """
253        if not self.intercepted:
254            return
255        if self._resume_event is None:
256            self._resume_event = asyncio.Event()
257        await self._resume_event.wait()

Wait until this Flow is resumed.

def resume(self):
259    def resume(self):
260        """
261        Continue with the flow – called after an intercept().
262        """
263        if not self.intercepted:
264            return
265        self.intercepted = False
266        if self._resume_event is not None:
267            self._resume_event.set()

Continue with the flow – called after an intercept().

timestamp_start: float

Read-only: Start time of the flow. Depending on the flow type, this property is an alias for mitmproxy.connection.Client.timestamp_start or mitmproxy.http.Request.timestamp_start.

class Error(mitmproxy.stateobject.StateObject):
13class Error(stateobject.StateObject):
14    """
15    An Error.
16
 17    This is distinct from a protocol error response (say, an HTTP code 500),
18    which is represented by a normal `mitmproxy.http.Response` object. This class is
19    responsible for indicating errors that fall outside of normal protocol
20    communications, like interrupted connections, timeouts, or protocol errors.
21    """
22
23    msg: str
24    """Message describing the error."""
25
26    timestamp: float
27    """Unix timestamp of when this error happened."""
28
29    KILLED_MESSAGE: ClassVar[str] = "Connection killed."
30
31    def __init__(self, msg: str, timestamp: Optional[float] = None) -> None:
32        """Create an error. If no timestamp is passed, the current time is used."""
33        self.msg = msg
34        self.timestamp = timestamp or time.time()
35
36    _stateobject_attributes = dict(msg=str, timestamp=float)
37
38    def __str__(self):
39        return self.msg
40
41    def __repr__(self):
42        return self.msg
43
44    @classmethod
45    def from_state(cls, state):
46        # the default implementation assumes an empty constructor. Override
47        # accordingly.
48        f = cls(None)
49        f.set_state(state)
50        return f

An Error.

This is distinct from a protocol error response (say, an HTTP code 500), which is represented by a normal mitmproxy.http.Response object. This class is responsible for indicating errors that fall outside of normal protocol communications, like interrupted connections, timeouts, or protocol errors.

msg: str

Message describing the error.

timestamp: float

Unix timestamp of when this error happened.

KILLED_MESSAGE: ClassVar[str] = 'Connection killed.'
Inherited Members
mitmproxy.coretypes.serializable.Serializable
copy