logo

oasis-root

Compiled tree of Oasis Linux based on own branch at <https://hacktivis.me/git/oasis/> git clone https://anongit.hacktivis.me/git/oasis-root.git

readers.py (3587B)


  1. import collections
  2. import zipfile
  3. import pathlib
  4. from . import abc
  5. def remove_duplicates(items):
  6. return iter(collections.OrderedDict.fromkeys(items))
  7. class FileReader(abc.TraversableResources):
  8. def __init__(self, loader):
  9. self.path = pathlib.Path(loader.path).parent
  10. def resource_path(self, resource):
  11. """
  12. Return the file system path to prevent
  13. `resources.path()` from creating a temporary
  14. copy.
  15. """
  16. return str(self.path.joinpath(resource))
  17. def files(self):
  18. return self.path
  19. class ZipReader(abc.TraversableResources):
  20. def __init__(self, loader, module):
  21. _, _, name = module.rpartition('.')
  22. self.prefix = loader.prefix.replace('\\', '/') + name + '/'
  23. self.archive = loader.archive
  24. def open_resource(self, resource):
  25. try:
  26. return super().open_resource(resource)
  27. except KeyError as exc:
  28. raise FileNotFoundError(exc.args[0])
  29. def is_resource(self, path):
  30. # workaround for `zipfile.Path.is_file` returning true
  31. # for non-existent paths.
  32. target = self.files().joinpath(path)
  33. return target.is_file() and target.exists()
  34. def files(self):
  35. return zipfile.Path(self.archive, self.prefix)
  36. class MultiplexedPath(abc.Traversable):
  37. """
  38. Given a series of Traversable objects, implement a merged
  39. version of the interface across all objects. Useful for
  40. namespace packages which may be multihomed at a single
  41. name.
  42. """
  43. def __init__(self, *paths):
  44. self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
  45. if not self._paths:
  46. message = 'MultiplexedPath must contain at least one path'
  47. raise FileNotFoundError(message)
  48. if not all(path.is_dir() for path in self._paths):
  49. raise NotADirectoryError('MultiplexedPath only supports directories')
  50. def iterdir(self):
  51. visited = []
  52. for path in self._paths:
  53. for file in path.iterdir():
  54. if file.name in visited:
  55. continue
  56. visited.append(file.name)
  57. yield file
  58. def read_bytes(self):
  59. raise FileNotFoundError(f'{self} is not a file')
  60. def read_text(self, *args, **kwargs):
  61. raise FileNotFoundError(f'{self} is not a file')
  62. def is_dir(self):
  63. return True
  64. def is_file(self):
  65. return False
  66. def joinpath(self, child):
  67. # first try to find child in current paths
  68. for file in self.iterdir():
  69. if file.name == child:
  70. return file
  71. # if it does not exist, construct it with the first path
  72. return self._paths[0] / child
  73. __truediv__ = joinpath
  74. def open(self, *args, **kwargs):
  75. raise FileNotFoundError(f'{self} is not a file')
  76. @property
  77. def name(self):
  78. return self._paths[0].name
  79. def __repr__(self):
  80. paths = ', '.join(f"'{path}'" for path in self._paths)
  81. return f'MultiplexedPath({paths})'
  82. class NamespaceReader(abc.TraversableResources):
  83. def __init__(self, namespace_path):
  84. if 'NamespacePath' not in str(namespace_path):
  85. raise ValueError('Invalid path')
  86. self.path = MultiplexedPath(*list(namespace_path))
  87. def resource_path(self, resource):
  88. """
  89. Return the file system path to prevent
  90. `resources.path()` from creating a temporary
  91. copy.
  92. """
  93. return str(self.path.joinpath(resource))
  94. def files(self):
  95. return self.path