# Source code for nightingale.util
from datetime import UTC, datetime
[docs]
def produce_package_name(date) -> str:
    """
    Build the release package file name for the given date.

    :param date: Value interpolated into the file name (formatted with its default format).
    :return: A name of the form ``release-package-<date>.json``.
    """
    return "release-package-{}.json".format(date)
[docs]
def remove_dicts_without_id(data):
    """
    Recursively prune a nested structure of dicts and lists.

    * In a list, a dict item that has no ``"id"`` key is dropped unless it carries a
      truthy ``"verificationMethod"`` value; every surviving item is cleaned recursively.
    * In a dict, each value is cleaned recursively and the key is kept only when the
      cleaned value is truthy (so empty containers, ``None``, ``0``, ``False`` and ``""``
      are removed).
    * Any other value is returned unchanged.

    :param data: The dict, list or scalar to clean.
    :return: A new cleaned dict/list, or ``data`` itself for non-container values.
    """
    if isinstance(data, dict):
        pruned = {}
        for key, raw_value in data.items():
            cleaned = remove_dicts_without_id(raw_value)
            if cleaned:
                pruned[key] = cleaned
        return pruned

    if isinstance(data, list):
        kept = []
        for entry in data:
            # Drop id-less dicts, except those that carry a verificationMethod.
            if isinstance(entry, dict) and "id" not in entry and not entry.get("verificationMethod"):
                continue
            kept.append(remove_dicts_without_id(entry))
        return kept

    return data
[docs]
def get_iso_now():
return datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
[docs]
def is_new_array(array_counters: dict, child_path: str, array_key: str, array_value: str, array_path: str) -> bool:
    """
    Decide whether the current value starts a new array element.

    A new element is signalled only when the key is ``"id"``, the child path is exactly
    ``"/" + array_key``, and the value recorded for ``array_path`` in ``array_counters``
    differs from ``array_value``. The counter is looked up only after the first two
    checks pass.

    :param array_counters: Dictionary keeping track of array counters, keyed by array path.
    :param child_path: The child path in the schema.
    :param array_key: The key in the array.
    :param array_value: The value associated with the array key.
    :param array_path: The path of the array.
    :return: True if a new array should be created, False otherwise.

    >>> array_counters = {'/object/field2/array_field': '1'}
    >>> child_path = '/id'
    >>> array_key = 'id'
    >>> array_value = '2'
    >>> array_path = '/object/field2/array_field'
    >>> is_new_array(array_counters, child_path, array_key, array_value, array_path)
    True
    >>> array_counters = {'/object/field2/array_field': '1'}
    >>> child_path = '/id'
    >>> array_key = 'id'
    >>> array_value = '1'
    >>> array_path = '/object/field2/array_field'
    >>> is_new_array(array_counters, child_path, array_key, array_value, array_path)
    False
    >>> array_counters = {'/object/field2/array_field': '1'}
    >>> child_path = '/name'
    >>> array_key = 'name'
    >>> array_value = 'example'
    >>> array_path = '/object/field2/array_field'
    >>> is_new_array(array_counters, child_path, array_key, array_value, array_path)
    False
    """
    # Only an "id" key can open a new array element.
    if array_key != "id":
        return False
    # The id must sit directly under the array item.
    if child_path != f"/{array_key}":
        return False
    last_seen = array_counters[array_path]
    return bool(last_seen != array_value)
[docs]
def get_longest_array_path(arrays, path):  # extract for testing
    """
    Return the longest entry of ``arrays`` that ``path`` starts with.

    :param arrays: Iterable of candidate array paths.
    :param path: The path to match against the candidates.
    :return: The longest matching candidate, or None when nothing matches.
    """
    longest_first = sorted(arrays, key=len, reverse=True)
    return next((candidate for candidate in longest_first if path.startswith(candidate)), None)
[docs]
def group_contiguous_mappings(mapping_list: list[dict]) -> list[tuple[str, list[dict]]]:
    """
    Group mapping items into runs of consecutive items that share the same ``"block"``.

    Items are never reordered: a block value that reappears after a different block
    starts a new group.

    :param mapping_list: Mapping items, each providing a ``"block"`` key.
    :return: A list of ``(block, items)`` tuples in input order; empty for empty input.
    """
    runs: list[tuple[str, list[dict]]] = []
    if not mapping_list:
        return runs

    for entry in mapping_list:
        block = entry["block"]
        if runs and block == runs[-1][0]:
            # Same block as the run in progress: extend it.
            runs[-1][1].append(entry)
        else:
            runs.append((block, [entry]))
    return runs
[docs]
def sort_group_by_parent_and_id(group: list[dict]) -> list[dict]:
    """
    Reorder mapping items so that, within each run of items sharing a parent, ``/id`` paths lead.

    The parent of an item is everything before the final ``/`` of its ``"path"`` (or the
    whole path when it contains no ``/``). Consecutive items with the same parent form a
    subgroup; each subgroup is stably sorted so items whose path ends with ``/id`` come
    first, and the subgroups are concatenated in their original order.

    :param group: Mapping items, each providing a ``"path"`` key.
    :return: A new list with the reordered items.
    """

    def _id_last_flag(entry: dict) -> bool:
        # False sorts before True, so "/id" paths move to the front.
        return not entry["path"].endswith("/id")

    subgroups: list[list[dict]] = []
    previous_parent = None
    for entry in group:
        path = entry["path"]
        head, separator, _ = path.rpartition("/")
        parent = head if separator else path
        if not subgroups or parent != previous_parent:
            subgroups.append([])
            previous_parent = parent
        subgroups[-1].append(entry)

    ordered: list[dict] = []
    for members in subgroups:
        ordered.extend(sorted(members, key=_id_last_flag))
    return ordered