logo

oasis-root

Compiled tree of Oasis Linux, built from the author's own branch at <https://hacktivis.me/git/oasis/>. Clone with: git clone https://anongit.hacktivis.me/git/oasis-root.git

_common.py (3074B)


  1. import os
  2. import pathlib
  3. import tempfile
  4. import functools
  5. import contextlib
  6. import types
  7. import importlib
  8. from typing import Union, Any, Optional
  9. from .abc import ResourceReader, Traversable
  10. from ._adapters import wrap_spec
# A "package" argument is either a module object or a string; strings are
# passed to importlib.import_module() by resolve().
Package = Union[types.ModuleType, str]
  12. def files(package):
  13. # type: (Package) -> Traversable
  14. """
  15. Get a Traversable resource from a package
  16. """
  17. return from_package(get_package(package))
  18. def normalize_path(path):
  19. # type: (Any) -> str
  20. """Normalize a path by ensuring it is a string.
  21. If the resulting string contains path separators, an exception is raised.
  22. """
  23. str_path = str(path)
  24. parent, file_name = os.path.split(str_path)
  25. if parent:
  26. raise ValueError(f'{path!r} must be only a file name')
  27. return file_name
  28. def get_resource_reader(package):
  29. # type: (types.ModuleType) -> Optional[ResourceReader]
  30. """
  31. Return the package's loader if it's a ResourceReader.
  32. """
  33. # We can't use
  34. # a issubclass() check here because apparently abc.'s __subclasscheck__()
  35. # hook wants to create a weak reference to the object, but
  36. # zipimport.zipimporter does not support weak references, resulting in a
  37. # TypeError. That seems terrible.
  38. spec = package.__spec__
  39. reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
  40. if reader is None:
  41. return None
  42. return reader(spec.name) # type: ignore
  43. def resolve(cand):
  44. # type: (Package) -> types.ModuleType
  45. return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
  46. def get_package(package):
  47. # type: (Package) -> types.ModuleType
  48. """Take a package name or module object and return the module.
  49. Raise an exception if the resolved module is not a package.
  50. """
  51. resolved = resolve(package)
  52. if wrap_spec(resolved).submodule_search_locations is None:
  53. raise TypeError(f'{package!r} is not a package')
  54. return resolved
  55. def from_package(package):
  56. """
  57. Return a Traversable object for the given package.
  58. """
  59. spec = wrap_spec(package)
  60. reader = spec.loader.get_resource_reader(spec.name)
  61. return reader.files()
  62. @contextlib.contextmanager
  63. def _tempfile(reader, suffix=''):
  64. # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
  65. # blocks due to the need to close the temporary file to work on Windows
  66. # properly.
  67. fd, raw_path = tempfile.mkstemp(suffix=suffix)
  68. try:
  69. os.write(fd, reader())
  70. os.close(fd)
  71. del reader
  72. yield pathlib.Path(raw_path)
  73. finally:
  74. try:
  75. os.remove(raw_path)
  76. except FileNotFoundError:
  77. pass
  78. @functools.singledispatch
  79. def as_file(path):
  80. """
  81. Given a Traversable object, return that object as a
  82. path on the local file system in a context manager.
  83. """
  84. return _tempfile(path.read_bytes, suffix=path.name)
  85. @as_file.register(pathlib.Path)
  86. @contextlib.contextmanager
  87. def _(path):
  88. """
  89. Degenerate behavior for pathlib.Path objects.
  90. """
  91. yield path