# Source code for nightingale.mapper
import logging
import sqlite3
import time
from typing import Any
import dict_hash
from nightingale.codelists import CodelistsMapping
from nightingale.config import Config
from nightingale.mapping_template.v09 import MappingTemplate
from nightingale.mapping_template.validator import MappingTemplateValidator
from nightingale.util import (
get_iso_now,
group_contiguous_mappings,
is_new_array,
remove_dicts_without_id,
sort_group_by_parent_and_id,
)
from nightingale.writer import DataWriter
logger = logging.getLogger(__name__)

#: Log a progress message every this many rows, and report releases that meet or exceed this row count.
LARGE_RELEASE_ROW_THRESHOLD = 500_000

#: Report releases that take longer than this many seconds to process.
SLOW_RELEASE_SECONDS = 5 * 60 * 60  # 5 hours == 18,000 seconds
class OCDSDataMapper:
    """
    Maps data from a source to the OCDS format.

    :param config: Configuration object containing settings for the mapper.
    :type config: Config
    :param writer: Optional DataWriter; when its ``is_streaming()`` returns true, finished
        releases are passed to ``stream_release`` instead of being collected in memory.
    :type writer: DataWriter | None
    """

    def __init__(self, config: Config, writer: DataWriter | None = None):
        """
        Initialize the OCDSDataMapper.

        :param config: Configuration object containing settings for the mapper.
        :type config: Config
        :param writer: Optional DataWriter instance for streaming output.
        :type writer: DataWriter | None
        """
        self.config = config
        self.mapping = MappingTemplate(config.mapping)
        self.writer = writer
        # Codelist mappings are only built when the mapping configuration enables them;
        # ``map_codelist_value`` passes values through unchanged while this stays None.
        self.codelists = None
        if self.config.mapping.codelists:
            self.codelists = CodelistsMapping(self.config.mapping)
        # Milestone code -> lookup row; filled in by ``map`` from ``milestone_lookup_sql``.
        self.milestone_lookup = {}

    def produce_ocid(self, value: str) -> str:
        """
        Produce an OCID based on the given value.

        :param value: The value to use for generating the OCID.
        :type value: str
        :return: ``"<ocid_prefix>-<value>"`` using the configured OCID prefix.
        :rtype: str
        """
        return f"{self.config.mapping.ocid_prefix}-{value}"

    def map(self, loader: Any, *, validate_mapping: bool = False) -> list[dict[str, Any]]:
        """
        Map data from the loader to the OCDS format.

        If ``milestone_lookup_sql`` is configured, the milestone lookup table is loaded first
        (rows keyed by their ``code`` column); an ``sqlite3.OperationalError`` while loading it
        is logged and mapping continues without the lookup.

        :param loader: Data loader object; ``loader.load(sql)`` must return indexable rows.
        :type loader: Any
        :param validate_mapping: Validate the mapping template's data elements and the
            selector's first row before mapping.
        :type validate_mapping: bool
        :return: List of mapped release dictionaries (empty when the writer streams releases).
        :rtype: list[dict[str, Any]]
        """
        config = self.config.mapping
        if config.milestone_lookup_sql:
            try:
                self.milestone_lookup = {row["code"]: row for row in loader.load(config.milestone_lookup_sql)}
                logger.info("Loaded %d entries from milestone lookup.", len(self.milestone_lookup))
            except sqlite3.OperationalError as e:
                logger.warning("Could not load milestone lookup table: %s.", e)
        if config.split_milestone_codes and not self.milestone_lookup:
            logger.warning("split_milestone_codes is enabled but no milestone lookup data is available.")
        logger.info("Mapping data loaded")
        data = loader.load(config.selector)
        logger.info("Start fetching rows from datasource")
        if validate_mapping:
            logger.info("Validating mapping template...")
            validator = MappingTemplateValidator(loader, self.mapping)
            validator.validate_data_elements()
            # The selector check inspects the first row; an empty result has none to check
            # (indexing it unconditionally raised IndexError).
            if data:
                validator.validate_selector(data[0])
            else:
                logger.warning("Selector returned no rows; skipping selector validation.")
        logger.info("Start mapping data")
        return self.transform_data(data, self.mapping, codelists=self.codelists)

    def transform_data(
        self, data: list[dict[Any, Any]], mapping: MappingTemplate, codelists: CodelistsMapping | None = None
    ) -> list[dict[str, Any]]:
        """
        Transform the input data to the OCDS format.

        Rows are grouped into releases by consecutive OCID values: a release is finished as soon
        as a row with a different OCID is seen, so rows of one OCID are expected to be contiguous
        (an OCID that reappears later produces a second release). Rows without an OCID are
        skipped, and a trailing release that mapped no values is not emitted.

        :param data: List of input data dictionaries.
        :param mapping: Mapping configuration object.
        :param codelists: Optional codelist mappings passed through to ``transform_row``.
        :return: List of transformed release dictionaries (empty when the writer streams releases).
        """
        curr_ocid = ""
        curr_release = {}
        curr_release_dates = set()
        array_counters = {}
        mapped = []
        count = 0
        releases = 0
        large_ocids = {}
        slow_ocids = {}
        start_time_ocid = None
        ocid_mapping = mapping.get_ocid_mapping()
        # The schema does not depend on the row, so fetch it once instead of once per row.
        schema = mapping.get_schema()
        for row in data:
            ocid = row.get(ocid_mapping, "")
            if not ocid:
                logger.warning("No OCID found in row: %s. Skipping.", row)
                continue
            if not curr_ocid:
                curr_ocid = ocid
                start_time_ocid = time.time()
            if curr_ocid != ocid:
                self._finish_and_report_release(
                    curr_ocid, curr_release, mapped, curr_release_dates, count, start_time_ocid, large_ocids, slow_ocids
                )
                releases += 1
                logger.info("Start mapping: %s", ocid)
                curr_ocid = ocid
                curr_release = {}
                array_counters = {}
                curr_release_dates = set()
                start_time_ocid = time.time()
                count = 0
            curr_release = self.transform_row(
                row,
                mapping,
                schema,
                curr_release,
                array_counters=array_counters,
                codelists=codelists,
                curr_release_dates=curr_release_dates,
            )
            count += 1
            if count % LARGE_RELEASE_ROW_THRESHOLD == 0:
                logger.info("Processed %d rows", count)
        if curr_release:
            self._finish_and_report_release(
                curr_ocid, curr_release, mapped, curr_release_dates, count, start_time_ocid, large_ocids, slow_ocids
            )
            releases += 1
        # ``releases`` counts every finished release, including the first and the last one
        # (the previous counter only counted OCID changes and was off by one).
        logger.info("Created %d unique releases", releases)
        logger.info("A really gigantic releases (> 500K rows): %s", large_ocids)
        logger.info("Slow releases (>5 hours): %s", slow_ocids)
        return mapped

    def _finish_and_report_release(
        self, ocid, release, mapped, release_dates, row_count, start_time, large_ocids, slow_ocids
    ):
        """Finish one release and record it in ``large_ocids``/``slow_ocids`` when it is big or slow."""
        logger.info("Finishing release with %d rows", row_count)
        max_date = max(release_dates) if release_dates else None
        self.finish_release(ocid, release, mapped, max_date)
        duration = time.time() - start_time
        minutes, seconds = divmod(int(duration), 60)
        logger.info("Release mapped: %s in %dm %ds", self.produce_ocid(ocid), minutes, seconds)
        if row_count >= LARGE_RELEASE_ROW_THRESHOLD:
            large_ocids[ocid] = row_count
        if duration > SLOW_RELEASE_SECONDS:
            slow_ocids[ocid] = round(duration / 60, 1)

    def finish_release(self, curr_ocid, curr_release, mapped, release_date):
        """
        Finalize a release and emit it.

        Cleans the release, sets ``initiationType``, ``date``, ``ocid``, ``tag`` and finally the
        content-hash ``id``, then streams it through the writer (when streaming) or appends it
        to ``mapped``.

        :param curr_ocid: Raw OCID value (without prefix) of the release.
        :param curr_release: The release dictionary built by ``transform_row``.
        :param mapped: List collecting finished releases in non-streaming mode.
        :param release_date: Release date, or None to use the current time.
        """
        curr_release = self.remove_empty_id_arrays(curr_release)
        self.tag_initiation_type(curr_release)
        self.date_release(curr_release, release_date)
        self.tag_ocid(curr_release, curr_ocid)
        self.generate_tags(curr_release)
        # Must run last: the id is a hash over all the fields set above.
        self.make_release_id(curr_release)
        if self.writer and self.writer.is_streaming():
            self.writer.stream_release(curr_release)
        else:
            mapped.append(curr_release)

    def transform_row(
        self,
        input_data: dict[Any, Any],
        mapping_config: MappingTemplate,
        flattened_schema: dict[str, Any],
        result: dict | None = None,
        array_counters: dict | None = None,
        codelists: CodelistsMapping | None = None,
        curr_release_dates: set[str] | None = None,
    ) -> dict:
        """
        Transform a single row of input data to the OCDS format, merging it into ``result``.

        :param input_data: Dictionary of input data (one source row, keyed by column name).
        :type input_data: dict[Any, Any]
        :param mapping_config: Mapping configuration object; supplies the field mappings,
            datetime fields and array paths used for this row.
        :type mapping_config: MappingTemplate
        :param flattened_schema: Flattened schema dictionary keyed by paths such as
            ``/tender/items``; only the ``type`` and ``codelist`` entries are read.
        :type flattened_schema: dict[str, Any]
        :param result: Release built from earlier rows of the same OCID; updated in place when
            non-empty, otherwise a new dict is created.
        :type result: dict, optional
        :param array_counters: Map of array path -> the value (usually the element ``id``)
            selecting the array element that later fields are written into. Shared across the
            rows of one release.
        :param codelists: Optional codelist mappings applied to codelist-backed fields.
        :param curr_release_dates: Set collecting the values of datetime fields of the release.
        :return: Transformed row dictionary (the updated or newly created release).
        :rtype: dict
        """
        # Both accumulators are optional in the signature; fall back to fresh containers
        # instead of failing on ``None`` when an array or datetime field is met.
        if array_counters is None:
            array_counters = {}
        if curr_release_dates is None:
            curr_release_dates = set()

        def set_nested_value(nested_dict, keys, value, schema, *, add_new=False, append_once=False):
            # Write ``value`` at the path ``keys``, creating dicts/lists (per the schema type)
            # along the way and resolving lists to one element via ``shift_current_array``.
            # ``add_new`` appends to a list at the last key instead of overwriting it;
            # ``append_once`` skips values already present in that list.
            value = self.map_codelist_value(keys, schema, codelists, value)
            last_key = keys[-1]
            keys_path = "/" + "/".join(keys)
            for i, key in enumerate(keys[:-1]):
                subpath = "/" + "/".join(keys[: i + 1])
                if isinstance(nested_dict, list):
                    nested_dict = self.shift_current_array(nested_dict, subpath, array_counters)
                if key not in nested_dict:
                    nested_dict[key] = [] if schema.get(subpath, {}).get("type") == "array" else {}
                nested_dict = nested_dict[key]
            subpath = "/" + "/".join(keys[:-1])
            if schema.get(keys_path, {}).get("type") == "array" and isinstance(nested_dict, list) and nested_dict:
                nested_dict = self.shift_current_array(nested_dict, subpath, array_counters)
            if isinstance(nested_dict, list):
                if keys_path.startswith("/contracts/milestones"):
                    # Milestones belong to the contract named in this row, not to whichever
                    # contract the array counters currently point at.
                    contract_id_map = mapping_config.get_mapping_for("/contracts/id")
                    if contract_id_map:
                        contract_id_col = contract_id_map[0]["mapping"]
                        contract_id = input_data.get(contract_id_col)
                        # On an empty list the lookup returns None and the writes below fail;
                        # seed a placeholder element, as ``shift_current_array`` does.
                        if not nested_dict:
                            nested_dict.append({})
                        nested_dict = find_array_element_by_id(nested_dict, contract_id)
                    else:
                        nested_dict = self.shift_current_array(nested_dict, keys_path, array_counters)
                else:
                    nested_dict = self.shift_current_array(nested_dict, keys_path, array_counters)
                if add_new:
                    if last_key not in nested_dict:
                        nested_dict[last_key] = []
                    if not isinstance(value, dict) and append_once and value in nested_dict[last_key]:
                        return
                    nested_dict[last_key].append(value)
                else:
                    nested_dict[last_key] = value
            elif last_key in nested_dict:
                if isinstance(nested_dict[last_key], list) and add_new:
                    if value in nested_dict[last_key] and append_once:
                        return
                    nested_dict[last_key].append(value)
                elif isinstance(nested_dict[last_key], dict):
                    nested_dict[last_key].update(value)
                else:
                    nested_dict[last_key] = value
            else:
                subpath = "/" + "/".join(keys)
                if schema.get(subpath, {}).get("type") == "array" and not isinstance(value, list):
                    value = [value]
                nested_dict[last_key] = value

        if not result:
            result = {}
        # Use the mapping passed in (not ``self.mapping``) so every lookup in this method
        # comes from the same template.
        datetime_paths = mapping_config.get_datetime_fields()
        # Set once a multi-code milestone cell has been expanded (or found already expanded),
        # so the remaining /contracts/milestones/* mappings of this row are skipped.
        contract_milestones_processed_for_this_row = False
        # Criteria already present from an earlier row are not processed again.
        skip_criteria_processing = False
        if result and result.get("tender", {}).get("selectionCriteria", {}).get("criteria", None):
            skip_criteria_processing = True
        contiguous_groups = group_contiguous_mappings(mapping_config.get_mappings())
        for _block, group in contiguous_groups:
            sorted_group = sort_group_by_parent_and_id(group)
            for mapping in sorted_group:
                flat_col = mapping["mapping"]
                if not flat_col:
                    continue
                value = input_data.get(flat_col)
                if not value:
                    continue
                path = mapping["path"]
                if contract_milestones_processed_for_this_row and path.startswith("/contracts/milestones/"):
                    continue
                if self.config.mapping.split_milestone_codes and path in {
                    "/contracts/milestones/id",
                    "/contracts/milestones/type",
                    "/contracts/milestones/status",
                }:
                    # When the code cell holds several space-separated codes, id/type/status are
                    # folded into the milestone objects built from the code column below.
                    code_mapping = mapping_config.get_mapping_for("/contracts/milestones/code")
                    if code_mapping:
                        code_col = code_mapping[0]["mapping"]
                        code_val = input_data.get(code_col)
                        if isinstance(code_val, str) and " " in code_val:
                            continue
                if (
                    self.config.mapping.split_milestone_codes
                    and path == "/contracts/milestones/code"
                    and isinstance(value, str)
                    and " " in value
                ):
                    current_contract_id = None
                    contract_id_map = mapping_config.get_mapping_for("/contracts/id")
                    if contract_id_map:
                        contract_id_col = contract_id_map[0]["mapping"]
                        current_contract_id = input_data.get(contract_id_col)
                    if current_contract_id:
                        found_contract = None
                        for contract in result.get("contracts", []):
                            if contract.get("id") == current_contract_id:
                                found_contract = contract
                                break
                        known_codes = set(self.milestone_lookup)
                        # If this contract already carries milestones with known codes, an
                        # earlier row expanded them; don't add duplicates.
                        if (
                            found_contract
                            and found_contract.get("milestones")
                            and known_codes
                            and any(m.get("code") in known_codes for m in found_contract["milestones"])
                        ):
                            contract_milestones_processed_for_this_row = True
                            continue
                    codes = value.split()
                    if not codes:
                        continue
                    base_id_map = mapping_config.get_mapping_for("/contracts/milestones/id")
                    type_map = mapping_config.get_mapping_for("/contracts/milestones/type")
                    status_map = mapping_config.get_mapping_for("/contracts/milestones/status")
                    base_id_col = base_id_map[0]["mapping"] if base_id_map else None
                    type_col = type_map[0]["mapping"] if type_map else None
                    status_col = status_map[0]["mapping"] if status_map else None
                    base_id = input_data.get(base_id_col, "") if base_id_col else ""
                    m_type = input_data.get(type_col) if type_col else None
                    m_status = input_data.get(status_col) if status_col else None
                    for code in codes:
                        title = None
                        description = None
                        if lookup_data := self.milestone_lookup.get(code):
                            title = lookup_data.get("title")
                            description = lookup_data.get("description")
                        milestone_obj = {
                            "id": f"{base_id}-{code}" if base_id else code,
                            "title": title,
                            "type": m_type,
                            "description": description,
                            "code": code,
                            "status": m_status,
                        }
                        milestone_obj = {k: v for k, v in milestone_obj.items() if v is not None}
                        milestone_keys = path.strip("/").split("/")[:-1]  # ['contracts', 'milestones']
                        set_nested_value(result, milestone_keys, milestone_obj, flattened_schema, add_new=True)
                    contract_milestones_processed_for_this_row = True
                    continue
                if "/tender/selectionCriteria/criteria" in path and skip_criteria_processing:
                    continue
                # ``value`` is known to be truthy here (see the early ``continue`` above).
                if path in datetime_paths:
                    curr_release_dates.add(value)
                keys = path.strip("/").split("/")
                if array_path := mapping_config.get_containing_array_path(path):
                    child_path = path[len(array_path) :]
                    last_key_name = keys[-1]
                    array_value = value
                    if path == array_path:
                        # The mapped field is itself the array (of plain values): append to it.
                        if "criteria" in path:
                            tender = result.get("tender", {})
                            selection = tender.get("selectionCriteria")
                            if selection:
                                # ``setdefault`` keeps the list attached to the release; a bare
                                # ``get(..., [])`` would append to a throwaway list.
                                criteria = selection.setdefault("criteria", [])
                                if not criteria:
                                    criteria.append({last_key_name: [value]})
                                else:
                                    for criterion in criteria:
                                        if last_key_name not in criterion:
                                            criterion[last_key_name] = [value]
                                            break
                                continue
                        set_nested_value(result, keys, value, flattened_schema, add_new=True, append_once=True)
                        continue
                    if "criteria" in path:
                        # Start a new criterion when the latest one already has this field.
                        if (
                            child_path != "criteria"
                            and result.get("tender")
                            and result["tender"].get("selectionCriteria", None)
                            and (
                                len(result["tender"]["selectionCriteria"]["criteria"]) == 0
                                or last_key_name in result["tender"]["selectionCriteria"]["criteria"][-1]
                            )
                        ):
                            result["tender"]["selectionCriteria"]["criteria"].append({})
                    elif array_path in array_counters:
                        if add_new := is_new_array(array_counters, child_path, last_key_name, array_value, array_path):
                            array_counters[array_path] = array_value
                            set_nested_value(result, keys[:-1], {}, flattened_schema, add_new=add_new)
                    elif last_key_name == "id":
                        # First id seen for this array: remember it and open a new element.
                        array_counters[array_path] = array_value
                        set_nested_value(result, keys[:-1], {}, flattened_schema, add_new=True)
                    current: Any = result
                    for i, key in enumerate(keys[:-1]):
                        current_path = "/" + "/".join(keys[: i + 1])
                        is_array = flattened_schema.get(current_path, {}).get("type") == "array"
                        if key not in current:
                            current[key] = [] if is_array else {}
                        current = current[key]
                        if is_array:
                            if "criteria" in path:
                                if child_path != "criteria" and result["tender"].get("selectionCriteria", None):
                                    for criterion in result["tender"]["selectionCriteria"]["criteria"]:
                                        if last_key_name not in criterion or len(criterion) == 0:
                                            if last_key_name != "minimum":
                                                current = criterion
                                                break
                                            # ``minimum`` only goes into empty or economic criteria.
                                            if len(criterion) == 0 or criterion.get("type", "") == "economic":
                                                current = criterion
                                                break
                            else:
                                current = self.shift_current_array(current, current_path, array_counters)
                    value = self.map_codelist_value(keys, flattened_schema, codelists, value)
                    if isinstance(current, list):
                        array_path = mapping_config.get_containing_array_path("/" + "/".join(keys))
                        current = self.shift_current_array(current, array_path, array_counters)
                    current[last_key_name] = value
                else:
                    set_nested_value(result, keys, value, flattened_schema)
        return result

    def shift_current_array(self, current, array_path, array_counters):
        """
        Return the element of the list ``current`` that writes for ``array_path`` should target.

        An empty list first gets a ``{}`` appended. The element is then chosen by
        ``find_array_element_by_id`` using ``array_counters.get(array_path)`` (or None when
        ``array_counters`` is empty/None), which falls back to the last element.
        """
        if not current:
            current.append({})
        return find_array_element_by_id(current, array_counters.get(array_path) if array_counters else None)

    def make_release_id(self, curr_row: dict) -> None:
        """
        Generate and set a unique ID for the release based on its content.

        :param curr_row: The current release row dictionary; ``curr_row["id"]`` is set to
            ``dict_hash.sha256`` of the release as it stands when this is called.
        :type curr_row: dict
        """
        curr_row["id"] = dict_hash.sha256(curr_row)

    def date_release(self, curr_row: dict, curr_date: str | None) -> None:
        """
        Set the release date.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        :param curr_date: Date to use; when falsy, the current time from ``get_iso_now()`` is used.
        :type curr_date: str | None
        """
        curr_row["date"] = curr_date or get_iso_now()

    def tag_initiation_type(self, curr_row: dict) -> None:
        """
        Tag the initiation type of the release as 'tender' if applicable.

        Only applies when the release has a ``tender`` key and no ``initiationType`` yet.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        """
        if "tender" in curr_row and "initiationType" not in curr_row:
            curr_row["initiationType"] = "tender"

    def tag_ocid(self, curr_row: dict, curr_ocid: str) -> None:
        """
        Set the OCID for the release.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        :param curr_ocid: The OCID value (without prefix); stored via ``produce_ocid``.
        :type curr_ocid: str
        """
        curr_row["ocid"] = self.produce_ocid(curr_ocid)

    def generate_tags(self, release_data) -> None:
        """
        Set ``release_data["tag"]`` from the sections present in the release.

        Tags are derived from the current release only, without considering prior releases;
        'update' tags, 'cancellation' tags and the 'compiled' tag are never produced.

        :param release_data: The current release data (dict); modified in place.
        :return: None.
        """
        tags = []
        if release_data.get("planning"):
            tags.append("planning")
        if release_data.get("tender"):
            tags.append("tender")
            if release_data["tender"].get("amendments"):
                tags.append("tenderAmendment")
        if release_data.get("awards"):
            tags.append("award")
        contracts = release_data.get("contracts")
        if contracts:
            tags.append("contract")
            if any(contract.get("amendments") for contract in contracts):
                tags.append("contractAmendment")
            if any(contract.get("implementation") for contract in contracts):
                tags.append("implementation")
        release_data["tag"] = tags

    def remove_empty_id_arrays(self, data: Any) -> Any:
        """
        Clean the release by delegating to ``remove_dicts_without_id``.

        NOTE(review): judging by the helper's name this drops dictionaries without an ``id``
        (recursively); confirm against ``nightingale.util``.

        :param data: The data dictionary to process.
        :type data: dict[str, Any]
        :return: The value returned by ``remove_dicts_without_id`` (used as the cleaned release).
        """
        return remove_dicts_without_id(data)

    def map_codelist_value(self, keys, schema, codelists, value):
        """
        Translate ``value`` through the codelist mapping of the field at ``keys``.

        The value is returned unchanged when the schema names no codelist for the path, when no
        codelist mappings are configured (``codelists`` is None), or when the named codelist has
        no mapping. Unmapped values of a mapped codelist are discarded (``""``) unless the path
        is listed in ``codelist_passthrough_paths``.

        :param keys: Path segments of the field, e.g. ``["tender", "status"]``.
        :param schema: Flattened schema dictionary.
        :param codelists: Codelist mappings, or None.
        :param value: Source value.
        :return: The mapped value, the original value, or ``""``.
        """
        path = "/" + "/".join(keys)
        codelist_name = schema.get(path, {}).get("codelist")
        # Without configured codelists there is nothing to translate (previously this
        # dereferenced None and raised AttributeError).
        if not codelist_name or codelists is None:
            return value
        codelist = codelists.get_mapping_for_codelist(codelist_name)
        if not codelist:
            return value
        if new_value := codelist.get(value):
            return new_value
        if path not in self.config.mapping.codelist_passthrough_paths:
            return ""  # discard
        return value
def find_array_element_by_id(current, array_element_id):
    """
    Return the first dictionary in ``current`` whose ``'id'`` equals ``array_element_id``.

    When no dictionary matches, the last dictionary in the list is returned instead, and an
    empty list yields ``None``.

    :param current: List[Dict], a list of dictionaries to search.
    :param array_element_id: Any, the target 'id' value to search for.
    :return: Dict, the first dictionary with the matching 'id' value, the last dictionary if
        none matches, or None if the list is empty.

    Examples:

    >>> dict_list = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}, {'id': 3, 'name': 'Charlie'}]
    >>> find_array_element_by_id(dict_list, 2)
    {'id': 2, 'name': 'Bob'}
    >>> find_array_element_by_id(dict_list, 4)
    {'id': 3, 'name': 'Charlie'}
    >>> find_array_element_by_id(dict_list, 3)
    {'id': 3, 'name': 'Charlie'}
    >>> find_array_element_by_id([], 1) is None
    True
    """
    # Elements are dicts, so a real match is never None (an empty dict still counts).
    hit = next((element for element in current if element.get("id") == array_element_id), None)
    if hit is not None:
        return hit
    return current[-1] if current else None