logo

live-bootstrap

Mirror of <https://github.com/fosslinux/live-bootstrap>

check_substitutes.py (5459B)


  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: GPL-3.0-or-later
  3. #
  4. # SPDX-FileCopyrightText: 2024 fosslinux <fosslinux@aussies.space>
  5. """Check that substituted files are the same."""
  6. import bz2
  7. import filecmp
  8. import gzip
  9. import itertools
  10. import lzma
  11. import shutil
  12. import tarfile
  13. import tempfile
  14. import sys
  15. import os
  16. from lib.generator import Generator
  17. # Get a temporary directory to work in
  18. working = tempfile.mkdtemp()
  19. # Colour constants
  20. # pylint: disable=too-few-public-methods
  21. class Colors():
  22. """ANSI Color Codes"""
  23. GREY = "\033[90m"
  24. RED = "\033[91m"
  25. GREEN = "\033[92m"
  26. ORANGE = "\033[91m\033[93m"
  27. YELLOW = "\033[93m"
  28. END = "\033[0m"
  29. def traverse_path(base_root):
  30. """Takes a path and returns a set of all directories and files in that path."""
  31. all_dirs = set()
  32. all_files = set()
  33. for root, directories, files in os.walk(base_root, topdown=True):
  34. for d in directories:
  35. all_dirs.add(os.path.join(root, d).lstrip(base_root))
  36. for f in files:
  37. all_files.add(os.path.join(root, f).lstrip(base_root))
  38. return (all_dirs, all_files)
  39. class Distfile():
  40. """Represents one distfile and operations performed on it."""
  41. def __init__(self, i, url):
  42. self.i = i
  43. self.url = url
  44. self.out_file = f"{i}-{os.path.basename(url)}"
  45. self.filepath = ""
  46. def download(self):
  47. """Downloads the distfile."""
  48. Generator.download_file(self.url, working, self.out_file, silent=True)
  49. self.filepath = os.path.join(working, self.out_file)
  50. def decompress(self):
  51. """Decompresses the distfile."""
  52. compression = self.out_file.rsplit('.', maxsplit=1)[-1]
  53. decompress_func = {
  54. "gz": gzip.open,
  55. "tgz": gzip.open,
  56. "bz2": bz2.open,
  57. "xz": lzma.open,
  58. "lzma": lzma.open
  59. }
  60. if compression not in decompress_func:
  61. # No decompression needed
  62. return
  63. # Remove the compression extension
  64. new_path = '.'.join(self.filepath.split('.')[:-1])
  65. # tgz -> .tar
  66. if compression == "tgz":
  67. new_path = f"{new_path}.tar"
  68. # Move the decompressed binary stream to a new file
  69. with decompress_func[compression](self.filepath, 'rb') as fin:
  70. with open(new_path, 'wb') as fout:
  71. shutil.copyfileobj(fin, fout)
  72. self.filepath = new_path
  73. def extract(self):
  74. """Extracts the distfile."""
  75. # Sanity check
  76. if not tarfile.is_tarfile(self.filepath):
  77. return
  78. out_dir = os.path.join(working, f"{self.i}")
  79. os.mkdir(out_dir)
  80. with tarfile.open(self.filepath, 'r') as f:
  81. f.extractall(path=out_dir)
  82. self.filepath = out_dir
  83. # It makes more sense here to label them d1 and d2 rather than have one be self.
  84. # pylint: disable=no-self-argument
  85. def compare(d1, d2):
  86. """Compares the distfile to another distfile."""
  87. if not os.path.isdir(d1.filepath):
  88. # Compare files
  89. return filecmp.cmp(d1.filepath, d2.filepath, shallow=False)
  90. if not os.path.isdir(d2.filepath):
  91. # Then, d2 is a file and d1 is a directory
  92. return False
  93. # Otherwise it's two directories
  94. dirnames1, filenames1 = traverse_path(d1.filepath)
  95. dirnames2, filenames2 = traverse_path(d2.filepath)
  96. if dirnames1 != dirnames2:
  97. return False
  98. if filenames1 != filenames2:
  99. return False
  100. return filecmp.cmpfiles(d1.filepath, d2.filepath, filenames1, shallow=False)
  101. def check(*args):
  102. """Check if a list of distfiles are equivalent."""
  103. notequiv = []
  104. # Find all pairs that are not equivalent
  105. for pair in itertools.combinations(args, 2):
  106. if pair[0].compare(pair[1]):
  107. print(f"{Colors.GREY}DEBUG: {pair[0].url} is equivalent to {pair[1].url}{Colors.END}")
  108. else:
  109. notequiv.append(pair)
  110. # Decompress all, and check again
  111. for d in {y for x in notequiv for y in x}:
  112. d.decompress()
  113. for pair in notequiv.copy():
  114. if pair[0].compare(pair[1]):
  115. # pylint: disable=line-too-long
  116. print(f"{Colors.YELLOW}NOTE: {pair[0].url} is equivalent to {pair[1].url} when decompressed{Colors.END}")
  117. notequiv.remove(pair)
  118. # Extract all, and check again
  119. for d in {y for x in notequiv for y in x}:
  120. d.extract()
  121. has_error = False
  122. for pair in notequiv:
  123. if pair[0].compare(pair[1]):
  124. # pylint: disable=line-too-long
  125. print(f"{Colors.ORANGE}WARN: {pair[0].url} is equivalent to {pair[1].url} when extracted{Colors.END}")
  126. else:
  127. has_error = True
  128. # pylint: disable=line-too-long
  129. print(f"{Colors.RED}ERROR: {pair[0].url} is not equivalent to {pair[1].url}!{Colors.END}")
  130. return has_error
  131. def main():
  132. """Main function."""
  133. has_error = False
  134. with open("substitutes", 'r', encoding="utf-8") as f:
  135. for line in f.readlines():
  136. urls = line.strip().split(' ')
  137. distfiles = []
  138. for i, url in enumerate(urls):
  139. distfiles.append(Distfile(i, url))
  140. for distfile in distfiles:
  141. distfile.download()
  142. if check(*distfiles):
  143. has_error = True
  144. sys.exit(has_error)
  145. if __name__ == "__main__":
  146. main()