logo

oasis-root

Compiled tree of Oasis Linux, based on own branch at <https://hacktivis.me/git/oasis/>. Clone with: git clone https://anongit.hacktivis.me/git/oasis-root.git

resources.py (5705B)


  1. import os
  2. import io
  3. from . import _common
  4. from ._common import as_file, files
  5. from .abc import ResourceReader
  6. from contextlib import suppress
  7. from importlib.abc import ResourceLoader
  8. from importlib.machinery import ModuleSpec
  9. from io import BytesIO, TextIOWrapper
  10. from pathlib import Path
  11. from types import ModuleType
  12. from typing import ContextManager, Iterable, Union
  13. from typing import cast
  14. from typing.io import BinaryIO, TextIO
  15. from collections.abc import Sequence
  16. from functools import singledispatch
  17. __all__ = [
  18. 'Package',
  19. 'Resource',
  20. 'ResourceReader',
  21. 'as_file',
  22. 'contents',
  23. 'files',
  24. 'is_resource',
  25. 'open_binary',
  26. 'open_text',
  27. 'path',
  28. 'read_binary',
  29. 'read_text',
  30. ]
  31. Package = Union[str, ModuleType]
  32. Resource = Union[str, os.PathLike]
  33. def open_binary(package: Package, resource: Resource) -> BinaryIO:
  34. """Return a file-like object opened for binary reading of the resource."""
  35. resource = _common.normalize_path(resource)
  36. package = _common.get_package(package)
  37. reader = _common.get_resource_reader(package)
  38. if reader is not None:
  39. return reader.open_resource(resource)
  40. spec = cast(ModuleSpec, package.__spec__)
  41. # Using pathlib doesn't work well here due to the lack of 'strict'
  42. # argument for pathlib.Path.resolve() prior to Python 3.6.
  43. if spec.submodule_search_locations is not None:
  44. paths = spec.submodule_search_locations
  45. elif spec.origin is not None:
  46. paths = [os.path.dirname(os.path.abspath(spec.origin))]
  47. for package_path in paths:
  48. full_path = os.path.join(package_path, resource)
  49. try:
  50. return open(full_path, mode='rb')
  51. except OSError:
  52. # Just assume the loader is a resource loader; all the relevant
  53. # importlib.machinery loaders are and an AttributeError for
  54. # get_data() will make it clear what is needed from the loader.
  55. loader = cast(ResourceLoader, spec.loader)
  56. data = None
  57. if hasattr(spec.loader, 'get_data'):
  58. with suppress(OSError):
  59. data = loader.get_data(full_path)
  60. if data is not None:
  61. return BytesIO(data)
  62. raise FileNotFoundError(f'{resource!r} resource not found in {spec.name!r}')
  63. def open_text(
  64. package: Package,
  65. resource: Resource,
  66. encoding: str = 'utf-8',
  67. errors: str = 'strict',
  68. ) -> TextIO:
  69. """Return a file-like object opened for text reading of the resource."""
  70. return TextIOWrapper(
  71. open_binary(package, resource), encoding=encoding, errors=errors
  72. )
  73. def read_binary(package: Package, resource: Resource) -> bytes:
  74. """Return the binary contents of the resource."""
  75. with open_binary(package, resource) as fp:
  76. return fp.read()
  77. def read_text(
  78. package: Package,
  79. resource: Resource,
  80. encoding: str = 'utf-8',
  81. errors: str = 'strict',
  82. ) -> str:
  83. """Return the decoded string of the resource.
  84. The decoding-related arguments have the same semantics as those of
  85. bytes.decode().
  86. """
  87. with open_text(package, resource, encoding, errors) as fp:
  88. return fp.read()
  89. def path(
  90. package: Package,
  91. resource: Resource,
  92. ) -> 'ContextManager[Path]':
  93. """A context manager providing a file path object to the resource.
  94. If the resource does not already exist on its own on the file system,
  95. a temporary file will be created. If the file was created, the file
  96. will be deleted upon exiting the context manager (no exception is
  97. raised if the file was deleted prior to the context manager
  98. exiting).
  99. """
  100. reader = _common.get_resource_reader(_common.get_package(package))
  101. return (
  102. _path_from_reader(reader, _common.normalize_path(resource))
  103. if reader
  104. else _common.as_file(
  105. _common.files(package).joinpath(_common.normalize_path(resource))
  106. )
  107. )
  108. def _path_from_reader(reader, resource):
  109. return _path_from_resource_path(reader, resource) or _path_from_open_resource(
  110. reader, resource
  111. )
  112. def _path_from_resource_path(reader, resource):
  113. with suppress(FileNotFoundError):
  114. return Path(reader.resource_path(resource))
  115. def _path_from_open_resource(reader, resource):
  116. saved = io.BytesIO(reader.open_resource(resource).read())
  117. return _common._tempfile(saved.read, suffix=resource)
  118. def is_resource(package: Package, name: str) -> bool:
  119. """True if 'name' is a resource inside 'package'.
  120. Directories are *not* resources.
  121. """
  122. package = _common.get_package(package)
  123. _common.normalize_path(name)
  124. reader = _common.get_resource_reader(package)
  125. if reader is not None:
  126. return reader.is_resource(name)
  127. package_contents = set(contents(package))
  128. if name not in package_contents:
  129. return False
  130. return (_common.from_package(package) / name).is_file()
  131. def contents(package: Package) -> Iterable[str]:
  132. """Return an iterable of entries in 'package'.
  133. Note that not all entries are resources. Specifically, directories are
  134. not considered resources. Use `is_resource()` on each entry returned here
  135. to check if it is a resource or not.
  136. """
  137. package = _common.get_package(package)
  138. reader = _common.get_resource_reader(package)
  139. if reader is not None:
  140. return _ensure_sequence(reader.contents())
  141. transversable = _common.from_package(package)
  142. if transversable.is_dir():
  143. return list(item.name for item in transversable.iterdir())
  144. return []
  145. @singledispatch
  146. def _ensure_sequence(iterable):
  147. return list(iterable)
  148. @_ensure_sequence.register(Sequence)
  149. def _(iterable):
  150. return iterable