Coverage for tld/utils.py: 91%

206 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-27 05:40 +0000

1from __future__ import unicode_literals 

2 

3import argparse 

4import sys 

5from codecs import open as codecs_open 

6from functools import lru_cache 

7from os.path import isabs 

8from typing import Dict, List, Optional, Tuple, Type, Union 

9from urllib.parse import SplitResult, urlsplit 

10 

11from .base import BaseTLDSourceParser, Registry 

12from .exceptions import ( 

13 TldBadUrl, 

14 TldDomainNotFound, 

15 TldImproperlyConfigured, 

16 TldIOError, 

17) 

18from .helpers import project_dir 

19from .result import Result 

20from .trie import Trie 

21 

22# codecs_open = open 

23 

24 

# Module metadata.
__author__ = "Artur Barseghyan"
__copyright__ = "2013-2023 Artur Barseghyan"
__license__ = "MPL-1.1 OR GPL-2.0-only OR LGPL-2.1-or-later"
# Names exported by a star-import of this module.
__all__ = (
    "BaseMozillaTLDSourceParser",
    "get_fld",
    "get_tld",
    "get_tld_names",
    "get_tld_names_container",
    "is_tld",
    "MozillaTLDSourceParser",
    "MozillaPublicOnlyTLDSourceParser",
    "parse_tld",
    "pop_tld_names_container",
    "process_url",
    "reset_tld_names",
    "Result",
    "tld_names",
    "update_tld_names",
    "update_tld_names_cli",
    "update_tld_names_container",
)

# Module-level container of loaded TLD data: the parsers store the ``Trie``
# built from a TLD names file under that parser's ``local_path``.
tld_names: Dict[str, Trie] = {}

49 

50 

def get_tld_names_container() -> Dict[str, Trie]:
    """Return the module-level ``tld_names`` container.

    The very same dict object is handed back (no copy is made).

    :return: Container mapping a TLD names local path to its trie.
    :rtype: dict
    """
    # Reading a module global does not require a ``global`` declaration.
    return tld_names

59 

60 

def update_tld_names_container(
    tld_names_local_path: str, trie_obj: Trie
) -> None:
    """Store a trie in the ``tld_names`` container.

    An entry already present under the same key is replaced.

    :param tld_names_local_path: Key to store the trie under.
    :param trie_obj: Trie to store.
    :return:
    """
    # Item assignment mutates the shared dict in place; no ``global``
    # declaration is needed for that.
    tld_names[tld_names_local_path] = trie_obj

73 

74 

def pop_tld_names_container(tld_names_local_path: str) -> None:
    """Drop one entry from the ``tld_names`` container.

    A key which is not present in the container is silently ignored.

    :param tld_names_local_path: Key of the entry to remove.
    :return:
    """
    try:
        del tld_names[tld_names_local_path]
    except KeyError:
        # Nothing stored under this key - removal is a no-op.
        pass

83 

84 

@lru_cache(maxsize=128, typed=True)
def update_tld_names(
    fail_silently: bool = False, parser_uid: str = None
) -> bool:
    """Update the TLD names of one registered parser, or of all of them.

    NOTE: the function is wrapped in ``lru_cache``, so a repeated call with
    the same arguments returns the cached result instead of running the
    update again.

    :param fail_silently: Forwarded to ``update_tld_names`` of each parser.
    :param parser_uid: UID of the parser to update. If falsy, all registered
        parsers are updated.
    :return: True if every performed update returned a truthy value (which
        is also the case if no update was performed at all), False
        otherwise.
    """
    if parser_uid:
        # An unknown UID yields ``None``, which is filtered out below.
        candidates = (Registry.get(parser_uid, None),)
    else:
        candidates = (parser_cls for _uid, parser_cls in Registry.items())

    # A list (not a generator) is built on purpose: every update must run,
    # whereas ``all`` over a generator would stop at the first failure.
    outcomes = [
        parser_cls.update_tld_names(fail_silently=fail_silently)
        for parser_cls in candidates
        if parser_cls and parser_cls.source_url
    ]
    return all(outcomes)

111 

112 

def update_tld_names_cli() -> int:
    """Command-line entry point wrapping ``update_tld_names``.

    ``update_tld_names`` returns True on success, whereas the CLI convention
    is the opposite (0 means success), hence the result is negated.

    :return: 0 if ``update_tld_names`` returned a truthy value, 1 otherwise.
    """
    arg_parser = argparse.ArgumentParser(description="Update TLD names")
    arg_parser.add_argument(
        "parser_uid",
        nargs="?",
        default=None,
        help="UID of the parser to update TLD names for.",
    )
    arg_parser.add_argument(
        "--fail-silently",
        dest="fail_silently",
        default=False,
        action="store_true",
        help="Fail silently",
    )
    options = arg_parser.parse_args(sys.argv[1:])

    succeeded = update_tld_names(
        parser_uid=options.parser_uid,
        fail_silently=options.fail_silently,
    )
    return 0 if succeeded else 1

139 

140 

def get_tld_names(
    fail_silently: bool = False,
    retry_count: int = 0,
    parser_class: Optional[Type[BaseTLDSourceParser]] = None,
) -> Optional[Dict[str, Trie]]:
    """Return the TLD names container, as provided by the given parser.

    Thin wrapper delegating to ``parser_class.get_tld_names``.

    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param retry_count: Forwarded to the parser. The Mozilla parsers of this
        module give up once it is greater than 1, in order to avoid
        infinite loops.
    :param parser_class: Parser class to use. Defaults to
        ``MozillaTLDSourceParser`` if not given.
    :type fail_silently: bool
    :type retry_count: int
    :type parser_class: BaseTLDSourceParser
    :return: Whatever ``parser_class.get_tld_names`` returns: the TLD names
        container, or None on failure with ``fail_silently`` set.
    :rtype: dict | None
    """
    if not parser_class:
        parser_class = MozillaTLDSourceParser

    return parser_class.get_tld_names(
        fail_silently=fail_silently, retry_count=retry_count
    )

165 

166 

167# ************************************************************************** 

168# **************************** Parser classes ****************************** 

169# ************************************************************************** 

170 

171 

class BaseMozillaTLDSourceParser(BaseTLDSourceParser):
    """Base parser for TLD names files in the Mozilla public suffix format.

    Relies on the ``local_path`` and ``include_private`` class attributes and
    on the ``update_tld_names`` classmethod, none of which is defined here
    (presumably provided by ``BaseTLDSourceParser`` and/or subclasses).
    """

    @classmethod
    def get_tld_names(
        cls, fail_silently: bool = False, retry_count: int = 0
    ) -> Optional[Dict[str, Trie]]:
        """Load the TLD names of this parser into the module-level container.

        If the container already holds an entry for ``cls.local_path``, it is
        returned right away. Otherwise the local TLD names file is read, a
        ``Trie`` is built from it and stored under ``cls.local_path``. If
        reading fails with ``IOError``, ``cls.update_tld_names`` is called
        and loading is attempted again.

        :param fail_silently: If set to True, None is returned on failure
            instead of raising an exception.
        :param retry_count: Number of retries made so far. A value greater
            than 1 aborts loading in order to avoid infinite loops.
        :return: The whole TLD names container (not only the trie of this
            parser), or None on failure with ``fail_silently`` set.
        :raises TldIOError: If ``retry_count`` is greater than 1 and
            ``fail_silently`` is False.
        """
        if retry_count > 1:
            if fail_silently:
                return None
            raise TldIOError

        _tld_names = tld_names

        # If already loaded, return
        if _tld_names.get(cls.local_path) is not None:
            return _tld_names

        try:
            # Relative paths are resolved with ``project_dir``
            if isabs(cls.local_path):
                local_path = cls.local_path
            else:
                local_path = project_dir(cls.local_path)

            trie = Trie()
            trie_add = trie.add  # Performance opt
            private_section = False
            include_private = cls.include_private

            # The context manager closes the file on every exit path
            with codecs_open(local_path, "r", encoding="utf8") as local_file:
                for line in local_file:
                    if "===BEGIN PRIVATE DOMAINS===" in line:
                        private_section = True

                    # Everything past the marker is private; stop reading
                    # if private names are not wanted
                    if private_section and not include_private:
                        break

                    # Puny code TLD names are taken from the comment line
                    if "// xn--" in line:
                        line = line.split()[1]

                    entry = line.strip()

                    # Skip comments and blank lines. ``codecs.open`` does no
                    # newline translation, so a blank line may be "\r\n".
                    if not entry or entry[0] == "/":
                        continue

                    trie_add(entry, private=private_section)

            update_tld_names_container(cls.local_path, trie)
        except IOError:
            # Grab the file
            cls.update_tld_names(fail_silently=fail_silently)
            # Run again, with ``retry_count`` incremented in order to avoid
            # infinite loops
            return cls.get_tld_names(
                fail_silently=fail_silently, retry_count=retry_count + 1
            )
        except Exception:
            if fail_silently:
                return None
            raise

        return _tld_names

253 

254 

class MozillaTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source."""

    # Identifier of this parser (presumably the key it is registered under
    # in ``Registry`` - verify against the base class).
    uid: str = "mozilla"
    # URL of the public suffix list (presumably what ``update_tld_names``
    # downloads - verify against the base class).
    source_url: str = "https://publicsuffix.org/list/public_suffix_list.dat"
    # Local TLD names file; a relative path is resolved with ``project_dir``
    # when the file is loaded.
    local_path: str = "res/effective_tld_names.dat.txt"

261 

262 

class MozillaPublicOnlyTLDSourceParser(BaseMozillaTLDSourceParser):
    """Mozilla TLD source, restricted to the public domains.

    With ``include_private`` set to False, loading of the TLD names file
    stops at the private domains section.
    """

    # Identifier of this parser (presumably the key it is registered under
    # in ``Registry`` - verify against the base class).
    uid: str = "mozilla_public_only"
    # URL of the public suffix list (presumably what ``update_tld_names``
    # downloads - verify against the base class).
    source_url: str = (
        "https://publicsuffix.org/list/public_suffix_list.dat?publiconly"
    )
    # Local TLD names file; a relative path is resolved with ``project_dir``
    # when the file is loaded.
    local_path: str = "res/effective_tld_names_public_only.dat.txt"
    # Do not load the names listed in the private domains section.
    include_private: bool = False

272 

273 

274# ************************************************************************** 

275# **************************** Core functions ****************************** 

276# ************************************************************************** 

277 

278 

def process_url(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = MozillaTLDSourceParser,
) -> Union[Tuple[List[str], int, SplitResult], Tuple[None, None, SplitResult]]:
    """Split the host name of a URL into parts and locate its TLD.

    :param url: URL to process, either a string or an already split URL.
    :param fail_silently: If set to True, ``(None, None, <url>)`` is returned
        on failure instead of raising an exception.
    :param fix_protocol: If set to True, ``https://`` is prepended to a
        string URL which does not start with ``//``, ``http://`` or
        ``https://``.
    :param search_public: If set to True, matches in public domains are
        accepted.
    :param search_private: If set to True, matches in private domains are
        accepted.
    :param parser_class: Parser class providing the TLD names.
    :return: Tuple of (domain parts, index of the first TLD part, split
        URL). The index is -1 if the whole host name is a TLD. On failure
        with ``fail_silently`` set, the first two items are None.
    :raises TldImproperlyConfigured: If both ``search_public`` and
        ``search_private`` are False.
    :raises TldBadUrl: If the URL has no host name.
    :raises TldDomainNotFound: If no acceptable TLD matches the host name.
    :raises TldIOError: If no TLD names are available for ``parser_class``.
    :raises ValueError: If the URL cannot be split.
    """
    if not (search_public or search_private):
        raise TldImproperlyConfigured(
            "Either `search_public` or `search_private` (or both) shall be "
            "set to True."
        )

    # Init
    _tld_names = get_tld_names(
        fail_silently=fail_silently, parser_class=parser_class
    )

    if isinstance(url, SplitResult):
        parsed_url = url
    else:
        if fix_protocol and not url.startswith(("//", "http://", "https://")):
            url = f"https://{url}"

        # Get parsed URL as we might need it later
        try:
            parsed_url = urlsplit(url)
        except ValueError:
            if fail_silently:
                # NOTE: there is no split URL to return here, so the URL
                # string is handed back as is
                return None, None, url
            raise

    # Get (sub) domain name
    domain_name = parsed_url.hostname

    if not domain_name:
        if fail_silently:
            return None, None, parsed_url
        raise TldBadUrl(url=url)

    # Stripping correctly handles dots at the end of domain name in URLs
    # like https://github.com............/barseghyanartur/tld/
    domain_name = domain_name.lower().rstrip(".")
    domain_parts = domain_name.split(".")

    # The names are unavailable (None) if loading them failed silently;
    # report that as a failure instead of crashing on the lookup below
    trie = None
    if _tld_names is not None:
        trie = _tld_names.get(parser_class.local_path)
    if trie is None:
        if fail_silently:
            return None, None, parsed_url
        raise TldIOError

    # Now we query our Trie iterating on the domain parts in reverse order
    node = trie.root
    current_length = 0
    tld_length = 0
    match = None
    for part in reversed(domain_parts):
        # Cannot go deeper
        if node.children is None:
            break

        # Exception
        if part == node.exception:
            break

        child = node.children.get(part)

        # Wildcards
        if child is None:
            child = node.children.get("*")

        # If the current part is not in current node's children, we can stop
        if child is None:
            break

        # Else we move deeper and increment our tld offset
        current_length += 1
        node = child

        # Remember the deepest leaf seen so far
        if node.leaf:
            tld_length = current_length
            match = node

    # Checking that a leaf was found (``match`` is only ever set to a leaf)
    # and that it is one we allow
    if (
        (match is None)
        or (not search_public and not match.private)
        or (not search_private and match.private)
    ):
        if fail_silently:
            return None, None, parsed_url
        raise TldDomainNotFound(domain_name=domain_name)

    len_domain_parts = len(domain_parts)
    if len_domain_parts == tld_length:
        non_zero_i = -1  # hostname = tld
    else:
        non_zero_i = max(1, len_domain_parts - tld_length)

    return domain_parts, non_zero_i, parsed_url

395 

396 

def get_fld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = None,
    **kwargs,
) -> Optional[str]:
    """Extract the first level domain.

    Extract the first level domain (the TLD together with the domain part
    right before it) based on the mozilla's effective TLD names dat file.
    May throw ``TldBadUrl`` or ``TldDomainNotFound`` exceptions if there's
    bad URL provided or no TLD match found respectively.

    :param url: URL to get the first level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param fix_protocol: If set to True, ``https://`` is prepended to a URL
        lacking a recognised protocol.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class to use. If not given, it is chosen
        depending on ``search_private``.
    :param kwargs: Accepted only to reject the deprecated ``as_object``.
    :type url: str | SplitResult
    :type fail_silently: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with the first level domain (the host name itself if the
        whole host name is a TLD); None on failure.
    :rtype: str
    """
    if "as_object" in kwargs:
        raise TldImproperlyConfigured(
            "`as_object` argument is deprecated for `get_fld`. Use `get_tld` "
            "instead."
        )

    if not parser_class:
        if search_private:
            parser_class = MozillaTLDSourceParser
        else:
            parser_class = MozillaPublicOnlyTLDSourceParser

    domain_parts, non_zero_i, parsed_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )

    if domain_parts is None:
        return None

    # ``non_zero_i`` is None only together with ``domain_parts``; the
    # assertion is there to let mypy know
    assert non_zero_i is not None

    if non_zero_i >= 0:
        # One part in front of the TLD, plus the TLD itself
        return ".".join(domain_parts[non_zero_i - 1 :])

    # hostname = tld
    return parsed_url.hostname

464 

465 

def get_tld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    as_object: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = None,
) -> Optional[Union[str, Result]]:
    """Extract the top level domain.

    Extract the top level domain based on the mozilla's effective TLD names
    dat file. May throw ``TldBadUrl`` or ``TldDomainNotFound`` exceptions if
    there's bad URL provided or no TLD match found respectively.

    :param url: URL to get top level domain from.
    :param fail_silently: If set to True, no exceptions are raised and None
        is returned on failure.
    :param as_object: If set to True, a ``tld.utils.Result`` object is
        returned instead of a string.
    :param fix_protocol: If set to True, ``https://`` is prepended to a URL
        lacking a recognised protocol.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class to use. If not given, it is chosen
        depending on ``search_private``.
    :type url: str | SplitResult
    :type fail_silently: bool
    :type as_object: bool
    :type fix_protocol: bool
    :type search_public: bool
    :type search_private: bool
    :return: String with top level domain (if ``as_object`` argument
        is set to False) or a ``tld.utils.Result`` object (if ``as_object``
        argument is set to True); returns None on failure.
    :rtype: str
    """
    if not parser_class:
        if search_private:
            parser_class = MozillaTLDSourceParser
        else:
            parser_class = MozillaPublicOnlyTLDSourceParser

    domain_parts, non_zero_i, parsed_url = process_url(
        url=url,
        fail_silently=fail_silently,
        fix_protocol=fix_protocol,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )

    if domain_parts is None:
        return None

    # ``non_zero_i`` is None only together with ``domain_parts``; the
    # assertion is there to let mypy know
    assert non_zero_i is not None

    # A negative index signals that the whole host name is a TLD
    hostname_is_tld = non_zero_i < 0

    if not as_object:
        if hostname_is_tld:
            return parsed_url.hostname
        return ".".join(domain_parts[non_zero_i:])

    if hostname_is_tld:
        # ``process_url`` has checked the host name already, yet its type is
        # ambiguous (Optional[str]); the assertion just satisfies mypy
        assert parsed_url.hostname is not None, "No hostname in URL"
        return Result(
            subdomain="",
            domain="",
            tld=parsed_url.hostname,
            parsed_url=parsed_url,
        )

    return Result(
        subdomain=".".join(domain_parts[: non_zero_i - 1]),
        domain=".".join(domain_parts[non_zero_i - 1 : non_zero_i]),
        tld=".".join(domain_parts[non_zero_i:]),
        parsed_url=parsed_url,
    )

548 

549 

def parse_tld(
    url: Union[str, SplitResult],
    fail_silently: bool = False,
    fix_protocol: bool = False,
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = None,
) -> Union[Tuple[None, None, None], Tuple[str, str, str]]:
    """Split a URL into its TLD, domain and subdomain.

    ``TldBadUrl``, ``TldDomainNotFound``, ``TldImproperlyConfigured`` and
    ``TldIOError`` raised underneath are swallowed, whatever the value of
    ``fail_silently`` is; a tuple of three None values is returned then.

    :param url: URL to parse.
    :param fail_silently: Forwarded to ``get_tld``.
    :param fix_protocol: Forwarded to ``get_tld``.
    :param search_public: Forwarded to ``get_tld``.
    :param search_private: Forwarded to ``get_tld``.
    :param parser_class: Parser class to use. If not given, it is chosen
        depending on ``search_private``.
    :return: Tuple (tld, domain, subdomain)
    :rtype: tuple
    """
    if not parser_class:
        if search_private:
            parser_class = MozillaTLDSourceParser
        else:
            parser_class = MozillaPublicOnlyTLDSourceParser

    try:
        result = get_tld(
            url,
            fail_silently=fail_silently,
            as_object=True,
            fix_protocol=fix_protocol,
            search_public=search_public,
            search_private=search_private,
            parser_class=parser_class,
        )
        if result is not None:
            return result.tld, result.domain, result.subdomain  # type: ignore
    except (TldBadUrl, TldDomainNotFound, TldImproperlyConfigured, TldIOError):
        # Deliberately swallowed; reported as "nothing found" below
        pass

    return None, None, None

595 

596 

def is_tld(
    value: Union[str, SplitResult],
    search_public: bool = True,
    search_private: bool = True,
    parser_class: Type[BaseTLDSourceParser] = None,
) -> bool:
    """Check if the given value is a TLD.

    The TLD of the value is extracted with ``get_tld`` (failing silently and
    with the protocol fixed) and compared to the value itself.

    :param value: Value to check.
    :param search_public: If set to True, search in public domains.
    :param search_private: If set to True, search in private domains.
    :param parser_class: Parser class to use. If not given, it is chosen
        depending on ``search_private``.
    :type value: str
    :type search_public: bool
    :type search_private: bool
    :return: True if the extracted TLD equals the value, False otherwise.
    :rtype: bool
    """
    if not parser_class:
        if search_private:
            parser_class = MozillaTLDSourceParser
        else:
            parser_class = MozillaPublicOnlyTLDSourceParser

    extracted = get_tld(
        url=value,
        fail_silently=True,
        fix_protocol=True,
        search_public=search_public,
        search_private=search_private,
        parser_class=parser_class,
    )
    # ``extracted`` is None on failure, which never equals the value
    return value == extracted

631 

632 

def reset_tld_names(tld_names_local_path: str = None) -> None:
    """Reset ``tld_names``, entirely or for a single entry.

    Without ``tld_names_local_path`` (or with a falsy one) the module-level
    ``tld_names`` is rebound to a new empty dict. Otherwise only the entry
    stored under the given key is removed.

    :param tld_names_local_path: Key of the single entry to remove.
    :type tld_names_local_path: str
    :return:
    """
    global tld_names

    if not tld_names_local_path:
        # Rebinding (not clearing): the previous dict object is untouched
        tld_names = {}
        return

    pop_tld_names_container(tld_names_local_path)