json_schema.py (4631B)
- """Functions that help us generate and use info.json files.
- """
- import json
- import hjson
- import jsonschema
- from collections.abc import Mapping
- from functools import lru_cache
- from typing import OrderedDict
- from pathlib import Path
- from copy import deepcopy
- from milc import cli
- from qmk.util import maybe_exit
- def _dict_raise_on_duplicates(ordered_pairs):
- """Reject duplicate keys."""
- d = {}
- for k, v in ordered_pairs:
- if k in d:
- raise ValueError("duplicate key: %r" % (k,))
- else:
- d[k] = v
- return d
- @lru_cache(maxsize=20)
- def _json_load_impl(json_file, strict=True):
- """Load a json file from disk.
- Note: file must be a Path object.
- """
- try:
- # Get the IO Stream for Path objects
- # Not necessary if the data is provided via stdin
- if isinstance(json_file, Path):
- json_file = json_file.open(encoding='utf-8')
- return hjson.load(json_file, object_pairs_hook=_dict_raise_on_duplicates if strict else None)
- except (json.decoder.JSONDecodeError, hjson.HjsonDecodeError) as e:
- cli.log.error('Invalid JSON encountered attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
- maybe_exit(1)
- except Exception as e:
- cli.log.error('Unknown error attempting to load {fg_cyan}%s{fg_reset}:\n\t{fg_red}%s', json_file, e)
- maybe_exit(1)
- def json_load(json_file, strict=True):
- return deepcopy(_json_load_impl(json_file=json_file, strict=strict))
- @lru_cache(maxsize=20)
- def load_jsonschema(schema_name):
- """Read a jsonschema file from disk.
- """
- if Path(schema_name).exists():
- return json_load(schema_name)
- schema_path = Path(f'data/schemas/{schema_name}.jsonschema')
- if not schema_path.exists():
- schema_path = Path('data/schemas/false.jsonschema')
- return json_load(schema_path)
- @lru_cache(maxsize=1)
- def compile_schema_store():
- """Compile all our schemas into a schema store.
- """
- schema_store = {}
- for schema_file in Path('data/schemas').glob('*.jsonschema'):
- schema_data = load_jsonschema(schema_file)
- if not isinstance(schema_data, dict):
- cli.log.debug('Skipping schema file %s', schema_file)
- continue
- # `$id`-based references
- schema_store[schema_data['$id']] = schema_data
- # Path-based references
- schema_store[Path(schema_file).name] = schema_data
- return schema_store
- @lru_cache(maxsize=20)
- def create_validator(schema):
- """Creates a validator for the given schema id.
- """
- schema_store = compile_schema_store()
- resolver = jsonschema.RefResolver.from_schema(schema_store[schema], store=schema_store)
- return jsonschema.Draft202012Validator(schema_store[schema], resolver=resolver).validate
- def validate(data, schema):
- """Validates data against a schema.
- """
- validator = create_validator(schema)
- return validator(data)
- def deep_update(origdict, newdict):
- """Update a dictionary in place, recursing to do a depth-first deep copy.
- """
- for key, value in newdict.items():
- if isinstance(value, Mapping):
- origdict[key] = deep_update(origdict.get(key, {}), value)
- else:
- origdict[key] = value
- return origdict
- def merge_ordered_dicts(dicts):
- """Merges nested OrderedDict objects resulting from reading a hjson file.
- Later input dicts overrides earlier dicts for plain values.
- If any value is "!delete!", the existing value will be removed from its parent.
- Arrays will be appended. If the first entry of an array is "!reset!", the contents of the array will be cleared and replaced with RHS.
- Dictionaries will be recursively merged. If any entry is "!reset!", the contents of the dictionary will be cleared and replaced with RHS.
- """
- result = OrderedDict()
- def add_entry(target, k, v):
- if k in target and isinstance(v, (OrderedDict, dict)):
- if "!reset!" in v:
- target[k] = v
- else:
- target[k] = merge_ordered_dicts([target[k], v])
- if "!reset!" in target[k]:
- del target[k]["!reset!"]
- elif k in target and isinstance(v, list):
- if v[0] == '!reset!':
- target[k] = v[1:]
- else:
- target[k] = target[k] + v
- elif v == "!delete!" and isinstance(target, (OrderedDict, dict)):
- del target[k]
- else:
- target[k] = v
- for d in dicts:
- for (k, v) in d.items():
- add_entry(result, k, v)
- return result