Edit on GitHub

mitmproxy.addonmanager

  1import contextlib
  2import inspect
  3import pprint
  4import sys
  5import traceback
  6import types
  7from collections.abc import Callable, Sequence
  8from dataclasses import dataclass
  9from typing import Any, Optional
 10
 11from mitmproxy import hooks
 12from mitmproxy import exceptions
 13from mitmproxy import flow
 14from . import ctx
 15
 16
 17def _get_name(itm):
 18    return getattr(itm, "name", itm.__class__.__name__.lower())
 19
 20
 21def cut_traceback(tb, func_name):
 22    """
 23    Cut off a traceback at the function with the given name.
 24    The func_name's frame is excluded.
 25
 26    Args:
 27        tb: traceback object, as returned by sys.exc_info()[2]
 28        func_name: function name
 29
 30    Returns:
 31        Reduced traceback.
 32    """
 33    tb_orig = tb
 34    for _, _, fname, _ in traceback.extract_tb(tb):
 35        tb = tb.tb_next
 36        if fname == func_name:
 37            break
 38    return tb or tb_orig
 39
 40
 41@contextlib.contextmanager
 42def safecall():
 43    try:
 44        yield
 45    except (exceptions.AddonHalt, exceptions.OptionsError):
 46        raise
 47    except Exception:
 48        etype, value, tb = sys.exc_info()
 49        tb = cut_traceback(tb, "invoke_addon_sync")
 50        tb = cut_traceback(tb, "invoke_addon")
 51        ctx.log.error(
 52            "Addon error: %s" % "".join(traceback.format_exception(etype, value, tb))
 53        )
 54
 55
 56class Loader:
 57    """
 58    A loader object is passed to the load() event when addons start up.
 59    """
 60
 61    def __init__(self, master):
 62        self.master = master
 63
 64    def add_option(
 65        self,
 66        name: str,
 67        typespec: type,
 68        default: Any,
 69        help: str,
 70        choices: Optional[Sequence[str]] = None,
 71    ) -> None:
 72        """
 73        Add an option to mitmproxy.
 74
 75        Help should be a single paragraph with no linebreaks - it will be
 76        reflowed by tools. Information on the data type should be omitted -
 77        it will be generated and added by tools as needed.
 78        """
 79        if name in self.master.options:
 80            existing = self.master.options._options[name]
 81            same_signature = (
 82                existing.name == name
 83                and existing.typespec == typespec
 84                and existing.default == default
 85                and existing.help == help
 86                and existing.choices == choices
 87            )
 88            if same_signature:
 89                return
 90            else:
 91                ctx.log.warn("Over-riding existing option %s" % name)
 92        self.master.options.add_option(name, typespec, default, help, choices)
 93
 94    def add_command(self, path: str, func: Callable) -> None:
 95        """Add a command to mitmproxy.
 96
 97        Unless you are generating commands programatically,
 98        this API should be avoided. Decorate your function with `@mitmproxy.command.command` instead.
 99        """
100        self.master.commands.add(path, func)
101
102
def traverse(chain):
    """
    Recursively traverse an addon chain.

    Yields every item of `chain` in order; whenever an item has an `addons`
    attribute, that attribute is traversed the same way immediately after
    the item itself (depth-first, parent before children).
    """
    for addon in chain:
        yield addon
        if hasattr(addon, "addons"):
            yield from traverse(addon.addons)
111
112
@dataclass
class LoadHook(hooks.Hook):
    """
    Called when an addon is first loaded. This event receives a Loader
    object, which contains methods for adding options and commands. This
    method is where the addon configures itself.
    """

    # The Loader handed to the addon's load handler.
    loader: Loader
122
123
class AddonManager:
    """
    Holds the chain of top-level addons together with a name -> addon
    lookup (which also covers sub-addons), and dispatches hook events to
    them.
    """

    def __init__(self, master):
        self.lookup = {}
        self.chain = []
        self.master = master
        master.options.changed.connect(self._configure_all)

    def _configure_all(self, options, updated):
        """
        Callback connected to `master.options.changed`; triggers a
        configure hook carrying the updated option names.
        """
        self.trigger(hooks.ConfigureHook(updated))

    def clear(self):
        """
        Remove all addons.
        """
        for a in self.chain:
            self.invoke_addon_sync(a, hooks.DoneHook())
        self.lookup = {}
        self.chain = []

    def get(self, name):
        """
        Retrieve an addon by name. Addon names are equal to the .name
        attribute on the instance, or the lower case class name if that
        does not exist.
        """
        return self.lookup.get(name, None)

    def register(self, addon):
        """
        Register an addon, call its load event, and then register all its
        sub-addons. This should be used by addons that dynamically manage
        addons.

        If the calling addon is already running, it should follow with
        running and configure events. Must be called within a current
        context.

        Raises:
            exceptions.AddonManagerError: if the addon, or one of its
                sub-addons, has a name that is already registered.
        """
        api_changes = {
            # mitmproxy 6 -> mitmproxy 7
            "clientconnect": "client_connected",
            "clientdisconnect": "client_disconnected",
            "serverconnect": "server_connect and server_connected",
            "serverdisconnect": "server_disconnected",
        }
        # First pass: warn about removed hooks and reject name clashes
        # before anything is loaded or recorded.
        for a in traverse([addon]):
            for old, new in api_changes.items():
                if hasattr(a, old):
                    ctx.log.warn(
                        f"The {old} event has been removed, use {new} instead. "
                        "For more details, see https://docs.mitmproxy.org/stable/addons-events/."
                    )
            name = _get_name(a)
            if name in self.lookup:
                raise exceptions.AddonManagerError(
                    f"An addon called '{name}' already exists."
                )
        loader = Loader(self.master)
        self.invoke_addon_sync(addon, LoadHook(loader))
        # The tree is walked again after load; kept as separate passes so
        # every name is in the lookup before any commands are collected.
        for a in traverse([addon]):
            self.lookup[_get_name(a)] = a
        for a in traverse([addon]):
            self.master.commands.collect_commands(a)
        self.master.options.process_deferred()
        return addon

    def add(self, *addons):
        """
        Add addons to the end of the chain, and run their load event.
        If any addon has sub-addons, they are registered.
        """
        for i in addons:
            self.chain.append(self.register(i))

    def remove(self, addon):
        """
        Remove an addon and all its sub-addons.

        If the addon is not in the chain - that is, if it's managed by a
        parent addon - it's the parent's responsibility to remove it from
        its own addons attribute.

        Raises:
            exceptions.AddonManagerError: if the addon or one of its
                sub-addons is not registered.
        """
        for a in traverse([addon]):
            n = _get_name(a)
            if n not in self.lookup:
                raise exceptions.AddonManagerError(f"No such addon: {n}")
            self.chain = [i for i in self.chain if i is not a]
            del self.lookup[n]
        self.invoke_addon_sync(addon, hooks.DoneHook())

    def __len__(self):
        """Number of top-level addons in the chain."""
        return len(self.chain)

    def __str__(self):
        return pprint.pformat([str(i) for i in self.chain])

    def __contains__(self, item):
        """True if an addon with the same name as `item` is registered."""
        return _get_name(item) in self.lookup

    async def handle_lifecycle(self, event: hooks.Hook):
        """
        Handle a lifecycle event.

        The event is triggered on all addons; if its first argument is a
        flow, an update hook for that flow is triggered afterwards.
        """
        # Hooks without any arguments have no message to inspect.
        args = event.args()
        message = args[0] if args else None

        await self.trigger_event(event)

        if isinstance(message, flow.Flow):
            await self.trigger_event(hooks.UpdateHook([message]))

    def _iter_hooks(self, addon, event: hooks.Hook):
        """
        Enumerate all hook callables belonging to the given addon

        Yields (addon, handler) pairs for the addon and each of its
        sub-addons that has a truthy attribute named after the event.

        Raises:
            exceptions.AddonManagerError: if such an attribute is neither
                callable nor a module.
        """
        assert isinstance(event, hooks.Hook)
        for a in traverse([addon]):
            func = getattr(a, event.name, None)
            if not func:
                continue
            if callable(func):
                yield a, func
            elif isinstance(func, types.ModuleType):
                # we gracefully exclude module imports with the same name as hooks.
                # For example, a user may have "from mitmproxy import log" in an addon,
                # which has the same name as the "log" hook. In this particular case,
                # we end up in an error loop because we "log" this error.
                continue
            else:
                raise exceptions.AddonManagerError(
                    f"Addon handler {event.name} ({a}) not callable"
                )

    async def invoke_addon(self, addon, event: hooks.Hook):
        """
        Asynchronously invoke an event on an addon and all its children.
        """
        for _, func in self._iter_hooks(addon, event):
            res = func(*event.args())
            # Support both async and sync hook functions
            if inspect.isawaitable(res):
                await res

    def invoke_addon_sync(self, addon, event: hooks.Hook):
        """
        Invoke an event on an addon and all its children.

        Raises:
            exceptions.AddonManagerError: if a matching handler is a
                coroutine function, which cannot be run from here.
        """
        for a, func in self._iter_hooks(addon, event):
            if inspect.iscoroutinefunction(func):
                raise exceptions.AddonManagerError(
                    f"Async handler {event.name} ({a}) cannot be called from sync context"
                )
            func(*event.args())

    async def trigger_event(self, event: hooks.Hook):
        """
        Asynchronously trigger an event across all addons.

        Stops at the first addon that raises AddonHalt.
        """
        for i in self.chain:
            try:
                with safecall():
                    await self.invoke_addon(i, event)
            except exceptions.AddonHalt:
                return

    def trigger(self, event: hooks.Hook):
        """
        Trigger an event across all addons.

        This API is discouraged and may be deprecated in the future.
        Use `trigger_event()` instead, which provides the same functionality but supports async hooks.
        """
        for i in self.chain:
            try:
                with safecall():
                    self.invoke_addon_sync(i, event)
            except exceptions.AddonHalt:
                return
class Loader:
 57class Loader:
 58    """
 59    A loader object is passed to the load() event when addons start up.
 60    """
 61
 62    def __init__(self, master):
 63        self.master = master
 64
 65    def add_option(
 66        self,
 67        name: str,
 68        typespec: type,
 69        default: Any,
 70        help: str,
 71        choices: Optional[Sequence[str]] = None,
 72    ) -> None:
 73        """
 74        Add an option to mitmproxy.
 75
 76        Help should be a single paragraph with no linebreaks - it will be
 77        reflowed by tools. Information on the data type should be omitted -
 78        it will be generated and added by tools as needed.
 79        """
 80        if name in self.master.options:
 81            existing = self.master.options._options[name]
 82            same_signature = (
 83                existing.name == name
 84                and existing.typespec == typespec
 85                and existing.default == default
 86                and existing.help == help
 87                and existing.choices == choices
 88            )
 89            if same_signature:
 90                return
 91            else:
 92                ctx.log.warn("Over-riding existing option %s" % name)
 93        self.master.options.add_option(name, typespec, default, help, choices)
 94
 95    def add_command(self, path: str, func: Callable) -> None:
 96        """Add a command to mitmproxy.
 97
 98        Unless you are generating commands programatically,
 99        this API should be avoided. Decorate your function with `@mitmproxy.command.command` instead.
100        """
101        self.master.commands.add(path, func)

A loader object is passed to the load() event when addons start up.

def add_option( self, name: str, typespec: type, default: Any, help: str, choices: Optional[collections.abc.Sequence[str]] = None) -> None:
65    def add_option(
66        self,
67        name: str,
68        typespec: type,
69        default: Any,
70        help: str,
71        choices: Optional[Sequence[str]] = None,
72    ) -> None:
73        """
74        Add an option to mitmproxy.
75
76        Help should be a single paragraph with no linebreaks - it will be
77        reflowed by tools. Information on the data type should be omitted -
78        it will be generated and added by tools as needed.
79        """
80        if name in self.master.options:
81            existing = self.master.options._options[name]
82            same_signature = (
83                existing.name == name
84                and existing.typespec == typespec
85                and existing.default == default
86                and existing.help == help
87                and existing.choices == choices
88            )
89            if same_signature:
90                return
91            else:
92                ctx.log.warn("Over-riding existing option %s" % name)
93        self.master.options.add_option(name, typespec, default, help, choices)

Add an option to mitmproxy.

Help should be a single paragraph with no linebreaks - it will be reflowed by tools. Information on the data type should be omitted - it will be generated and added by tools as needed.

def add_command(self, path: str, func: collections.abc.Callable) -> None:
 95    def add_command(self, path: str, func: Callable) -> None:
 96        """Add a command to mitmproxy.
 97
 98        Unless you are generating commands programatically,
 99        this API should be avoided. Decorate your function with `@mitmproxy.command.command` instead.
100        """
101        self.master.commands.add(path, func)

Add a command to mitmproxy.

Unless you are generating commands programmatically, this API should be avoided. Decorate your function with @mitmproxy.command.command instead.