Source code for nightingale.writer
import os
from pathlib import Path
from typing import TYPE_CHECKING
import simplejson as json
from nightingale.config import Output
from nightingale.exceptions import StreamNotStartedError
from nightingale.util import get_iso_now, produce_package_name
if TYPE_CHECKING:
import io
[docs]
def new_name(package: dict | list) -> str:
"""
Generate a new name for the package based on its published date.
:param package: The release package dictionary.
:return: The generated package name.
"""
return produce_package_name(get_iso_now() if isinstance(package, list) else package["publishedDate"])
[docs]
class DataWriter:
"""Writes release package to disk."""
def __init__(self, config: Output):
"""
Initialize the DataWriter.
:param config: Configuration object containing settings for the writer.
"""
self.config = config
self._file_handler: io.TextIOWrapper | None = None
self._is_first_release = True
self._output_path = None
[docs]
def make_dirs(self) -> Path:
"""
Create the necessary directories for storing the release package.
:return: The base directory path.
"""
base = Path(self.config.directory)
base.mkdir(parents=True, exist_ok=True)
return base
[docs]
def get_output_path(self, package: dict | list) -> Path:
"""
Get the output path for the release package.
:param package: The release package dictionary.
:return: The path where the package will be written.
"""
if not self._output_path:
base = self.make_dirs()
self._output_path = base / new_name(package)
return self._output_path
[docs]
def write(self, package: dict | list) -> None:
"""
Write the release package to disk in a single operation.
:param package: The release package dictionary or list of releases.
"""
path = self.get_output_path(package)
with path.open("w", encoding="utf-8") as f:
json.dump(package, f, indent=2, ensure_ascii=False)
[docs]
def start_package_stream(self, package_metadata: dict) -> None:
"""Start a streaming write session, write package metadata and prepare for releases."""
buffer_size_str = os.getenv("APP_WRITE_BUFFER_SIZE", "8388608")
buffer_size = int(buffer_size_str)
path = self.get_output_path(package_metadata)
self._file_handler = path.open("w", encoding="utf-8", buffering=buffer_size)
# Write metadata part of the package
metadata_items = []
for key, value in package_metadata.items():
metadata_items.append(f'"{key}": {json.dumps(value, ensure_ascii=False, indent=2)}')
self._file_handler.write("{\n " + ",\n ".join(metadata_items) + ",\n")
self._file_handler.write(' "releases": [\n')
self._is_first_release = True
[docs]
def stream_release(self, release: dict) -> None:
"""Write a single release to the open package file stream."""
if not self._file_handler:
raise StreamNotStartedError
if not self._is_first_release:
self._file_handler.write(",\n")
# Using simplejson for potential performance gains and better decimal handling
json.dump(release, self._file_handler, indent=4, ensure_ascii=False)
self._is_first_release = False
self._file_handler.flush()
[docs]
def end_package_stream(self) -> None:
"""
Finalize the streaming write session by closing the JSON array and file.
This method is safe to call even if the stream was not started or already closed.
"""
if self._file_handler:
self._file_handler.write("\n ]\n}\n")
self._file_handler.close()
self._file_handler = None
[docs]
def is_streaming(self) -> bool:
"""Check if the writer is currently in a streaming session."""
return self._file_handler is not None and not self._file_handler.closed