logo

qmk_firmware

custom branch of QMK firmware git clone https://anongit.hacktivis.me/git/qmk_firmware.git

json_schema.py (4631B)


  1. """Functions that help us generate and use info.json files.
  2. """
  3. import json
  4. import hjson
  5. import jsonschema
  6. from collections.abc import Mapping
  7. from functools import lru_cache
  8. from typing import OrderedDict
  9. from pathlib import Path
  10. from copy import deepcopy
  11. from milc import cli
  12. from qmk.util import maybe_exit
  13. def _dict_raise_on_duplicates(ordered_pairs):
  14. """Reject duplicate keys."""
  15. d = {}
  16. for k, v in ordered_pairs:
  17. if k in d:
  18. raise ValueError("duplicate key: %r" % (k,))
  19. else:
  20. d[k] = v
  21. return d
  22. @lru_cache(maxsize=20)
  23. def _json_load_impl(json_file, strict=True):
  24. """Load a json file from disk.
  25. Note: file must be a Path object.
  26. """
  27. try:
  28. # Get the IO Stream for Path objects
  29. # Not necessary if the data is provided via stdin
  30. if isinstance(json_file, Path):
  31. json_file = json_file.open(encoding='utf-8')
  32. return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)
  33. except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
  34. cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
  35. maybe_exit(1)
  36. except Exception as e:
  37. cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
  38. maybe_exit(1)
  39. def json_load(json_file, strict=True):
  40. return deepcopy(_json_load_impl(json_file=json_file, strict=strict))
  41. @lru_cache(maxsize=20)
  42. def load_jsonschema(schema_name):
  43. """Read a jsonschema file from disk.
  44. """
  45. if Path(schema_name).exists():
  46. return json_load(schema_name)
  47. schema_path = Path(f'data/schemas/{schema_name}.jsonschema')
  48. if not schema_path.exists():
  49. schema_path = Path('data/schemas/false.jsonschema')
  50. return json_load(schema_path)
  51. @lru_cache(maxsize=1)
  52. def compile_schema_store():
  53. """Compile all our schemas into a schema store.
  54. """
  55. schema_store = {}
  56. for schema_file in Path('data/schemas').glob('*.jsonschema'):
  57. schema_data = load_jsonschema(schema_file)
  58. if not isinstance(schema_data, dict):
  59. cli.log.debug('Skipping schema file %s', schema_file)
  60. continue
  61. # `$id`-based references
  62. schema_store[schema_data['$id']] = schema_data
  63. # Path-based references
  64. schema_store[Path(schema_file).name] = schema_data
  65. return schema_store
  66. @lru_cache(maxsize=20)
  67. def create_validator(schema):
  68. """Creates a validator for the given schema id.
  69. """
  70. schema_store = compile_schema_store()
  71. resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)
  72. return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate
  73. def validate(data, schema):
  74. """Validates data against a schema.
  75. """
  76. validator = create_validator(schema)
  77. return validator(data)
  78. def deep_update(origdict, newdict):
  79. """Update a dictionary in place, recursing to do a depth-first deep copy.
  80. """
  81. for key, value in newdict.items():
  82. if isinstance(value, Mapping):
  83. origdict[key] = deep_update(origdict.get(key, {}), value)
  84. else:
  85. origdict[key] = value
  86. return origdict
  87. def merge_ordered_dicts(dicts):
  88. """Merges nested OrderedDict objects resulting from reading a hjson file.
  89. Later input dicts overrides earlier dicts for plain values.
  90. If any value is "!delete!", the existing value will be removed from its parent.
  91. Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
  92. Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
  93. """
  94. result = OrderedDict()
  95. def add_entry(target, k, v):
  96. if k in target and isinstance(v, (OrderedDict, dict)):
  97. if "!reset!" in v:
  98. target[k] = v
  99. else:
  100. target[k] = merge_ordered_dicts([target[k], v])
  101. if "!reset!" in target[k]:
  102. del target[k]["!reset!"]
  103. elif k in target and isinstance(v, list):
  104. if v[0] == '!reset!':
  105. target[k] = v[1:]
  106. else:
  107. target[k] = target[k] + v
  108. elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
  109. del target[k]
  110. else:
  111. target[k] = v
  112. for d in dicts:
  113. for (k, v) in d.items():
  114. add_entry(result, k, v)
  115. return result