Edit on GitHub

mitmproxy.dns

  1from __future__ import annotations
  2
  3import itertools
  4import random
  5import struct
  6import time
  7from dataclasses import dataclass
  8from ipaddress import IPv4Address
  9from ipaddress import IPv6Address
 10from typing import ClassVar
 11
 12from mitmproxy import flow
 13from mitmproxy.coretypes import serializable
 14from mitmproxy.net.dns import classes
 15from mitmproxy.net.dns import domain_names
 16from mitmproxy.net.dns import op_codes
 17from mitmproxy.net.dns import response_codes
 18from mitmproxy.net.dns import types
 19
 20# DNS parameters taken from https://www.iana.org/assignments/dns-parameters/dns-parameters.xml
 21
 22
 23@dataclass
 24class Question(serializable.SerializableDataclass):
 25    HEADER: ClassVar[struct.Struct] = struct.Struct("!HH")
 26
 27    name: str
 28    type: int
 29    class_: int
 30
 31    def __str__(self) -> str:
 32        return self.name
 33
 34    def to_json(self) -> dict:
 35        """
 36        Converts the question into json for mitmweb.
 37        Sync with web/src/flow.ts.
 38        """
 39        return {
 40            "name": self.name,
 41            "type": types.to_str(self.type),
 42            "class": classes.to_str(self.class_),
 43        }
 44
 45
 46@dataclass
 47class ResourceRecord(serializable.SerializableDataclass):
 48    DEFAULT_TTL: ClassVar[int] = 60
 49    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHIH")
 50
 51    name: str
 52    type: int
 53    class_: int
 54    ttl: int
 55    data: bytes
 56
 57    def __str__(self) -> str:
 58        try:
 59            if self.type == types.A:
 60                return str(self.ipv4_address)
 61            if self.type == types.AAAA:
 62                return str(self.ipv6_address)
 63            if self.type in (types.NS, types.CNAME, types.PTR):
 64                return self.domain_name
 65            if self.type == types.TXT:
 66                return self.text
 67        except Exception:
 68            return f"0x{self.data.hex()} (invalid {types.to_str(self.type)} data)"
 69        return f"0x{self.data.hex()}"
 70
 71    @property
 72    def text(self) -> str:
 73        return self.data.decode("utf-8")
 74
 75    @text.setter
 76    def text(self, value: str) -> None:
 77        self.data = value.encode("utf-8")
 78
 79    @property
 80    def ipv4_address(self) -> IPv4Address:
 81        return IPv4Address(self.data)
 82
 83    @ipv4_address.setter
 84    def ipv4_address(self, ip: IPv4Address) -> None:
 85        self.data = ip.packed
 86
 87    @property
 88    def ipv6_address(self) -> IPv6Address:
 89        return IPv6Address(self.data)
 90
 91    @ipv6_address.setter
 92    def ipv6_address(self, ip: IPv6Address) -> None:
 93        self.data = ip.packed
 94
 95    @property
 96    def domain_name(self) -> str:
 97        return domain_names.unpack(self.data)
 98
 99    @domain_name.setter
100    def domain_name(self, name: str) -> None:
101        self.data = domain_names.pack(name)
102
103    def to_json(self) -> dict:
104        """
105        Converts the resource record into json for mitmweb.
106        Sync with web/src/flow.ts.
107        """
108        return {
109            "name": self.name,
110            "type": types.to_str(self.type),
111            "class": classes.to_str(self.class_),
112            "ttl": self.ttl,
113            "data": str(self),
114        }
115
116    @classmethod
117    def A(cls, name: str, ip: IPv4Address, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
118        """Create an IPv4 resource record."""
119        return cls(name, types.A, classes.IN, ttl, ip.packed)
120
121    @classmethod
122    def AAAA(
123        cls, name: str, ip: IPv6Address, *, ttl: int = DEFAULT_TTL
124    ) -> ResourceRecord:
125        """Create an IPv6 resource record."""
126        return cls(name, types.AAAA, classes.IN, ttl, ip.packed)
127
128    @classmethod
129    def CNAME(
130        cls, alias: str, canonical: str, *, ttl: int = DEFAULT_TTL
131    ) -> ResourceRecord:
132        """Create a canonical internet name resource record."""
133        return cls(alias, types.CNAME, classes.IN, ttl, domain_names.pack(canonical))
134
135    @classmethod
136    def PTR(cls, inaddr: str, ptr: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
137        """Create a canonical internet name resource record."""
138        return cls(inaddr, types.PTR, classes.IN, ttl, domain_names.pack(ptr))
139
140    @classmethod
141    def TXT(cls, name: str, text: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
142        """Create a textual resource record."""
143        return cls(name, types.TXT, classes.IN, ttl, text.encode("utf-8"))
144
145
146# comments are taken from rfc1035
@dataclass
class Message(serializable.SerializableDataclass):
    """A DNS message (query or response) with its header flags and four sections."""

    # Fixed-size message header: six unsigned 16-bit fields in network byte
    # order (id, flags, and the entry counts of the four sections).
    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHHHHH")

    timestamp: float
    """The time at which the message was sent or received."""
    id: int
    """An identifier assigned by the program that generates any kind of query."""
    query: bool
    """A field that specifies whether this message is a query."""
    op_code: int
    """
    A field that specifies kind of query in this message.
    This value is set by the originator of a request and copied into the response.
    """
    authoritative_answer: bool
    """
    This field is valid in responses, and specifies that the responding name server
    is an authority for the domain name in question section.
    """
    truncation: bool
    """Specifies that this message was truncated due to length greater than that permitted on the transmission channel."""
    recursion_desired: bool
    """
    This field may be set in a query and is copied into the response.
    If set, it directs the name server to pursue the query recursively.
    """
    recursion_available: bool
    """This field is set or cleared in a response, and denotes whether recursive query support is available in the name server."""
    reserved: int
    """Reserved for future use.  Must be zero in all queries and responses."""
    response_code: int
    """This field is set as part of responses."""
    questions: list[Question]
    """
    The question section is used to carry the "question" in most queries, i.e.
    the parameters that define what is being asked.
    """
    answers: list[ResourceRecord]
    """First resource record section."""
    authorities: list[ResourceRecord]
    """Second resource record section."""
    additionals: list[ResourceRecord]
    """Third resource record section."""

    def __str__(self) -> str:
        """Return the string form of every question and record, one per CRLF-separated line."""
        return "\r\n".join(
            map(
                str,
                itertools.chain(
                    self.questions, self.answers, self.authorities, self.additionals
                ),
            )
        )

    @property
    def content(self) -> bytes:
        """Returns the user-friendly content of all parts as encoded bytes."""
        return str(self).encode()

    @property
    def size(self) -> int:
        """Returns the cumulative data size of all resource record sections."""
        return sum(
            len(rr.data)
            for rr in itertools.chain(self.answers, self.authorities, self.additionals)
        )

    def fail(self, response_code: int) -> Message:
        """
        Create an error response to this message.

        The response copies this message's id, op_code, recursion_desired flag
        and question list (the list object itself is shared, not copied) and
        carries no resource records.

        Raises:
            ValueError: if `response_code` is `response_codes.NOERROR`.
        """
        if response_code == response_codes.NOERROR:
            raise ValueError("response_code must be an error code.")
        return Message(
            timestamp=time.time(),
            id=self.id,
            query=False,
            op_code=self.op_code,
            authoritative_answer=False,
            truncation=False,
            recursion_desired=self.recursion_desired,
            recursion_available=False,
            reserved=0,
            response_code=response_code,
            questions=self.questions,
            answers=[],
            authorities=[],
            additionals=[],
        )

    def succeed(self, answers: list[ResourceRecord]) -> Message:
        """
        Create a NOERROR response to this message carrying `answers`.

        The response copies this message's id, op_code, recursion_desired flag
        and question list (the list object itself is shared, not copied), and
        marks recursion as available.
        """
        return Message(
            timestamp=time.time(),
            id=self.id,
            query=False,
            op_code=self.op_code,
            authoritative_answer=False,
            truncation=False,
            recursion_desired=self.recursion_desired,
            recursion_available=True,
            reserved=0,
            response_code=response_codes.NOERROR,
            questions=self.questions,
            answers=answers,
            authorities=[],
            additionals=[],
        )

    @classmethod
    def unpack(cls, buffer: bytes) -> Message:
        """
        Converts the entire given buffer into a DNS message.

        Raises:
            struct.error: if the buffer is malformed or the parsed message does
                not span the whole buffer.
        """
        length, msg = cls.unpack_from(buffer, 0)
        if length != len(buffer):
            raise struct.error(f"unpack requires a buffer of {length} bytes")
        return msg

    @classmethod
    def unpack_from(cls, buffer: bytes | bytearray, offset: int) -> tuple[int, Message]:
        """
        Converts the buffer from a given offset into a DNS message and also returns its length.

        NOTE(review): the integer returned is the offset just past the parsed
        message; it equals the message length only when `offset` is 0 (which is
        how `unpack` calls it) -- confirm before relying on it with a non-zero
        start offset.

        Raises:
            struct.error: if the header, a question or a resource record cannot
                be parsed; the message names the failing section and index, and
                the original error is attached as the cause.
        """
        (
            msg_id,
            flags,
            len_questions,
            len_answers,
            len_authorities,
            len_additionals,
        ) = Message.HEADER.unpack_from(buffer, offset)
        # Flag word layout, most significant bit first:
        # QR(1) | op_code(4) | AA(1) | TC(1) | RD(1) | RA(1) | reserved(3) | response_code(4)
        msg = Message(
            timestamp=time.time(),
            id=msg_id,
            query=(flags & (1 << 15)) == 0,
            op_code=(flags >> 11) & 0b1111,
            authoritative_answer=(flags & (1 << 10)) != 0,
            truncation=(flags & (1 << 9)) != 0,
            recursion_desired=(flags & (1 << 8)) != 0,
            recursion_available=(flags & (1 << 7)) != 0,
            reserved=(flags >> 4) & 0b111,
            response_code=flags & 0b1111,
            questions=[],
            answers=[],
            authorities=[],
            additionals=[],
        )
        offset += Message.HEADER.size
        cached_names = domain_names.cache()

        def unpack_domain_name() -> str:
            # Reads one (possibly compressed) name at the current offset and
            # advances the shared offset past it.
            nonlocal offset
            name, length = domain_names.unpack_from_with_compression(
                buffer, offset, cached_names
            )
            offset += length
            return name

        for i in range(len_questions):
            try:
                name = unpack_domain_name()
                q_type, q_class = Question.HEADER.unpack_from(buffer, offset)
                offset += Question.HEADER.size
                msg.questions.append(Question(name=name, type=q_type, class_=q_class))
            except struct.error as e:
                raise struct.error(f"question #{i}: {e}") from e

        def unpack_rrs(
            section: list[ResourceRecord], section_name: str, count: int
        ) -> None:
            # Parses `count` resource records at the current offset, appends
            # them to `section` and advances the shared offset.
            nonlocal offset
            for i in range(count):
                try:
                    name = unpack_domain_name()
                    rr_type, rr_class, ttl, len_data = ResourceRecord.HEADER.unpack_from(
                        buffer, offset
                    )
                    offset += ResourceRecord.HEADER.size
                    end_data = offset + len_data
                    if len(buffer) < end_data:
                        raise struct.error(
                            f"unpack requires a data buffer of {len_data} bytes"
                        )
                    data = buffer[offset:end_data]
                    if 0b11000000 in data:
                        # The record data might contain a compressed domain
                        # name; if it does, store it uncompressed so the record
                        # stays valid outside of this message.
                        # NOTE(review): only the octet value 0xC0 triggers this
                        # path; per RFC 1035 a pointer's first octet can be
                        # 0xC0-0xFF, so pointers to offsets >= 256 may be
                        # missed -- confirm whether that is intended.
                        try:
                            (
                                rr_name,
                                rr_name_len,
                            ) = domain_names.unpack_from_with_compression(
                                buffer, offset, cached_names
                            )
                            # Only rewrite when the name spans the whole data.
                            if rr_name_len == len_data:
                                data = domain_names.pack(rr_name)
                        except struct.error:
                            # Best effort: not a domain name, keep raw data.
                            pass
                    section.append(ResourceRecord(name, rr_type, rr_class, ttl, data))
                    offset += len_data
                except struct.error as e:
                    raise struct.error(f"{section_name} #{i}: {e}") from e

        unpack_rrs(msg.answers, "answer", len_answers)
        unpack_rrs(msg.authorities, "authority", len_authorities)
        unpack_rrs(msg.additionals, "additional", len_additionals)
        return (offset, msg)

    @property
    def packed(self) -> bytes:
        """
        Converts the message into network bytes.

        Names are written uncompressed.

        Raises:
            ValueError: if id, op_code, reserved or response_code do not fit
                into their header fields.
        """
        if self.id < 0 or self.id > 65535:
            raise ValueError(f"DNS message's id {self.id} is out of bounds.")
        flags = 0
        if not self.query:
            flags |= 1 << 15
        if self.op_code < 0 or self.op_code > 0b1111:
            raise ValueError(f"DNS message's op_code {self.op_code} is out of bounds.")
        flags |= self.op_code << 11
        if self.authoritative_answer:
            flags |= 1 << 10
        if self.truncation:
            flags |= 1 << 9
        if self.recursion_desired:
            flags |= 1 << 8
        if self.recursion_available:
            flags |= 1 << 7
        if self.reserved < 0 or self.reserved > 0b111:
            raise ValueError(
                f"DNS message's reserved value of {self.reserved} is out of bounds."
            )
        flags |= self.reserved << 4
        if self.response_code < 0 or self.response_code > 0b1111:
            raise ValueError(
                f"DNS message's response_code {self.response_code} is out of bounds."
            )
        flags |= self.response_code
        data = bytearray()
        data.extend(
            Message.HEADER.pack(
                self.id,
                flags,
                len(self.questions),
                len(self.answers),
                len(self.authorities),
                len(self.additionals),
            )
        )
        # TODO implement compression
        for question in self.questions:
            data.extend(domain_names.pack(question.name))
            data.extend(Question.HEADER.pack(question.type, question.class_))
        for rr in (*self.answers, *self.authorities, *self.additionals):
            data.extend(domain_names.pack(rr.name))
            data.extend(
                ResourceRecord.HEADER.pack(rr.type, rr.class_, rr.ttl, len(rr.data))
            )
            data.extend(rr.data)
        return bytes(data)

    def to_json(self) -> dict:
        """
        Converts the message into json for mitmweb.
        Sync with web/src/flow.ts.
        """
        return {
            "id": self.id,
            "query": self.query,
            "op_code": op_codes.to_str(self.op_code),
            "authoritative_answer": self.authoritative_answer,
            "truncation": self.truncation,
            "recursion_desired": self.recursion_desired,
            "recursion_available": self.recursion_available,
            "response_code": response_codes.to_str(self.response_code),
            "status_code": response_codes.http_equiv_status_code(self.response_code),
            "questions": [question.to_json() for question in self.questions],
            "answers": [rr.to_json() for rr in self.answers],
            "authorities": [rr.to_json() for rr in self.authorities],
            "additionals": [rr.to_json() for rr in self.additionals],
            "size": self.size,
            "timestamp": self.timestamp,
        }

    def copy(self) -> Message:
        """Return a copy of this message that carries a freshly drawn random id."""
        # we keep the copy semantics but change the ID generation
        state = self.get_state()
        state["id"] = random.randint(0, 65535)
        return Message.from_state(state)
430
431
class DNSFlow(flow.Flow):
    """A DNSFlow is a collection of DNS messages representing a single DNS query."""

    request: Message
    """The DNS request."""
    response: Message | None = None
    """The DNS response."""

    def get_state(self) -> serializable.State:
        """Return the base flow state extended with the request and response states."""
        # Copy so the mapping handed out by the base class is never modified.
        combined = dict(super().get_state())
        combined["request"] = self.request.get_state()
        combined["response"] = self.response.get_state() if self.response else None
        return combined

    def set_state(self, state: serializable.State) -> None:
        """
        Restore request and response from `state`, then let the base class
        consume whatever remains.

        The "request" and "response" entries are removed from `state` before
        it is passed on.
        """
        self.request = Message.from_state(state.pop("request"))
        response_state = state.pop("response")
        if response_state:
            self.response = Message.from_state(response_state)
        else:
            self.response = None
        super().set_state(state)

    def __repr__(self) -> str:
        """Return a multi-line representation showing request and response."""
        return f"<DNSFlow\r\n  request={self.request!r}\r\n  response={self.response!r}\r\n>"
class DNSFlow(mitmproxy.flow.Flow):
class DNSFlow(flow.Flow):
    """A DNSFlow is a collection of DNS messages representing a single DNS query."""

    request: Message
    """The DNS request."""
    response: Message | None = None
    """The DNS response."""

    def get_state(self) -> serializable.State:
        """Return the base flow state extended with the request and response states."""
        # Copy so the mapping handed out by the base class is never modified.
        combined = dict(super().get_state())
        combined["request"] = self.request.get_state()
        combined["response"] = self.response.get_state() if self.response else None
        return combined

    def set_state(self, state: serializable.State) -> None:
        """
        Restore request and response from `state`, then let the base class
        consume whatever remains.

        The "request" and "response" entries are removed from `state` before
        it is passed on.
        """
        self.request = Message.from_state(state.pop("request"))
        response_state = state.pop("response")
        if response_state:
            self.response = Message.from_state(response_state)
        else:
            self.response = None
        super().set_state(state)

    def __repr__(self) -> str:
        """Return a multi-line representation showing request and response."""
        return f"<DNSFlow\r\n  request={self.request!r}\r\n  response={self.response!r}\r\n>"

A DNSFlow is a collection of DNS messages representing a single DNS query.

request: mitmproxy.dns.Message

The DNS request.

response: mitmproxy.dns.Message | None = None

The DNS response.

type: ClassVar[str] = 'dns'

The flow type, for example http, tcp, or dns.