logo

qmk_firmware

Custom branch of QMK firmware. Clone it with: `git clone https://anongit.hacktivis.me/git/qmk_firmware.git`

util.py (2926B)


  1. """Utility functions.
  2. """
  3. import contextlib
  4. import multiprocessing
  5. import sys
  6. from milc import cli
  7. maybe_exit_should_exit = True
  8. maybe_exit_reraise = False
  9. # Controls whether or not early `exit()` calls should be made
  10. def maybe_exit(rc):
  11. if maybe_exit_should_exit:
  12. sys.exit(rc)
  13. if maybe_exit_reraise:
  14. e = sys.exc_info()[1]
  15. if e:
  16. raise e
  17. def maybe_exit_config(should_exit: bool = True, should_reraise: bool = False):
  18. global maybe_exit_should_exit
  19. global maybe_exit_reraise
  20. maybe_exit_should_exit = should_exit
  21. maybe_exit_reraise = should_reraise
  22. def truthy(value, value_if_unknown=False):
  23. """Returns True if the value is truthy, False otherwise.
  24. Deals with:
  25. True: 1, true, t, yes, y, on
  26. False: 0, false, f, no, n, off
  27. """
  28. if value in {False, True}:
  29. return bool(value)
  30. test_value = str(value).strip().lower()
  31. if test_value in {"1", "true", "t", "yes", "y", "on"}:
  32. return True
  33. if test_value in {"0", "false", "f", "no", "n", "off"}:
  34. return False
  35. return value_if_unknown
@contextlib.contextmanager
def parallelize():
    """Returns a function that can be used in place of a map() call.

    Attempts to use `mpire`, falling back to `multiprocessing` if it's not
    available. If parallelization is not requested, returns the original map()
    function.
    """
    # Work out if we've already got a config value for parallel searching
    if cli.config.user.parallel_search is None:
        # No explicit user preference -- default to parallel execution.
        parallel_search = True
    else:
        parallel_search = cli.config.user.parallel_search

    # Non-parallel searches use `map()`
    if not parallel_search:
        yield map
        return

    # Prefer mpire's `WorkerPool` if it's available
    with contextlib.suppress(ImportError):
        from mpire import WorkerPool
        from mpire.utils import make_single_arguments

        # NOTE: the pool stays alive for the duration of the caller's `with`
        # body and is torn down once control returns here after the yield.
        with WorkerPool() as pool:
            def _worker(func, *args):
                # Ensure we don't unpack tuples -- mpire's `WorkerPool` tries to do so normally so we tell it not to.
                for r in pool.imap_unordered(func, make_single_arguments(*args, generator=False), progress_bar=True):
                    yield r

            yield _worker
        return

    # Otherwise fall back to multiprocessing's `Pool`
    with multiprocessing.Pool() as pool:
        yield pool.imap_unordered
  66. def parallel_map(*args, **kwargs):
  67. """Effectively runs `map()` but executes it in parallel if necessary.
  68. """
  69. with parallelize() as map_fn:
  70. # This needs to be enclosed in a `list()` as some implementations return
  71. # a generator function, which means the scope of the pool is closed off
  72. # before the results are returned. Returning a list ensures results are
  73. # materialised before any worker pool is shut down.
  74. return list(map_fn(*args, **kwargs))