mitmproxy.dns
from __future__ import annotations

import itertools
import random
import struct
import time
from dataclasses import dataclass
from ipaddress import IPv4Address
from ipaddress import IPv6Address
from typing import ClassVar

from mitmproxy import flow
from mitmproxy.coretypes import serializable
from mitmproxy.net.dns import classes
from mitmproxy.net.dns import domain_names
from mitmproxy.net.dns import op_codes
from mitmproxy.net.dns import response_codes
from mitmproxy.net.dns import types

# DNS parameters taken from https://www.iana.org/assignments/dns-parameters/dns-parameters.xml


@dataclass
class Question(serializable.SerializableDataclass):
    """A single entry of a DNS message's question section."""

    # Wire layout of the fixed part that follows the name: type, class.
    HEADER: ClassVar[struct.Struct] = struct.Struct("!HH")

    name: str
    type: int
    class_: int

    def __str__(self) -> str:
        return self.name

    def to_json(self) -> dict:
        """
        Converts the question into json for mitmweb.
        Sync with web/src/flow.ts.
        """
        return {
            "name": self.name,
            "type": types.to_str(self.type),
            "class": classes.to_str(self.class_),
        }


@dataclass
class ResourceRecord(serializable.SerializableDataclass):
    """
    A single DNS resource record.

    `data` always holds the raw record payload; the `text`, `ipv4_address`,
    `ipv6_address` and `domain_name` properties are typed views that decode
    from / encode into `data`.
    """

    DEFAULT_TTL: ClassVar[int] = 60
    # Wire layout of the fixed part that follows the name: type, class, ttl, data length.
    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHIH")

    name: str
    type: int
    class_: int
    ttl: int
    data: bytes

    def __str__(self) -> str:
        """
        Returns a human-readable rendering of the record data.

        Known record types are decoded; anything else (or data that fails
        to decode) is rendered as a hex string.
        """
        try:
            if self.type == types.A:
                return str(self.ipv4_address)
            if self.type == types.AAAA:
                return str(self.ipv6_address)
            if self.type in (types.NS, types.CNAME, types.PTR):
                return self.domain_name
            if self.type == types.TXT:
                return self.text
        except Exception:
            # Deliberately broad: rendering must never fail, whatever the payload is.
            return f"0x{self.data.hex()} (invalid {types.to_str(self.type)} data)"
        return f"0x{self.data.hex()}"

    @property
    def text(self) -> str:
        """The record data decoded as UTF-8 text."""
        return self.data.decode("utf-8")

    @text.setter
    def text(self, value: str) -> None:
        self.data = value.encode("utf-8")

    @property
    def ipv4_address(self) -> IPv4Address:
        """The record data interpreted as a packed IPv4 address."""
        return IPv4Address(self.data)

    @ipv4_address.setter
    def ipv4_address(self, ip: IPv4Address) -> None:
        self.data = ip.packed

    @property
    def ipv6_address(self) -> IPv6Address:
        """The record data interpreted as a packed IPv6 address."""
        return IPv6Address(self.data)

    @ipv6_address.setter
    def ipv6_address(self, ip: IPv6Address) -> None:
        self.data = ip.packed

    @property
    def domain_name(self) -> str:
        """The record data interpreted as a packed domain name."""
        return domain_names.unpack(self.data)

    @domain_name.setter
    def domain_name(self, name: str) -> None:
        self.data = domain_names.pack(name)

    def to_json(self) -> dict:
        """
        Converts the resource record into json for mitmweb.
        Sync with web/src/flow.ts.
        """
        return {
            "name": self.name,
            "type": types.to_str(self.type),
            "class": classes.to_str(self.class_),
            "ttl": self.ttl,
            "data": str(self),
        }

    @classmethod
    def A(cls, name: str, ip: IPv4Address, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
        """Create an IPv4 resource record."""
        return cls(name, types.A, classes.IN, ttl, ip.packed)

    @classmethod
    def AAAA(
        cls, name: str, ip: IPv6Address, *, ttl: int = DEFAULT_TTL
    ) -> ResourceRecord:
        """Create an IPv6 resource record."""
        return cls(name, types.AAAA, classes.IN, ttl, ip.packed)

    @classmethod
    def CNAME(
        cls, alias: str, canonical: str, *, ttl: int = DEFAULT_TTL
    ) -> ResourceRecord:
        """Create a canonical internet name resource record."""
        return cls(alias, types.CNAME, classes.IN, ttl, domain_names.pack(canonical))

    @classmethod
    def PTR(cls, inaddr: str, ptr: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
        """Create a domain name pointer resource record."""
        return cls(inaddr, types.PTR, classes.IN, ttl, domain_names.pack(ptr))

    @classmethod
    def TXT(cls, name: str, text: str, *, ttl: int = DEFAULT_TTL) -> ResourceRecord:
        """Create a textual resource record."""
        return cls(name, types.TXT, classes.IN, ttl, text.encode("utf-8"))


# comments are taken from rfc1035
@dataclass
class Message(serializable.SerializableDataclass):
    """A DNS message (query or response), convertible from and to network bytes."""

    # Wire layout: id, flags, then the entry counts of the four sections.
    HEADER: ClassVar[struct.Struct] = struct.Struct("!HHHHHH")

    timestamp: float
    """The time at which the message was sent or received."""
    id: int
    """An identifier assigned by the program that generates any kind of query."""
    query: bool
    """A field that specifies whether this message is a query."""
    op_code: int
    """
    A field that specifies kind of query in this message.
    This value is set by the originator of a request and copied into the response.
    """
    authoritative_answer: bool
    """
    This field is valid in responses, and specifies that the responding name server
    is an authority for the domain name in question section.
    """
    truncation: bool
    """Specifies that this message was truncated due to length greater than that permitted on the transmission channel."""
    recursion_desired: bool
    """
    This field may be set in a query and is copied into the response.
    If set, it directs the name server to pursue the query recursively.
    """
    recursion_available: bool
    """This field is set or cleared in a response, and denotes whether recursive query support is available in the name server."""
    reserved: int
    """Reserved for future use.  Must be zero in all queries and responses."""
    response_code: int
    """This field is set as part of responses."""
    questions: list[Question]
    """
    The question section is used to carry the "question" in most queries, i.e.
    the parameters that define what is being asked.
    """
    answers: list[ResourceRecord]
    """First resource record section."""
    authorities: list[ResourceRecord]
    """Second resource record section."""
    additionals: list[ResourceRecord]
    """Third resource record section."""

    def __str__(self) -> str:
        return "\r\n".join(
            map(
                str,
                itertools.chain(
                    self.questions, self.answers, self.authorities, self.additionals
                ),
            )
        )

    @property
    def content(self) -> bytes:
        """Returns the user-friendly content of all parts as encoded bytes."""
        return str(self).encode()

    @property
    def size(self) -> int:
        """Returns the cumulative data size of all resource record sections."""
        return sum(
            len(x.data)
            for x in itertools.chain.from_iterable(
                [self.answers, self.authorities, self.additionals]
            )
        )

    def fail(self, response_code: int) -> Message:
        """
        Creates an error response to this message.

        The response copies id, op_code, recursion_desired and the questions
        of this message and has empty resource record sections.

        Raises:
            ValueError: if `response_code` is NOERROR.
        """
        if response_code == response_codes.NOERROR:
            raise ValueError("response_code must be an error code.")
        return Message(
            timestamp=time.time(),
            id=self.id,
            query=False,
            op_code=self.op_code,
            authoritative_answer=False,
            truncation=False,
            recursion_desired=self.recursion_desired,
            recursion_available=False,
            reserved=0,
            response_code=response_code,
            questions=self.questions,
            answers=[],
            authorities=[],
            additionals=[],
        )

    def succeed(self, answers: list[ResourceRecord]) -> Message:
        """
        Creates a NOERROR response to this message carrying the given answers.

        The response copies id, op_code, recursion_desired and the questions
        of this message.
        """
        return Message(
            timestamp=time.time(),
            id=self.id,
            query=False,
            op_code=self.op_code,
            authoritative_answer=False,
            truncation=False,
            recursion_desired=self.recursion_desired,
            recursion_available=True,
            reserved=0,
            response_code=response_codes.NOERROR,
            questions=self.questions,
            answers=answers,
            authorities=[],
            additionals=[],
        )

    @classmethod
    def unpack(cls, buffer: bytes) -> Message:
        """
        Converts the entire given buffer into a DNS message.

        Raises:
            struct.error: if the buffer is malformed or contains trailing bytes.
        """
        length, msg = cls.unpack_from(buffer, 0)
        if length != len(buffer):
            raise struct.error(f"unpack requires a buffer of {length} bytes")
        return msg

    @classmethod
    def unpack_from(cls, buffer: bytes | bytearray, offset: int) -> tuple[int, Message]:
        """
        Converts the buffer from a given offset into a DNS message and also returns its length.

        The first element of the returned tuple is the offset just past the
        parsed message. The message timestamp is set to the current time.

        Raises:
            struct.error: if the buffer is too short or an entry is malformed;
                the message names the offending section and entry index.
        """
        (
            id,
            flags,
            len_questions,
            len_answers,
            len_authorities,
            len_additionals,
        ) = Message.HEADER.unpack_from(buffer, offset)
        msg = Message(
            timestamp=time.time(),
            id=id,
            query=(flags & (1 << 15)) == 0,
            op_code=(flags >> 11) & 0b1111,
            authoritative_answer=(flags & (1 << 10)) != 0,
            truncation=(flags & (1 << 9)) != 0,
            recursion_desired=(flags & (1 << 8)) != 0,
            recursion_available=(flags & (1 << 7)) != 0,
            reserved=(flags >> 4) & 0b111,
            response_code=flags & 0b1111,
            questions=[],
            answers=[],
            authorities=[],
            additionals=[],
        )
        offset += Message.HEADER.size
        # Shared by all names of this message so compression pointers can be resolved.
        cached_names = domain_names.cache()

        def unpack_domain_name() -> str:
            nonlocal buffer, offset, cached_names
            name, length = domain_names.unpack_from_with_compression(
                buffer, offset, cached_names
            )
            offset += length
            return name

        for i in range(len_questions):
            try:
                name = unpack_domain_name()
                type, class_ = Question.HEADER.unpack_from(buffer, offset)
                offset += Question.HEADER.size
                msg.questions.append(Question(name=name, type=type, class_=class_))
            except struct.error as e:
                raise struct.error(f"question #{i}: {str(e)}") from e

        def unpack_rrs(
            section: list[ResourceRecord], section_name: str, count: int
        ) -> None:
            nonlocal buffer, offset
            for i in range(count):
                try:
                    name = unpack_domain_name()
                    type, class_, ttl, len_data = ResourceRecord.HEADER.unpack_from(
                        buffer, offset
                    )
                    offset += ResourceRecord.HEADER.size
                    end_data = offset + len_data
                    if len(buffer) < end_data:
                        raise struct.error(
                            f"unpack requires a data buffer of {len_data} bytes"
                        )
                    data = buffer[offset:end_data]
                    # A, AAAA and TXT payloads are addresses or text, never domain
                    # names, so pointer-like bytes in them (e.g. the 192 of a
                    # 192.x.x.x address) must not trigger decompression.
                    may_hold_name = type not in (types.A, types.AAAA, types.TXT)
                    # A compression pointer is any byte with the two top bits set
                    # (0xC0-0xFF); the low six bits are part of the target offset.
                    if may_hold_name and any(byte >= 0b11000000 for byte in data):
                        # the resource record might contain a compressed domain name,
                        # if so, uncompress it in advance
                        try:
                            (
                                rr_name,
                                rr_name_len,
                            ) = domain_names.unpack_from_with_compression(
                                buffer, offset, cached_names
                            )
                            # Only rewrite the data if the name spans the whole payload.
                            if rr_name_len == len_data:
                                data = domain_names.pack(rr_name)
                        except struct.error:
                            # Best effort: not a domain name after all, keep the raw data.
                            pass
                    section.append(ResourceRecord(name, type, class_, ttl, data))
                    offset += len_data
                except struct.error as e:
                    raise struct.error(f"{section_name} #{i}: {str(e)}") from e

        unpack_rrs(msg.answers, "answer", len_answers)
        unpack_rrs(msg.authorities, "authority", len_authorities)
        unpack_rrs(msg.additionals, "additional", len_additionals)
        return (offset, msg)

    @property
    def packed(self) -> bytes:
        """
        Converts the message into network bytes.

        Raises:
            ValueError: if id, op_code, reserved or response_code do not fit
                into their header fields.
        """
        if self.id < 0 or self.id > 65535:
            raise ValueError(f"DNS message's id {self.id} is out of bounds.")
        flags = 0
        if not self.query:
            flags |= 1 << 15
        if self.op_code < 0 or self.op_code > 0b1111:
            raise ValueError(f"DNS message's op_code {self.op_code} is out of bounds.")
        flags |= self.op_code << 11
        if self.authoritative_answer:
            flags |= 1 << 10
        if self.truncation:
            flags |= 1 << 9
        if self.recursion_desired:
            flags |= 1 << 8
        if self.recursion_available:
            flags |= 1 << 7
        if self.reserved < 0 or self.reserved > 0b111:
            raise ValueError(
                f"DNS message's reserved value of {self.reserved} is out of bounds."
            )
        flags |= self.reserved << 4
        if self.response_code < 0 or self.response_code > 0b1111:
            raise ValueError(
                f"DNS message's response_code {self.response_code} is out of bounds."
            )
        flags |= self.response_code
        data = bytearray()
        data.extend(
            Message.HEADER.pack(
                self.id,
                flags,
                len(self.questions),
                len(self.answers),
                len(self.authorities),
                len(self.additionals),
            )
        )
        # TODO implement compression
        for question in self.questions:
            data.extend(domain_names.pack(question.name))
            data.extend(Question.HEADER.pack(question.type, question.class_))
        for rr in (*self.answers, *self.authorities, *self.additionals):
            data.extend(domain_names.pack(rr.name))
            data.extend(
                ResourceRecord.HEADER.pack(rr.type, rr.class_, rr.ttl, len(rr.data))
            )
            data.extend(rr.data)
        return bytes(data)

    def to_json(self) -> dict:
        """
        Converts the message into json for mitmweb.
        Sync with web/src/flow.ts.
        """
        return {
            "id": self.id,
            "query": self.query,
            "op_code": op_codes.to_str(self.op_code),
            "authoritative_answer": self.authoritative_answer,
            "truncation": self.truncation,
            "recursion_desired": self.recursion_desired,
            "recursion_available": self.recursion_available,
            "response_code": response_codes.to_str(self.response_code),
            "status_code": response_codes.http_equiv_status_code(self.response_code),
            "questions": [question.to_json() for question in self.questions],
            "answers": [rr.to_json() for rr in self.answers],
            "authorities": [rr.to_json() for rr in self.authorities],
            "additionals": [rr.to_json() for rr in self.additionals],
            "size": self.size,
            "timestamp": self.timestamp,
        }

    def copy(self) -> Message:
        """Returns a copy of this message that carries a freshly generated random id."""
        # we keep the copy semantics but change the ID generation
        state = self.get_state()
        state["id"] = random.randint(0, 65535)
        return Message.from_state(state)


class DNSFlow(flow.Flow):
    """A DNSFlow is a collection of DNS messages representing a single DNS query."""

    request: Message
    """The DNS request."""
    response: Message | None = None
    """The DNS response."""

    def get_state(self) -> serializable.State:
        """Returns the base flow state extended by the request and (optional) response state."""
        return {
            **super().get_state(),
            "request": self.request.get_state(),
            "response": self.response.get_state() if self.response else None,
        }

    def set_state(self, state: serializable.State) -> None:
        """
        Restores request and response from `state`, then hands the rest to the base class.

        Note that the "request" and "response" keys are removed from `state`.
        """
        self.request = Message.from_state(state.pop("request"))
        self.response = Message.from_state(r) if (r := state.pop("response")) else None
        super().set_state(state)

    def __repr__(self) -> str:
        return f"<DNSFlow\r\n request={repr(self.request)}\r\n response={repr(self.response)}\r\n>"
class DNSFlow(flow.Flow):
    """A DNSFlow is a collection of DNS messages representing a single DNS query."""

    request: Message
    """The DNS request."""
    response: Message | None = None
    """The DNS response."""

    def get_state(self) -> serializable.State:
        """Return the base flow state plus the request and (optional) response state."""
        combined = dict(super().get_state())
        combined["request"] = self.request.get_state()
        if self.response:
            combined["response"] = self.response.get_state()
        else:
            combined["response"] = None
        return combined

    def set_state(self, state: serializable.State) -> None:
        """
        Restore request and response from `state` and pass the remainder to the base class.

        The "request" and "response" keys are removed from `state`.
        """
        request_state = state.pop("request")
        self.request = Message.from_state(request_state)
        response_state = state.pop("response")
        if response_state:
            self.response = Message.from_state(response_state)
        else:
            self.response = None
        super().set_state(state)

    def __repr__(self) -> str:
        return f"<DNSFlow\r\n request={self.request!r}\r\n response={self.response!r}\r\n>"
A DNSFlow is a collection of DNS messages representing a single DNS query.