mitmproxy.flow
1from __future__ import annotations 2 3import asyncio 4import copy 5import time 6import uuid 7from dataclasses import dataclass 8from dataclasses import field 9from typing import Any 10from typing import ClassVar 11 12from mitmproxy import connection 13from mitmproxy import exceptions 14from mitmproxy import version 15from mitmproxy.coretypes import serializable 16 17 18@dataclass 19class Error(serializable.SerializableDataclass): 20 """ 21 An Error. 22 23 This is distinct from an protocol error response (say, a HTTP code 500), 24 which is represented by a normal `mitmproxy.http.Response` object. This class is 25 responsible for indicating errors that fall outside of normal protocol 26 communications, like interrupted connections, timeouts, or protocol errors. 27 """ 28 29 msg: str 30 """Message describing the error.""" 31 32 timestamp: float = field(default_factory=time.time) 33 """Unix timestamp of when this error happened.""" 34 35 KILLED_MESSAGE: ClassVar[str] = "Connection killed." 36 37 def __str__(self): 38 return self.msg 39 40 def __repr__(self): 41 return self.msg 42 43 44class Flow(serializable.Serializable): 45 """ 46 Base class for network flows. A flow is a collection of objects, 47 for example HTTP request/response pairs or a list of TCP messages. 48 49 See also: 50 - mitmproxy.http.HTTPFlow 51 - mitmproxy.tcp.TCPFlow 52 - mitmproxy.udp.UDPFlow 53 """ 54 55 client_conn: connection.Client 56 """The client that connected to mitmproxy.""" 57 58 server_conn: connection.Server 59 """ 60 The server mitmproxy connected to. 61 62 Some flows may never cause mitmproxy to initiate a server connection, 63 for example because their response is replayed by mitmproxy itself. 64 To simplify implementation, those flows will still have a `server_conn` attribute 65 with a `timestamp_start` set to `None`. 66 """ 67 68 error: Error | None = None 69 """A connection or protocol error affecting this flow.""" 70 71 intercepted: bool 72 """ 73 If `True`, the flow is currently paused by mitmproxy. 74 We're waiting for a user action to forward the flow to its destination. 75 """ 76 77 marked: str = "" 78 """ 79 If this attribute is a non-empty string the flow has been marked by the user. 80 81 A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name. 82 83 For example `:grapes:` becomes `🍇` in views that support emoji rendering. 84 Consult the [Github API Emoji List](https://api.github.com/emojis) for a list of emoji that may be used. 85 Not all emoji, especially [emoji modifiers](https://en.wikipedia.org/wiki/Miscellaneous_Symbols_and_Pictographs#Emoji_modifiers) 86 will render consistently. 87 88 The default marker for the view will be used if the Unicode emoji name can not be interpreted. 89 """ 90 91 is_replay: str | None 92 """ 93 This attribute indicates if this flow has been replayed in either direction. 94 95 - a value of `request` indicates that the request has been artifically replayed by mitmproxy to the server. 96 - a value of `response` indicates that the response to the client's request has been set by server replay. 97 """ 98 99 live: bool 100 """ 101 If `True`, the flow belongs to a currently active connection. 102 If `False`, the flow may have been already completed or loaded from disk. 103 """ 104 105 timestamp_created: float 106 """ 107 The Unix timestamp of when this flow was created. 108 109 In contrast to `timestamp_start`, this value will not change when a flow is replayed. 110 """ 111 112 def __init__( 113 self, 114 client_conn: connection.Client, 115 server_conn: connection.Server, 116 live: bool = False, 117 ) -> None: 118 self.id = str(uuid.uuid4()) 119 self.client_conn = client_conn 120 self.server_conn = server_conn 121 self.live = live 122 self.timestamp_created = time.time() 123 124 self.intercepted: bool = False 125 self._resume_event: asyncio.Event | None = None 126 self._backup: Flow | None = None 127 self.marked: str = "" 128 self.is_replay: str | None = None 129 self.metadata: dict[str, Any] = dict() 130 self.comment: str = "" 131 132 __types: dict[str, type[Flow]] = {} 133 134 type: ClassVar[ 135 str 136 ] # automatically derived from the class name in __init_subclass__ 137 """The flow type, for example `http`, `tcp`, or `dns`.""" 138 139 def __init_subclass__(cls, **kwargs): 140 cls.type = cls.__name__.removesuffix("Flow").lower() 141 Flow.__types[cls.type] = cls 142 143 def get_state(self) -> serializable.State: 144 state = { 145 "version": version.FLOW_FORMAT_VERSION, 146 "type": self.type, 147 "id": self.id, 148 "error": self.error.get_state() if self.error else None, 149 "client_conn": self.client_conn.get_state(), 150 "server_conn": self.server_conn.get_state(), 151 "intercepted": self.intercepted, 152 "is_replay": self.is_replay, 153 "marked": self.marked, 154 "metadata": copy.deepcopy(self.metadata), 155 "comment": self.comment, 156 "timestamp_created": self.timestamp_created, 157 } 158 state["backup"] = copy.deepcopy(self._backup) if self._backup != state else None 159 return state 160 161 def set_state(self, state: serializable.State) -> None: 162 assert state.pop("version") == version.FLOW_FORMAT_VERSION 163 assert state.pop("type") == self.type 164 self.id = state.pop("id") 165 if state["error"]: 166 if self.error: 167 self.error.set_state(state.pop("error")) 168 else: 169 self.error = Error.from_state(state.pop("error")) 170 else: 171 self.error = state.pop("error") 172 self.client_conn.set_state(state.pop("client_conn")) 173 self.server_conn.set_state(state.pop("server_conn")) 174 self.intercepted = state.pop("intercepted") 175 self.is_replay = state.pop("is_replay") 176 self.marked = state.pop("marked") 177 self.metadata = state.pop("metadata") 178 self.comment = state.pop("comment") 179 self.timestamp_created = state.pop("timestamp_created") 180 self._backup = state.pop("backup", None) 181 assert state == {} 182 183 @classmethod 184 def from_state(cls, state: serializable.State) -> Flow: 185 try: 186 flow_cls = Flow.__types[state["type"]] 187 except KeyError: 188 raise ValueError(f"Unknown flow type: {state['type']}") 189 client = connection.Client(peername=("", 0), sockname=("", 0)) 190 server = connection.Server(address=None) 191 f = flow_cls(client, server) 192 f.set_state(state) 193 return f 194 195 def copy(self): 196 """Make a copy of this flow.""" 197 f = super().copy() 198 f.live = False 199 return f 200 201 def modified(self): 202 """ 203 `True` if this file has been modified by a user, `False` otherwise. 204 """ 205 if self._backup: 206 return self._backup != self.get_state() 207 else: 208 return False 209 210 def backup(self, force=False): 211 """ 212 Save a backup of this flow, which can be restored by calling `Flow.revert()`. 213 """ 214 if not self._backup: 215 self._backup = self.get_state() 216 217 def revert(self): 218 """ 219 Revert to the last backed up state. 220 """ 221 if self._backup: 222 self.set_state(self._backup) 223 self._backup = None 224 225 @property 226 def killable(self): 227 """*Read-only:* `True` if this flow can be killed, `False` otherwise.""" 228 return self.live and not (self.error and self.error.msg == Error.KILLED_MESSAGE) 229 230 def kill(self): 231 """ 232 Kill this flow. The current request/response will not be forwarded to its destination. 233 """ 234 if not self.killable: 235 raise exceptions.ControlException("Flow is not killable.") 236 # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill 237 # flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised 238 # as killable. An alternative approach would be to introduce a `KillInjected` event similar to 239 # `MessageInjected`, which should fix this issue. 240 self.error = Error(Error.KILLED_MESSAGE) 241 self.intercepted = False 242 self.live = False 243 244 def intercept(self): 245 """ 246 Intercept this Flow. Processing will stop until resume is 247 called. 248 """ 249 if self.intercepted: 250 return 251 self.intercepted = True 252 if self._resume_event is not None: 253 self._resume_event.clear() 254 255 async def wait_for_resume(self): 256 """ 257 Wait until this Flow is resumed. 258 """ 259 if not self.intercepted: 260 return 261 if self._resume_event is None: 262 self._resume_event = asyncio.Event() 263 await self._resume_event.wait() 264 265 def resume(self): 266 """ 267 Continue with the flow – called after an intercept(). 268 """ 269 if not self.intercepted: 270 return 271 self.intercepted = False 272 if self._resume_event is not None: 273 self._resume_event.set() 274 275 @property 276 def timestamp_start(self) -> float: 277 """ 278 *Read-only:* Start time of the flow. 279 Depending on the flow type, this property is an alias for 280 `mitmproxy.connection.Client.timestamp_start` or `mitmproxy.http.Request.timestamp_start`. 281 """ 282 return self.client_conn.timestamp_start 283 284 285__all__ = [ 286 "Flow", 287 "Error", 288]
45class Flow(serializable.Serializable): 46 """ 47 Base class for network flows. A flow is a collection of objects, 48 for example HTTP request/response pairs or a list of TCP messages. 49 50 See also: 51 - mitmproxy.http.HTTPFlow 52 - mitmproxy.tcp.TCPFlow 53 - mitmproxy.udp.UDPFlow 54 """ 55 56 client_conn: connection.Client 57 """The client that connected to mitmproxy.""" 58 59 server_conn: connection.Server 60 """ 61 The server mitmproxy connected to. 62 63 Some flows may never cause mitmproxy to initiate a server connection, 64 for example because their response is replayed by mitmproxy itself. 65 To simplify implementation, those flows will still have a `server_conn` attribute 66 with a `timestamp_start` set to `None`. 67 """ 68 69 error: Error | None = None 70 """A connection or protocol error affecting this flow.""" 71 72 intercepted: bool 73 """ 74 If `True`, the flow is currently paused by mitmproxy. 75 We're waiting for a user action to forward the flow to its destination. 76 """ 77 78 marked: str = "" 79 """ 80 If this attribute is a non-empty string the flow has been marked by the user. 81 82 A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name. 83 84 For example `:grapes:` becomes `🍇` in views that support emoji rendering. 85 Consult the [Github API Emoji List](https://api.github.com/emojis) for a list of emoji that may be used. 86 Not all emoji, especially [emoji modifiers](https://en.wikipedia.org/wiki/Miscellaneous_Symbols_and_Pictographs#Emoji_modifiers) 87 will render consistently. 88 89 The default marker for the view will be used if the Unicode emoji name can not be interpreted. 90 """ 91 92 is_replay: str | None 93 """ 94 This attribute indicates if this flow has been replayed in either direction. 95 96 - a value of `request` indicates that the request has been artifically replayed by mitmproxy to the server. 97 - a value of `response` indicates that the response to the client's request has been set by server replay. 98 """ 99 100 live: bool 101 """ 102 If `True`, the flow belongs to a currently active connection. 103 If `False`, the flow may have been already completed or loaded from disk. 104 """ 105 106 timestamp_created: float 107 """ 108 The Unix timestamp of when this flow was created. 109 110 In contrast to `timestamp_start`, this value will not change when a flow is replayed. 111 """ 112 113 def __init__( 114 self, 115 client_conn: connection.Client, 116 server_conn: connection.Server, 117 live: bool = False, 118 ) -> None: 119 self.id = str(uuid.uuid4()) 120 self.client_conn = client_conn 121 self.server_conn = server_conn 122 self.live = live 123 self.timestamp_created = time.time() 124 125 self.intercepted: bool = False 126 self._resume_event: asyncio.Event | None = None 127 self._backup: Flow | None = None 128 self.marked: str = "" 129 self.is_replay: str | None = None 130 self.metadata: dict[str, Any] = dict() 131 self.comment: str = "" 132 133 __types: dict[str, type[Flow]] = {} 134 135 type: ClassVar[ 136 str 137 ] # automatically derived from the class name in __init_subclass__ 138 """The flow type, for example `http`, `tcp`, or `dns`.""" 139 140 def __init_subclass__(cls, **kwargs): 141 cls.type = cls.__name__.removesuffix("Flow").lower() 142 Flow.__types[cls.type] = cls 143 144 def get_state(self) -> serializable.State: 145 state = { 146 "version": version.FLOW_FORMAT_VERSION, 147 "type": self.type, 148 "id": self.id, 149 "error": self.error.get_state() if self.error else None, 150 "client_conn": self.client_conn.get_state(), 151 "server_conn": self.server_conn.get_state(), 152 "intercepted": self.intercepted, 153 "is_replay": self.is_replay, 154 "marked": self.marked, 155 "metadata": copy.deepcopy(self.metadata), 156 "comment": self.comment, 157 "timestamp_created": self.timestamp_created, 158 } 159 state["backup"] = copy.deepcopy(self._backup) if self._backup != state else None 160 return state 161 162 def set_state(self, state: serializable.State) -> None: 163 assert state.pop("version") == version.FLOW_FORMAT_VERSION 164 assert state.pop("type") == self.type 165 self.id = state.pop("id") 166 if state["error"]: 167 if self.error: 168 self.error.set_state(state.pop("error")) 169 else: 170 self.error = Error.from_state(state.pop("error")) 171 else: 172 self.error = state.pop("error") 173 self.client_conn.set_state(state.pop("client_conn")) 174 self.server_conn.set_state(state.pop("server_conn")) 175 self.intercepted = state.pop("intercepted") 176 self.is_replay = state.pop("is_replay") 177 self.marked = state.pop("marked") 178 self.metadata = state.pop("metadata") 179 self.comment = state.pop("comment") 180 self.timestamp_created = state.pop("timestamp_created") 181 self._backup = state.pop("backup", None) 182 assert state == {} 183 184 @classmethod 185 def from_state(cls, state: serializable.State) -> Flow: 186 try: 187 flow_cls = Flow.__types[state["type"]] 188 except KeyError: 189 raise ValueError(f"Unknown flow type: {state['type']}") 190 client = connection.Client(peername=("", 0), sockname=("", 0)) 191 server = connection.Server(address=None) 192 f = flow_cls(client, server) 193 f.set_state(state) 194 return f 195 196 def copy(self): 197 """Make a copy of this flow.""" 198 f = super().copy() 199 f.live = False 200 return f 201 202 def modified(self): 203 """ 204 `True` if this file has been modified by a user, `False` otherwise. 205 """ 206 if self._backup: 207 return self._backup != self.get_state() 208 else: 209 return False 210 211 def backup(self, force=False): 212 """ 213 Save a backup of this flow, which can be restored by calling `Flow.revert()`. 214 """ 215 if not self._backup: 216 self._backup = self.get_state() 217 218 def revert(self): 219 """ 220 Revert to the last backed up state. 221 """ 222 if self._backup: 223 self.set_state(self._backup) 224 self._backup = None 225 226 @property 227 def killable(self): 228 """*Read-only:* `True` if this flow can be killed, `False` otherwise.""" 229 return self.live and not (self.error and self.error.msg == Error.KILLED_MESSAGE) 230 231 def kill(self): 232 """ 233 Kill this flow. The current request/response will not be forwarded to its destination. 234 """ 235 if not self.killable: 236 raise exceptions.ControlException("Flow is not killable.") 237 # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill 238 # flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised 239 # as killable. An alternative approach would be to introduce a `KillInjected` event similar to 240 # `MessageInjected`, which should fix this issue. 241 self.error = Error(Error.KILLED_MESSAGE) 242 self.intercepted = False 243 self.live = False 244 245 def intercept(self): 246 """ 247 Intercept this Flow. Processing will stop until resume is 248 called. 249 """ 250 if self.intercepted: 251 return 252 self.intercepted = True 253 if self._resume_event is not None: 254 self._resume_event.clear() 255 256 async def wait_for_resume(self): 257 """ 258 Wait until this Flow is resumed. 259 """ 260 if not self.intercepted: 261 return 262 if self._resume_event is None: 263 self._resume_event = asyncio.Event() 264 await self._resume_event.wait() 265 266 def resume(self): 267 """ 268 Continue with the flow – called after an intercept(). 269 """ 270 if not self.intercepted: 271 return 272 self.intercepted = False 273 if self._resume_event is not None: 274 self._resume_event.set() 275 276 @property 277 def timestamp_start(self) -> float: 278 """ 279 *Read-only:* Start time of the flow. 280 Depending on the flow type, this property is an alias for 281 `mitmproxy.connection.Client.timestamp_start` or `mitmproxy.http.Request.timestamp_start`. 282 """ 283 return self.client_conn.timestamp_start
Base class for network flows. A flow is a collection of objects, for example HTTP request/response pairs or a list of TCP messages.
See also:
The server mitmproxy connected to.
Some flows may never cause mitmproxy to initiate a server connection,
for example because their response is replayed by mitmproxy itself.
To simplify implementation, those flows will still have a server_conn
attribute
with a timestamp_start
set to None
.
If True
, the flow is currently paused by mitmproxy.
We're waiting for a user action to forward the flow to its destination.
If this attribute is a non-empty string the flow has been marked by the user.
A string value will be used as the marker annotation. May either be a single character or a Unicode emoji name.
For example :grapes:
becomes 🍇
in views that support emoji rendering.
Consult the Github API Emoji List for a list of emoji that may be used.
Not all emoji, especially emoji modifiers
will render consistently.
The default marker for the view will be used if the Unicode emoji name can not be interpreted.
This attribute indicates if this flow has been replayed in either direction.
- a value of
request
indicates that the request has been artifically replayed by mitmproxy to the server. - a value of
response
indicates that the response to the client's request has been set by server replay.
If True
, the flow belongs to a currently active connection.
If False
, the flow may have been already completed or loaded from disk.
The Unix timestamp of when this flow was created.
In contrast to timestamp_start
, this value will not change when a flow is replayed.
196 def copy(self): 197 """Make a copy of this flow.""" 198 f = super().copy() 199 f.live = False 200 return f
Make a copy of this flow.
202 def modified(self): 203 """ 204 `True` if this file has been modified by a user, `False` otherwise. 205 """ 206 if self._backup: 207 return self._backup != self.get_state() 208 else: 209 return False
True
if this file has been modified by a user, False
otherwise.
211 def backup(self, force=False): 212 """ 213 Save a backup of this flow, which can be restored by calling `Flow.revert()`. 214 """ 215 if not self._backup: 216 self._backup = self.get_state()
Save a backup of this flow, which can be restored by calling Flow.revert()
.
218 def revert(self): 219 """ 220 Revert to the last backed up state. 221 """ 222 if self._backup: 223 self.set_state(self._backup) 224 self._backup = None
Revert to the last backed up state.
226 @property 227 def killable(self): 228 """*Read-only:* `True` if this flow can be killed, `False` otherwise.""" 229 return self.live and not (self.error and self.error.msg == Error.KILLED_MESSAGE)
Read-only: True
if this flow can be killed, False
otherwise.
231 def kill(self): 232 """ 233 Kill this flow. The current request/response will not be forwarded to its destination. 234 """ 235 if not self.killable: 236 raise exceptions.ControlException("Flow is not killable.") 237 # TODO: The way we currently signal killing is not ideal. One major problem is that we cannot kill 238 # flows in transit (https://github.com/mitmproxy/mitmproxy/issues/4711), even though they are advertised 239 # as killable. An alternative approach would be to introduce a `KillInjected` event similar to 240 # `MessageInjected`, which should fix this issue. 241 self.error = Error(Error.KILLED_MESSAGE) 242 self.intercepted = False 243 self.live = False
Kill this flow. The current request/response will not be forwarded to its destination.
245 def intercept(self): 246 """ 247 Intercept this Flow. Processing will stop until resume is 248 called. 249 """ 250 if self.intercepted: 251 return 252 self.intercepted = True 253 if self._resume_event is not None: 254 self._resume_event.clear()
Intercept this Flow. Processing will stop until resume is called.
256 async def wait_for_resume(self): 257 """ 258 Wait until this Flow is resumed. 259 """ 260 if not self.intercepted: 261 return 262 if self._resume_event is None: 263 self._resume_event = asyncio.Event() 264 await self._resume_event.wait()
Wait until this Flow is resumed.
266 def resume(self): 267 """ 268 Continue with the flow – called after an intercept(). 269 """ 270 if not self.intercepted: 271 return 272 self.intercepted = False 273 if self._resume_event is not None: 274 self._resume_event.set()
Continue with the flow – called after an intercept().
276 @property 277 def timestamp_start(self) -> float: 278 """ 279 *Read-only:* Start time of the flow. 280 Depending on the flow type, this property is an alias for 281 `mitmproxy.connection.Client.timestamp_start` or `mitmproxy.http.Request.timestamp_start`. 282 """ 283 return self.client_conn.timestamp_start
Read-only: Start time of the flow.
Depending on the flow type, this property is an alias for
mitmproxy.connection.Client.timestamp_start
or mitmproxy.http.Request.timestamp_start
.
19@dataclass 20class Error(serializable.SerializableDataclass): 21 """ 22 An Error. 23 24 This is distinct from an protocol error response (say, a HTTP code 500), 25 which is represented by a normal `mitmproxy.http.Response` object. This class is 26 responsible for indicating errors that fall outside of normal protocol 27 communications, like interrupted connections, timeouts, or protocol errors. 28 """ 29 30 msg: str 31 """Message describing the error.""" 32 33 timestamp: float = field(default_factory=time.time) 34 """Unix timestamp of when this error happened.""" 35 36 KILLED_MESSAGE: ClassVar[str] = "Connection killed." 37 38 def __str__(self): 39 return self.msg 40 41 def __repr__(self): 42 return self.msg
An Error.
This is distinct from an protocol error response (say, a HTTP code 500),
which is represented by a normal mitmproxy.http.Response
object. This class is
responsible for indicating errors that fall outside of normal protocol
communications, like interrupted connections, timeouts, or protocol errors.