"""Source code for ``nightingale.mapper``: maps rows loaded from a data source to OCDS releases."""

import logging
import sqlite3
import time
from typing import Any

import dict_hash

from nightingale.codelists import CodelistsMapping
from nightingale.config import Config
from nightingale.mapping_template.v09 import MappingTemplate
from nightingale.mapping_template.validator import MappingTemplateValidator
from nightingale.util import (
    get_iso_now,
    group_contiguous_mappings,
    is_new_array,
    remove_dicts_without_id,
    sort_group_by_parent_and_id,
)
from nightingale.writer import DataWriter

logger = logging.getLogger(__name__)

#: Row count with two uses: a progress message is logged every this many rows of a release,
#: and releases with at least this many rows are reported as large.
LARGE_RELEASE_ROW_THRESHOLD = 500 * 1_000
#: Releases whose mapping takes longer than this many seconds are reported as slow.
SLOW_RELEASE_SECONDS = 5 * 60 * 60  # 5 hours


class OCDSDataMapper:
    """
    Maps data from a source to the OCDS format.

    Rows are merged into one release per OCID; a release is finished whenever the OCID column changes, so rows
    of the same release must be contiguous in the input. Finished releases are streamed through the optional
    writer, or collected and returned.

    :param config: Configuration object containing settings for the mapper.
    :type config: Config
    :param writer: Optional DataWriter used to stream releases instead of collecting them.
    """

    def __init__(self, config: Config, writer: DataWriter | None = None):
        """
        Initialize the OCDSDataMapper.

        :param config: Configuration object containing settings for the mapper.
        :type config: Config
        :param writer: Optional DataWriter instance for streaming output.
        """
        self.config = config
        self.mapping = MappingTemplate(config.mapping)
        self.writer = writer
        self.codelists = CodelistsMapping(config.mapping) if config.mapping.codelists else None
        # Milestone code -> lookup row (read for "title"/"description"); populated by map().
        self.milestone_lookup: dict[Any, Any] = {}

    def produce_ocid(self, value: str) -> str:
        """
        Produce an OCID based on the given value.

        :param value: The value to use for generating the OCID.
        :type value: str
        :return: The configured OCID prefix and ``value`` joined by a hyphen.
        :rtype: str
        """
        return f"{self.config.mapping.ocid_prefix}-{value}"

    def map(self, loader: Any, *, validate_mapping: bool = False) -> list[dict[str, Any]]:
        """
        Map data from the loader to the OCDS format.

        :param loader: Data loader object; ``loader.load(sql)`` must return a sequence of row mappings.
        :type loader: Any
        :param validate_mapping: Validate the mapping template against the data source before mapping.
        :return: List of mapped release dictionaries (empty when releases are streamed to the writer).
        :rtype: list[dict[str, Any]]
        """
        config = self.config.mapping
        if config.milestone_lookup_sql:
            try:
                self.milestone_lookup = {row["code"]: row for row in loader.load(config.milestone_lookup_sql)}
                logger.info("Loaded %d entries from milestone lookup.", len(self.milestone_lookup))
            except sqlite3.OperationalError as e:
                # The lookup table is optional; mapping proceeds without titles/descriptions.
                logger.warning("Could not load milestone lookup table: %s.", e)
        if config.split_milestone_codes and not self.milestone_lookup:
            logger.warning("split_milestone_codes is enabled but no milestone lookup data is available.")

        logger.info("Mapping data loaded")
        data = loader.load(config.selector)
        logger.info("Start fetching rows from datasource")
        if validate_mapping:
            logger.info("Validating mapping template...")
            validator = MappingTemplateValidator(loader, self.mapping)
            validator.validate_data_elements()
            if data:
                validator.validate_selector(data[0])
            else:
                # An empty result set has no row to validate the selector against.
                logger.warning("Selector returned no rows; skipping selector validation.")
        logger.info("Start mapping data")
        return self.transform_data(data, self.mapping, codelists=self.codelists)

    def transform_data(
        self, data: list[dict[Any, Any]], mapping: MappingTemplate, codelists: CodelistsMapping | None = None
    ) -> list[dict[str, Any]]:
        """
        Transform the input data to the OCDS format.

        Consecutive rows sharing an OCID are merged into one release. Rows without an OCID are skipped with a
        warning. A trailing release that produced no content is not emitted.

        :param data: List of input data dictionaries.
        :param mapping: Mapping configuration object.
        :param codelists: Optional codelist mappings applied to mapped values.
        :return: List of transformed release dictionaries (empty when releases are streamed to the writer).
        """
        ocid_mapping = mapping.get_ocid_mapping()
        schema = mapping.get_schema()  # invariant across rows; fetched once
        mapped: list[dict[str, Any]] = []
        large_ocids: dict[str, int] = {}
        slow_ocids: dict[str, float] = {}
        releases_created = 0

        curr_ocid = ""
        curr_release: dict[str, Any] = {}
        curr_release_dates: set[str] = set()
        array_counters: dict[str, Any] = {}
        count = 0
        start_time_ocid = time.time()

        for row in data:
            ocid = row.get(ocid_mapping, "")
            if not ocid:
                logger.warning("No OCID found in row: %s. Skipping.", row)
                continue
            if not curr_ocid:
                curr_ocid = ocid
                start_time_ocid = time.time()
            if curr_ocid != ocid:
                self._complete_release(
                    curr_ocid,
                    curr_release,
                    curr_release_dates,
                    mapped,
                    count,
                    start_time_ocid,
                    large_ocids,
                    slow_ocids,
                )
                releases_created += 1
                logger.info("Start mapping: %s", ocid)
                curr_ocid = ocid
                curr_release = {}
                curr_release_dates = set()
                array_counters = {}
                count = 0
                start_time_ocid = time.time()

            curr_release = self.transform_row(
                row,
                mapping,
                schema,
                curr_release,
                array_counters=array_counters,
                codelists=codelists,
                curr_release_dates=curr_release_dates,
            )
            count += 1
            if count % LARGE_RELEASE_ROW_THRESHOLD == 0:
                logger.info("Processed %d rows", count)

        if curr_release:
            self._complete_release(
                curr_ocid,
                curr_release,
                curr_release_dates,
                mapped,
                count,
                start_time_ocid,
                large_ocids,
                slow_ocids,
            )
            releases_created += 1

        logger.info("Created %d unique releases", releases_created)
        logger.info("Large releases (>= %d rows): %s", LARGE_RELEASE_ROW_THRESHOLD, large_ocids)
        logger.info("Slow releases (> %d minutes; values are minutes): %s", SLOW_RELEASE_SECONDS // 60, slow_ocids)
        return mapped

    def _complete_release(
        self,
        ocid: str,
        release: dict[str, Any],
        release_dates: set[str],
        mapped: list[dict[str, Any]],
        row_count: int,
        started_at: float,
        large_ocids: dict[str, int],
        slow_ocids: dict[str, float],
    ) -> None:
        """Finish ``release`` and record its OCID in ``large_ocids``/``slow_ocids`` when it crosses a threshold."""
        logger.info("Finishing release with %d rows", row_count)
        # max() of the collected datetime strings; the latest date assuming a uniform ISO 8601 format.
        release_date = max(release_dates) if release_dates else None
        self.finish_release(ocid, release, mapped, release_date)
        duration = time.time() - started_at
        minutes, seconds = divmod(int(duration), 60)
        logger.info("Release mapped: %s in %dm %ds", self.produce_ocid(ocid), minutes, seconds)
        if row_count >= LARGE_RELEASE_ROW_THRESHOLD:
            large_ocids[ocid] = row_count
        if duration > SLOW_RELEASE_SECONDS:
            slow_ocids[ocid] = round(duration / 60, 1)

    def finish_release(self, curr_ocid, curr_release, mapped, release_date):
        """
        Post-process a completed release and emit it.

        The release is pruned with ``remove_empty_id_arrays``, then gets ``initiationType``, ``date``, ``ocid``
        and ``tag``; its ``id`` is derived last from a hash of that content. It is streamed through the writer
        when the writer is streaming, otherwise appended to ``mapped``.

        :param curr_ocid: The un-prefixed OCID value of the release.
        :param curr_release: The release dictionary built from the release's rows.
        :param mapped: List collecting releases when not streaming.
        :param release_date: Release date, or None to use the current time.
        """
        curr_release = self.remove_empty_id_arrays(curr_release)
        self.tag_initiation_type(curr_release)
        self.date_release(curr_release, release_date)
        self.tag_ocid(curr_release, curr_ocid)
        self.generate_tags(curr_release)
        self.make_release_id(curr_release)
        if self.writer and self.writer.is_streaming():
            self.writer.stream_release(curr_release)
        else:
            mapped.append(curr_release)

    def transform_row(
        self,
        input_data: dict[Any, Any],
        mapping_config: MappingTemplate,
        flattened_schema: dict[str, Any],
        result: dict | None = None,
        array_counters: dict | None = None,
        codelists: CodelistsMapping | None = None,
        curr_release_dates: set[str] | None = None,
    ) -> dict:
        """
        Transform a single row of input data to the OCDS format.

        The row's mapped values are merged into ``result``, the release built so far from earlier rows.

        :param input_data: Dictionary of input data (column name -> value).
        :type input_data: dict[Any, Any]
        :param mapping_config: Mapping configuration object.
        :type mapping_config: MappingTemplate
        :param flattened_schema: Flattened schema dictionary (path -> properties such as ``type``/``codelist``).
        :type flattened_schema: dict[str, Any]
        :param result: Existing result dictionary to update; a new one is created when empty or None.
        :type result: dict, optional
        :param array_counters: Maps an array path to the value identifying the element currently being filled.
        :param codelists: Optional codelist mappings applied to mapped values.
        :param curr_release_dates: When given, values of datetime fields are collected into this set.
        :return: Transformed row dictionary.
        :rtype: dict
        """

        def mapped_value(pointer, default=None):
            # This row's value for the first column mapped to the schema path ``pointer``.
            field_mappings = mapping_config.get_mapping_for(pointer)
            column = field_mappings[0]["mapping"] if field_mappings else None
            return input_data.get(column, default) if column else default

        def set_nested_value(nested_dict, keys, value, schema, *, add_new=False, append_once=False):
            # Walk (creating as needed) the containers along ``keys``, then set or append ``value``.
            value = self.map_codelist_value(keys, schema, codelists, value)
            last_key = keys[-1]
            keys_path = "/" + "/".join(keys)
            for i, key in enumerate(keys[:-1]):
                subpath = "/" + "/".join(keys[: i + 1])
                if isinstance(nested_dict, list):
                    nested_dict = self.shift_current_array(nested_dict, subpath, array_counters)
                if key not in nested_dict:
                    nested_dict[key] = [] if schema.get(subpath, {}).get("type") == "array" else {}
                nested_dict = nested_dict[key]

            parent_path = "/" + "/".join(keys[:-1])
            if schema.get(keys_path, {}).get("type") == "array" and isinstance(nested_dict, list) and nested_dict:
                nested_dict = self.shift_current_array(nested_dict, parent_path, array_counters)
            if isinstance(nested_dict, list):
                contract_id_map = None
                if keys_path.startswith("/contracts/milestones"):
                    # Milestones are attached to the contract whose id is on this row.
                    contract_id_map = mapping_config.get_mapping_for("/contracts/id")
                if contract_id_map:
                    contract_id = input_data.get(contract_id_map[0]["mapping"])
                    nested_dict = find_array_element_by_id(nested_dict, contract_id)
                else:
                    nested_dict = self.shift_current_array(nested_dict, keys_path, array_counters)

            if add_new:
                # Append to the (possibly new) array; an existing non-list value is replaced.
                existing = nested_dict.setdefault(last_key, [])
                if isinstance(existing, list):
                    if append_once and not isinstance(value, dict) and value in existing:
                        return
                    existing.append(value)
                else:
                    nested_dict[last_key] = value
            elif last_key in nested_dict:
                if isinstance(nested_dict[last_key], dict):
                    nested_dict[last_key].update(value)
                else:
                    nested_dict[last_key] = value
            else:
                if schema.get(keys_path, {}).get("type") == "array" and not isinstance(value, list):
                    value = [value]
                nested_dict[last_key] = value

        if not result:
            result = {}

        datetime_paths = mapping_config.get_datetime_fields()
        split_milestone_codes = self.config.mapping.split_milestone_codes
        split_milestone_fields = {
            "/contracts/milestones/id",
            "/contracts/milestones/type",
            "/contracts/milestones/status",
        }
        contract_milestones_processed_for_this_row = False
        # Once the release already has selection criteria (from earlier rows), criteria columns are ignored.
        skip_criteria_processing = bool(result.get("tender", {}).get("selectionCriteria", {}).get("criteria", None))

        for _block, group in group_contiguous_mappings(mapping_config.get_mappings()):
            for mapping in sort_group_by_parent_and_id(group):
                flat_col = mapping["mapping"]
                if not flat_col:
                    continue
                value = input_data.get(flat_col)
                if not value:
                    continue
                path = mapping["path"]
                if contract_milestones_processed_for_this_row and path.startswith("/contracts/milestones/"):
                    continue

                if split_milestone_codes and path in split_milestone_fields:
                    # When the code column holds several codes, these fields are emitted per code below.
                    code_val = mapped_value("/contracts/milestones/code")
                    if isinstance(code_val, str) and " " in code_val:
                        continue

                if (
                    split_milestone_codes
                    and path == "/contracts/milestones/code"
                    and isinstance(value, str)
                    and " " in value
                ):
                    current_contract_id = mapped_value("/contracts/id")
                    if current_contract_id:
                        found_contract = next(
                            (c for c in result.get("contracts", []) if c.get("id") == current_contract_id), None
                        )
                        # An earlier row already added milestones with known codes to this contract.
                        if (
                            found_contract
                            and found_contract.get("milestones")
                            and self.milestone_lookup
                            and any(m.get("code") in self.milestone_lookup for m in found_contract["milestones"])
                        ):
                            contract_milestones_processed_for_this_row = True
                            continue
                    codes = value.split()
                    if not codes:
                        continue
                    base_id = mapped_value("/contracts/milestones/id", "")
                    m_type = mapped_value("/contracts/milestones/type")
                    m_status = mapped_value("/contracts/milestones/status")
                    milestone_keys = path.strip("/").split("/")[:-1]  # ['contracts', 'milestones']
                    for code in codes:
                        lookup_data = self.milestone_lookup.get(code) or {}
                        milestone_obj = {
                            "id": f"{base_id}-{code}" if base_id else code,
                            "title": lookup_data.get("title"),
                            "type": m_type,
                            "description": lookup_data.get("description"),
                            "code": code,
                            "status": m_status,
                        }
                        milestone_obj = {k: v for k, v in milestone_obj.items() if v is not None}
                        set_nested_value(result, milestone_keys, milestone_obj, flattened_schema, add_new=True)
                    contract_milestones_processed_for_this_row = True
                    continue

                if "/tender/selectionCriteria/criteria" in path and skip_criteria_processing:
                    continue
                if path in datetime_paths and curr_release_dates is not None:
                    curr_release_dates.add(value)

                keys = path.strip("/").split("/")
                array_path = mapping_config.get_containing_array_path(path)
                if not array_path:
                    set_nested_value(result, keys, value, flattened_schema)
                    continue

                child_path = path[len(array_path) :]
                last_key_name = keys[-1]

                if path == array_path:
                    # The mapped field is itself an array: add this row's value to it.
                    if "criteria" in path:
                        selection = result.get("tender", {}).get("selectionCriteria")
                        if selection:
                            criteria = selection.get("criteria", [])
                            if not criteria:
                                criteria.append({last_key_name: [value]})
                            else:
                                for criterion in criteria:
                                    if last_key_name not in criterion:
                                        criterion[last_key_name] = [value]
                                        break
                        continue
                    set_nested_value(result, keys, value, flattened_schema, add_new=True, append_once=True)
                    continue

                # Decide whether this value starts a new element of its containing array.
                if "criteria" in path:
                    if (
                        child_path != "criteria"
                        and result.get("tender")
                        and result["tender"].get("selectionCriteria", None)
                        and (
                            len(result["tender"]["selectionCriteria"]["criteria"]) == 0
                            or last_key_name in result["tender"]["selectionCriteria"]["criteria"][-1]
                        )
                    ):
                        result["tender"]["selectionCriteria"]["criteria"].append({})
                elif array_path in array_counters:
                    if is_new_array(array_counters, child_path, last_key_name, value, array_path):
                        array_counters[array_path] = value
                        set_nested_value(result, keys[:-1], {}, flattened_schema, add_new=True)
                elif last_key_name == "id":
                    array_counters[array_path] = value
                    set_nested_value(result, keys[:-1], {}, flattened_schema, add_new=True)

                # Walk to the element being filled and set the value on it.
                current: Any = result
                for i, key in enumerate(keys[:-1]):
                    current_path = "/" + "/".join(keys[: i + 1])
                    is_array = flattened_schema.get(current_path, {}).get("type") == "array"
                    if key not in current:
                        current[key] = [] if is_array else {}
                    current = current[key]
                    if not is_array:
                        continue
                    if "criteria" in path:
                        if child_path != "criteria" and result["tender"].get("selectionCriteria", None):
                            for criterion in result["tender"]["selectionCriteria"]["criteria"]:
                                # "minimum" only goes on an empty or economic criterion lacking it.
                                if last_key_name not in criterion and (
                                    last_key_name != "minimum"
                                    or len(criterion) == 0
                                    or criterion.get("type", "") == "economic"
                                ):
                                    current = criterion
                                    break
                    else:
                        current = self.shift_current_array(current, current_path, array_counters)

                value = self.map_codelist_value(keys, flattened_schema, codelists, value)
                if isinstance(current, list):
                    element_array_path = mapping_config.get_containing_array_path("/" + "/".join(keys))
                    current = self.shift_current_array(current, element_array_path, array_counters)
                current[last_key_name] = value

        return result

    def shift_current_array(self, current, array_path, array_counters):
        """
        Return the element of the list ``current`` that is being filled for ``array_path``.

        An empty list first gets a new empty element. The element is looked up by the identifier recorded in
        ``array_counters`` for ``array_path`` (falling back to the last element).
        """
        if not current:
            current.append({})
        target_id = array_counters.get(array_path) if array_counters else None
        return find_array_element_by_id(current, target_id)

    def make_release_id(self, curr_row: dict) -> None:
        """
        Generate and set a unique ID for the release based on its content.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        """
        curr_row["id"] = dict_hash.sha256(curr_row)

    def date_release(self, curr_row: dict, curr_date: str | None) -> None:
        """
        Set the release date to ``curr_date``, or to the current date and time when it is empty.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        :param curr_date: The date to use, or None.
        """
        curr_row["date"] = curr_date or get_iso_now()

    def tag_initiation_type(self, curr_row: dict) -> None:
        """
        Tag the initiation type of the release as 'tender' if it has a tender and no initiation type.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        """
        if "tender" in curr_row and "initiationType" not in curr_row:
            curr_row["initiationType"] = "tender"

    def tag_ocid(self, curr_row: dict, curr_ocid: str) -> None:
        """
        Set the OCID for the release.

        :param curr_row: The current release row dictionary.
        :type curr_row: dict
        :param curr_ocid: The un-prefixed OCID value; the configured prefix is added.
        :type curr_ocid: str
        """
        curr_row["ocid"] = self.produce_ocid(curr_ocid)

    def generate_tags(self, release_data) -> None:
        """
        Set ``release_data["tag"]`` from the sections present in the release, ignoring prior releases.

        'update', 'cancellation' and 'compiled' tags are never produced.

        :param release_data: The current release data (dict); updated in place.
        """
        tags = []
        if release_data.get("planning"):
            tags.append("planning")
        if tender := release_data.get("tender"):
            tags.append("tender")
            if tender.get("amendments"):
                tags.append("tenderAmendment")
        if release_data.get("awards"):
            tags.append("award")
        if contracts := release_data.get("contracts"):
            tags.append("contract")
            if any(contract.get("amendments") for contract in contracts):
                tags.append("contractAmendment")
            if any(contract.get("implementation") for contract in contracts):
                tags.append("implementation")
        release_data["tag"] = tags

    def remove_empty_id_arrays(self, data: Any) -> Any:
        """
        Prune array elements without an 'id' field, delegating to ``remove_dicts_without_id``.

        :param data: The data dictionary to process.
        :type data: dict[str, Any]
        :return: The pruned data (the exact pruning rules are those of ``remove_dicts_without_id``).
        """
        return remove_dicts_without_id(data)

    def map_codelist_value(self, keys, schema, codelists, value):
        """
        Translate ``value`` through the codelist mapping configured for the schema path ``keys``.

        :param keys: Path segments of the field.
        :param schema: Flattened schema (path -> properties); its ``codelist`` property names the codelist.
        :param codelists: Codelist mappings, or None when none are configured.
        :param value: The source value.
        :return: The mapped value. Returns ``""`` when a mapping exists, does not contain ``value``, and the path
            is not in ``codelist_passthrough_paths``. Otherwise returns ``value`` unchanged.
        """
        path = "/" + "/".join(keys)
        codelist_name = schema.get(path, {}).get("codelist")
        if not codelist_name or codelists is None:
            return value
        codelist = codelists.get_mapping_for_codelist(codelist_name)
        if not codelist:
            return value
        if new_value := codelist.get(value):
            return new_value
        if path not in self.config.mapping.codelist_passthrough_paths:
            return ""  # discard values missing from the codelist mapping
        return value
def find_array_element_by_id(current, array_element_id):
    """
    Return the first dictionary in ``current`` whose ``"id"`` equals ``array_element_id``.

    When nothing matches, the last dictionary is returned instead, and ``None`` when ``current`` is empty.
    Elements are read with ``.get("id")``, so a target of ``None`` matches the first element without an
    ``"id"`` key.

    :param current: List[Dict], a list of dictionaries to search.
    :param array_element_id: Any, the target 'id' value to search for.
    :return: Dict, the dictionary with the matching 'id' value, or the last dictionary if not found.

    Examples:
        >>> dict_list = [{'id': 1, 'name': 'Alice'}, {'id': 2, 'name': 'Bob'}, {'id': 3, 'name': 'Charlie'}]
        >>> find_array_element_by_id(dict_list, 2)
        {'id': 2, 'name': 'Bob'}
        >>> find_array_element_by_id(dict_list, 4)
        {'id': 3, 'name': 'Charlie'}
        >>> find_array_element_by_id(dict_list, 3)
        {'id': 3, 'name': 'Charlie'}
        >>> find_array_element_by_id([], 1) is None
        True
    """
    fallback = current[-1] if current else None
    return next((element for element in current if element.get("id") == array_element_id), fallback)