import logging
import openpyxl
from nightingale.util import get_longest_array_path
logger = logging.getLogger(__name__)
# Text looked for inside the first-path column of a "section" row of a mapping sheet.
# When it is found, every field read afterwards from that sheet is flagged as an extension field.
EXTENSIONS = "Extensions are additions"
# Second marker text checked on "section" rows; a match has the same effect as EXTENSIONS.
# NOTE(review): the constant name is misspelled ("ADDITONAL"); it is referenced under this
# exact name elsewhere in the file, so it is left as is.
ADDITONAL_FIELDS = "If you have additional information "
# Worksheet names that read_mappings() visits, in this order.
MAPPINGS_SHEETS = [
"(OCDS) 1. General (all stages)",
"(OCDS) 2. Planning",
"(OCDS) 3. Tender",
"(OCDS) 4. Award",
"(OCDS) 5. Contract",
"(OCDS) 6. Implementation",
]
# Worksheet parsed by read_data_elements_sheet() when a MappingTemplate is created.
DATA_SHEET = "2. Data Elements"
# Worksheet parsed by read_extenions_info(); it is treated as optional there.
EXTENSIONS_SHEET = "3. OCDS Extensions"
# A worksheet is treated as a schema sheet when its name contains both "OCDS" and this text.
SCHEMA_SHEET = "Schema"
[docs]
class MappingTemplate:
def __init__(self, config):
    """Open the workbook at ``config.file`` and parse all of its sheets.

    The workbook is loaded with ``data_only=True``. Parsing happens in a
    fixed order: data elements, mappings (then re-ordered by
    ``enforce_mapping_structure``), schema, and finally extension info.

    :param config: object exposing a ``file`` attribute that
        ``openpyxl.load_workbook`` accepts.
    """
    self.config = config
    self.wb = openpyxl.load_workbook(self.config.file, data_only=True)

    data_sheet = self.wb[DATA_SHEET]
    self.data_elements = self.read_data_elements_sheet(data_sheet)

    # Mappings are read sheet by sheet and only then put into their final order.
    self.mappings = self.enforce_mapping_structure(self.read_mappings())

    self.schema = self.read_schema_sheet()
    self.extensions = self.read_extenions_info()
[docs]
def get_schema_sheet(self):
    """Return the worksheets whose name contains both "OCDS" and ``SCHEMA_SHEET``.

    :returns: list of worksheet objects, in workbook order; empty when none match.
    """
    matching = []
    for name in self.wb.sheetnames:
        if "OCDS" in name and SCHEMA_SHEET in name:
            matching.append(self.wb[name])
    return matching
[docs]
def normmalize_mapping_column(self, mappings):
    """Normalize the mapping column so that words are separated by exactly one space.

    Every run of whitespace inside a ``"mapping"`` value is collapsed to a
    single space and leading/trailing whitespace is dropped. Values that are
    not strings (for example ``None``) are left untouched.

    :param mappings: list of dicts, each holding a ``"mapping"`` key.
    :returns: the same list object; the dicts are updated in place.
    """
    for mapping in mappings:
        value = mapping["mapping"]
        if isinstance(value, str):
            # str.split() without arguments splits on whitespace runs, so joining the
            # pieces back yields single-space separators (split(" ") would keep the
            # empty pieces and therefore the repeated spaces).
            mapping["mapping"] = " ".join(value.split())
    return mappings
[docs]
def read_mapping_sheet(self, sheet):
    """Collect the mapped fields of one mapping worksheet.

    Rows are read from row 4 onwards. Column 1 holds the row type; columns
    3 to 6 hold path, title, description and mapping. Row types are handled
    as follows:

    * ``span`` / ``ref_span`` / ``extension_span``: remember the path as the current block.
    * ``field`` / ``required_field`` / ``extension_field`` / ``additional_field``: emit one
      mapping dict, provided the mapping cell is filled in.
    * ``section``: once the text contains ``EXTENSIONS`` or ``ADDITONAL_FIELDS``, all
      following fields are flagged with ``is_extensions``.
    * ``extension``: remember the text before the first ``":"`` as the current extension.
    * anything else carries no mapping information and is ignored.

    Rows whose path cell is blank (or not text) are skipped where a path is
    required instead of raising.

    :param sheet: worksheet-like object providing ``iter_rows(min_row=..., values_only=...)``.
    :returns: list of mapping dicts with the keys ``block``, ``path``, ``title``,
        ``description``, ``mapping``, ``is_extensions``, ``extension``,
        ``is_required`` and ``is_additional``.
    """
    block_types = ("span", "ref_span", "extension_span")
    field_types = ("field", "required_field", "extension_field", "additional_field")

    in_extensions = False
    current_extension = ""
    current_block = ""
    mappings = []
    for row in sheet.iter_rows(min_row=4, values_only=True):
        # Pad short rows so the fixed column positions below always exist.
        cells = tuple(row) + (None,) * 6
        column_type, path, title, description, mapping = cells[0], cells[2], cells[3], cells[4], cells[5]
        has_path = isinstance(path, str) and bool(path)

        if column_type in block_types:
            current_block = path
        elif column_type in field_types:
            if not mapping:
                continue
            if not has_path:
                # A mapping without a target path cannot be applied; report it and move on.
                # getLogger(__name__) returns the same logger object as the module-level one.
                logging.getLogger(__name__).warning("Skipping mapped row without a path: %r", title)
                continue
            mappings.append(
                {
                    "block": current_block,
                    "path": path if path.startswith("/") else "/" + path,
                    "title": title,
                    "description": description,
                    "mapping": mapping,
                    "is_extensions": in_extensions,
                    "extension": current_extension,
                    "is_required": column_type == "required_field",
                    "is_additional": column_type == "additional_field",
                }
            )
        elif column_type == "section":
            if has_path and (EXTENSIONS in path or ADDITONAL_FIELDS in path):
                in_extensions = True
        elif column_type == "extension":
            if has_path:
                current_extension = path.split(":")[0]
    return mappings
[docs]
def read_mappings(self):
    """Read every sheet listed in ``MAPPINGS_SHEETS`` and concatenate the results.

    :returns: one flat list with the mapping dicts of all sheets, in sheet order.
    """
    collected = []
    for name in MAPPINGS_SHEETS:
        collected += self.read_mapping_sheet(self.wb[name])
    return collected
[docs]
def get_element_by_mapping(self, for_mapping):
    """Look up the data element whose name is given in parentheses in ``for_mapping``.

    The element name is the text after the first ``"("`` (up to the next
    ``"("``, if any) with every ``")"`` removed.

    :param for_mapping: mapping text such as ``"Title (tender_title)"``.
    :returns: the matching entry of ``self.data_elements``, or ``{}`` when the
        text has no ``"("`` or the element is unknown.
    """
    _, opener, remainder = for_mapping.partition("(")
    if not opener:
        # No parenthesised element name at all: treat it like an unknown element.
        return {}
    element = remainder.partition("(")[0].replace(")", "")
    return self.data_elements.get(element, {})
[docs]
def read_data_elements_sheet(self, sheet):
    """Build a lookup of data elements from the data-elements worksheet.

    Rows are read from row 4 onwards and must have at least eight columns;
    rows with an empty data-element cell are ignored.

    :param sheet: worksheet-like object providing ``iter_rows(min_row=..., values_only=...)``.
    :returns: dict keyed by data element name; a later row with the same name
        replaces an earlier one.
    """
    elements = {}
    for cells in sheet.iter_rows(min_row=4, values_only=True):
        for_mapping, data_source, table, data_element, publish, example, description, data_type, *_ = cells
        if data_element:
            # Any cell text containing "yes" (case-insensitive) counts as publishable.
            is_published = "yes" in str(publish).lower()
            # NOTE(review): as written this replaces a space with a space, i.e. it is a no-op;
            # the literal may originally have been a double or non-breaking space - confirm.
            label = str(for_mapping).replace(" ", " ")
            elements[data_element] = {
                "for_mapping": label,
                "data_source": data_source,
                "data_element": data_element,
                "table": table,
                "publish": is_published,
                "example": example,
                "description": description,
                "data_type": data_type,
            }
    return elements
[docs]
def read_extenions_info(self):
    """Read the extensions worksheet into a list of row dicts.

    The first row is taken as the header row. Each later row that has at
    least one truthy cell becomes a dict keyed by the lower-cased header
    text; columns with an empty header are left out.

    :returns: list of dicts, or ``[]`` when the workbook has no ``EXTENSIONS_SHEET``.
    """
    if EXTENSIONS_SHEET not in self.wb.sheetnames:
        return []
    sheet = self.wb[EXTENSIONS_SHEET]
    # (column index, header text) pairs taken from the first row.
    headers = list(enumerate(cell.value for cell in sheet[1]))
    records = []
    for values in sheet.iter_rows(min_row=2, values_only=True):
        if any(values):
            records.append({name.lower(): values[index] for index, name in headers if name})
    return records
[docs]
def get_data_elements(self):
    """Return the data-element lookup stored on this instance (the same dict object, not a copy)."""
    elements = self.data_elements
    return elements
[docs]
def read_schema_sheet(self):
    """Build a path -> field-info lookup from the schema worksheets.

    Rows are read from row 2 onwards and must have at least nine columns;
    rows without a path are ignored. Each path is stored with a leading
    ``"/"``. Any field type containing "array" (case-insensitive) is stored
    as ``"array"``.

    :returns: dict keyed by path with the keys ``title``, ``description``,
        ``type``, ``range``, ``values``, ``links`` and ``codelist``.
    """
    schema = {}
    for sheet in self.get_schema_sheet() or []:
        for _, raw_path, title, description, field_type, field_range, values, links, codelist, *_ in sheet.iter_rows(
            min_row=2, values_only=True
        ):
            if not raw_path:
                continue
            key = "/" + raw_path
            # An "object" row for a path that is already known describes the object nested
            # inside an array; the entry of the parent array is the one to keep.
            if field_type == "object" and key in schema:
                continue
            is_array = field_type and "array" in field_type.lower()
            schema[key] = {
                "title": title,
                "description": description,
                "type": "array" if is_array else field_type,
                "range": field_range,
                "values": values,
                "links": links,
                "codelist": codelist,
            }
    return schema
[docs]
def enforce_mapping_structure(self, mappings):
    """Re-order mappings so that they are grouped by their top-level path segment.

    The result lists the groups in this order: general, planning, tender,
    awards, contracts, implementation. Mappings whose first path segment is
    none of these names go to "general". The relative order inside a group
    is kept.

    :param mappings: list of mapping dicts, each holding a ``"path"`` key.
    :returns: a new list with the same dicts in grouped order.
    """
    order = ("general", "planning", "tender", "awards", "contracts", "implementation")
    sections = {name: [] for name in order}
    for mapping in mappings:
        # Paths are stored with a leading "/", so it must be dropped before splitting;
        # otherwise the first piece is always "" and every mapping ends up in "general".
        top_level = mapping["path"].lstrip("/").split("/", 1)[0]
        bucket = top_level if top_level in sections else "general"
        sections[bucket].append(mapping)
    return [mapping for name in order for mapping in sections[name]]
[docs]
def get_mappings(self):
    """Return the list of mapping dicts stored on this instance (the same list object, not a copy)."""
    mappings = self.mappings
    return mappings
[docs]
def get_mapping_for(self, path):
    """Return every mapping whose path equals ``path``.

    :param path: schema path; a leading ``"/"`` is added when it is missing.
    :returns: list of matching mapping dicts, empty when nothing matches.
    """
    wanted = path if path.startswith("/") else "/" + path
    found = []
    for candidate in self.get_mappings():
        if candidate["path"] == wanted:
            found.append(candidate)
    return found
[docs]
def get_paths_for_mapping(self, key, *, force_publish=False):
    """Return the paths of all mappings whose mapping text equals ``key``.

    A path is only included when the data element referenced by ``key`` is
    marked as published, unless ``force_publish`` is set.

    :param key: mapping text to look for.
    :param force_publish: include the paths even when the element is not published.
    :returns: list of paths, in mapping order.
    """
    # The element lookup stays inside the filter so it only runs for matching mappings.
    return [
        candidate["path"]
        for candidate in self.get_mappings()
        if candidate["mapping"] == key
        and (self.get_element_by_mapping(key).get("publish", False) or force_publish)
    ]
[docs]
def is_array_path(self, path):
    """Tell whether the schema entry for ``path`` has the type ``"array"``.

    :returns: ``False`` for paths that are not in the schema.
    """
    entry = self.schema.get(path, {})
    return entry.get("type") == "array"
[docs]
def get_arrays(self):
    """Return the schema paths whose type is ``"array"``, in schema order."""
    return [path for path, info in self.schema.items() if info["type"] == "array"]
[docs]
def get_schema(self):
    """Return the schema lookup stored on this instance (the same dict object, not a copy)."""
    schema = self.schema
    return schema
[docs]
def get_ocid_mapping(self):
    """Return the mapping text of the first mapping registered for the ``ocid`` path.

    :raises IndexError: when no mapping exists for ``ocid``.
    """
    matches = self.get_mapping_for("ocid")
    return matches[0]["mapping"]
[docs]
def get_containing_array_path(self, path):
    """Pass the schema's array paths and ``path`` to ``get_longest_array_path`` and return its result."""
    array_paths = self.get_arrays()
    return get_longest_array_path(array_paths, path)
[docs]
def get_datetime_fields(self):
    """Return the schema paths whose 'values' entry is exactly 'date-time', in schema order."""
    return [path for path, field_info in self.schema.items() if field_info.get("values") == "date-time"]