# Source code for gunz_cm.io.gnz

# -*- coding: utf-8 -*-
"""
Reader and Writer for the .gnz unified container format.

A ``.gnz`` file stores a JSON header followed by raw array payloads that can
be memory-mapped directly with ``numpy.memmap``:

* bytes 0-3: the magic ``GNZ_MAGIC`` (``b"GNZ1"``);
* bytes 4-7: the header length as a little-endian ``uint32``;
* the UTF-8 encoded JSON header with the keys ``version``, ``metadata`` and
  ``arrays`` (per array: absolute ``offset``, ``shape``, ``dtype`` string and
  ``order``);
* zero padding up to the data section, which starts on an ``ALIGNMENT``
  (4096-byte) boundary;
* the array payloads, each starting at an ``ALIGNMENT``-aligned offset.

Examples
--------
Writing a container and reading it back (sketch, ``counts`` is any ndarray)::

    writer = GnzWriter("data.gnz")
    writer.set_metadata({"note": "example"})
    writer.add_array("counts", counts)
    writer.write()

    reader = GnzReader("data.gnz")
    counts_view = reader.get_array("counts")
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"

import json
import pathlib
import struct
import typing as t
import numpy as np

GNZ_MAGIC = b"GNZ1"
ALIGNMENT = 4096


class GnzWriter:
    """Write arrays and metadata into a ``.gnz`` container.

    File layout produced by :meth:`write`::

        bytes [0, 4)        GNZ_MAGIC
        bytes [4, 8)        header length N, little-endian uint32
        bytes [8, 8 + N)    UTF-8 JSON header ("version", "metadata", "arrays")
        ...                 zero padding up to the data section, which starts
                            at a multiple of ALIGNMENT
        ...                 raw array payloads, each at an ALIGNMENT-aligned
                            absolute offset

    Every entry of ``arrays`` records ``offset`` (absolute byte offset),
    ``shape``, ``dtype`` (``numpy.dtype.str``) and ``order`` (always "C").

    Usage is two-phase: register arrays with :meth:`add_array` (complete
    data) and/or :meth:`init_streaming_array` (space only), then call
    :meth:`write` to fix the layout and create the file. Streaming arrays
    are afterwards filled through :meth:`get_array_writable`.

    Parameters
    ----------
    fpath : str or pathlib.Path
        Destination file path.
    overwrite : bool, default False
        When False, refuse to target a file that already exists.

    Raises
    ------
    FileExistsError
        If ``fpath`` exists and ``overwrite`` is False.

    Notes
    -----
    Structured dtypes are recorded through ``dtype.str`` only (e.g.
    ``"|V12"``), so their field names are not preserved in the header.
    """

    def __init__(self, fpath: t.Union[str, pathlib.Path], overwrite: bool = False):
        self.fpath = pathlib.Path(fpath)
        if self.fpath.exists() and not overwrite:
            raise FileExistsError(f"File exists: {self.fpath}")
        self.metadata = {}
        self.arrays_info = {}
        self._pending_arrays = {}  # name -> (ndarray data, np.dtype to store)
        self._streaming_arrays = {}  # name -> (shape tuple of ints, np.dtype)
        self._file_handle = None  # not used by this class; kept for compatibility

    def set_metadata(self, meta: dict):
        """Set the JSON-serialisable metadata stored in the header.

        Parameters
        ----------
        meta : dict
            Replaces any previously set metadata. It must be serialisable by
            ``json.dumps``, otherwise :meth:`write` raises ``TypeError``.
        """
        self.metadata = meta

    def add_array(self, name: str, data: np.ndarray, dtype: t.Optional[t.Union[str, np.dtype]] = None):
        """Register a complete array; it is written when :meth:`write` runs.

        Parameters
        ----------
        name : str
            Key of the array inside the container.
        data : array_like
            Array contents (converted with ``np.asarray``).
        dtype : str or numpy.dtype, optional
            Storage dtype; defaults to ``data.dtype``. The data is cast with
            ``astype`` on write.
        """
        data = np.asarray(data)
        # Compare against None explicitly: plain numpy dtypes are falsy
        # (len(dtype) is the number of fields, i.e. 0), so `dtype or ...`
        # would silently discard e.g. np.dtype("float32").
        store_dtype = data.dtype if dtype is None else np.dtype(dtype)
        self._pending_arrays[name] = (data, store_dtype)

    def init_streaming_array(self, name: str, shape: t.Tuple[int, ...], dtype: t.Union[str, np.dtype]):
        """Reserve space for an array that is filled incrementally later.

        Parameters
        ----------
        name : str
            Key of the array inside the container.
        shape : int or tuple of int
            Array shape; NumPy integer dimensions are accepted.
        dtype : str or numpy.dtype
            Storage dtype.

        Raises
        ------
        ValueError
            If any dimension is negative.
        """
        if isinstance(shape, (int, np.integer)):
            shape = (shape,)
        # Plain ints keep the header JSON-serialisable (np.int64 is not).
        shape = tuple(int(n) for n in shape)
        if any(n < 0 for n in shape):
            raise ValueError(f"Negative dimension in shape {shape} for array '{name}'")
        self._streaming_arrays[name] = (shape, np.dtype(dtype))

    @staticmethod
    def _align(nbytes: int) -> int:
        """Round ``nbytes`` up to the next multiple of ``ALIGNMENT``."""
        return -(-nbytes // ALIGNMENT) * ALIGNMENT

    def write(self):
        """Fix the layout, create the file, and write all pending arrays.

        The target file is created or truncated. Space for streaming arrays is
        reserved by extending the file with ``truncate`` and can be filled via
        :meth:`get_array_writable` afterwards.
        """
        # 1. Layout relative to the start of the data section. A name that was
        #    registered both ways takes the streaming shape/dtype.
        layout = {name: (data.shape, dtype) for name, (data, dtype) in self._pending_arrays.items()}
        layout.update(self._streaming_arrays)

        relative_offsets = {}
        data_size = 0
        for name, (shape, dtype) in layout.items():
            dt = np.dtype(dtype)
            shape = tuple(int(n) for n in shape)
            relative_offsets[name] = data_size
            nbytes = int(np.prod(shape, dtype=np.int64)) * dt.itemsize
            data_size = self._align(data_size + nbytes)
            self.arrays_info[name] = {"offset": 0, "shape": shape, "dtype": dt.str, "order": "C"}

        # 2. Header. The data section starts at the first ALIGNMENT boundary
        #    that leaves room for magic + length + JSON. Moving it changes the
        #    offsets, which can lengthen the JSON again, so repeat until stable
        #    (terminates: data_start only grows, the JSON grows far slower).
        full_header = {"version": 1, "metadata": self.metadata, "arrays": self.arrays_info}
        data_start = ALIGNMENT
        while True:
            for name, rel in relative_offsets.items():
                self.arrays_info[name]["offset"] = data_start + rel
            header_json = json.dumps(full_header, sort_keys=True).encode("utf-8")
            required = self._align(8 + len(header_json))
            if required <= data_start:
                break
            data_start = required

        # 3. Create the file: header, explicit zero padding to the data section,
        #    then extend to the final size so every array region exists before
        #    it is memory-mapped (the extension is typically sparse).
        with open(self.fpath, "wb") as f:
            f.write(GNZ_MAGIC)
            f.write(struct.pack("<I", len(header_json)))
            f.write(header_json)
            f.write(b"\x00" * (data_start - f.tell()))
            f.truncate(data_start + data_size)

        # 4. Write pending arrays through memmaps.
        for name, (data, dtype) in self._pending_arrays.items():
            if data.size == 0:
                # Nothing to write, and a zero-byte region at EOF cannot be mmapped.
                continue
            mm = self.get_array_writable(name)
            # `[...]` (not `[:]`) so 0-d scalar arrays work too.
            mm[...] = data.astype(dtype, copy=False)
            mm.flush()
            del mm

    def get_array_writable(self, name: str) -> np.memmap:
        """Return a writable (``mode='r+'``) memmap over one array's region.

        Parameters
        ----------
        name : str
            Key of an array laid out by a previous :meth:`write` call.

        Raises
        ------
        KeyError
            If ``name`` has not been laid out yet.

        Notes
        -----
        Memory-mapping a zero-size array may fail, because mmap cannot map
        zero bytes at the end of the file.
        """
        if name not in self.arrays_info:
            raise KeyError(f"Array '{name}' not found. Call write() first to finalize layout.")
        info = self.arrays_info[name]
        return np.memmap(
            self.fpath,
            dtype=np.dtype(info["dtype"]),
            mode="r+",
            offset=info["offset"],
            shape=tuple(info["shape"]),
            order=info.get("order", "C"),
        )
class GnzReader:
    """Read arrays and metadata from a ``.gnz`` container.

    The header (magic bytes, little-endian ``uint32`` length, JSON document)
    is parsed in the constructor. Array payloads are accessed lazily through
    :meth:`get_array`, which memory-maps them.

    Parameters
    ----------
    fpath : str or pathlib.Path
        Path of the container to open.

    Raises
    ------
    FileNotFoundError
        If ``fpath`` does not exist.
    ValueError
        If the magic bytes are wrong or the header is truncated or malformed.

    Attributes
    ----------
    header : dict
        The decoded JSON header.
    metadata : dict
        ``header["metadata"]`` (empty dict if absent).
    arrays_info : dict
        ``header["arrays"]``: per-array ``offset``, ``shape``, ``dtype``, ``order``.
    """

    # np.memmap modes that would create/truncate the file and destroy the container.
    _TRUNCATING_MODES = ("w+", "write")

    def __init__(self, fpath: t.Union[str, pathlib.Path]):
        self.fpath = pathlib.Path(fpath)
        if not self.fpath.exists():
            raise FileNotFoundError(f"File not found: {self.fpath}")
        self._parse_header()

    def _parse_header(self):
        """Read and validate the header; populate ``header``/``metadata``/``arrays_info``."""
        with open(self.fpath, "rb") as f:
            magic = f.read(4)
            if magic != GNZ_MAGIC:
                raise ValueError("Invalid GNZ file format")
            len_bytes = f.read(4)
            # A short read would otherwise surface as an opaque struct.error.
            if len(len_bytes) != 4:
                raise ValueError("Invalid GNZ file format: truncated header length")
            (header_len,) = struct.unpack("<I", len_bytes)
            json_bytes = f.read(header_len)
        if len(json_bytes) != header_len:
            raise ValueError("Invalid GNZ file format: truncated header")
        header = json.loads(json_bytes.decode("utf-8"))
        if not isinstance(header, dict):
            raise ValueError("Invalid GNZ file format: header is not a JSON object")
        self.header = header
        self.metadata = self.header.get("metadata", {})
        self.arrays_info = self.header.get("arrays", {})

    def get_array(self, name: str, mode: str = "r") -> np.ndarray:
        """Return a memory-mapped view of one stored array.

        Parameters
        ----------
        name : str
            Key of the array.
        mode : str, default "r"
            ``np.memmap`` mode, e.g. "r" (read-only), "r+" (read/write) or
            "c" (copy-on-write). Truncating modes ("w+"/"write") are rejected.

        Returns
        -------
        numpy.memmap or numpy.ndarray
            A memmap over the array's region. Zero-size arrays occupy no bytes
            and cannot be mmapped, so an empty in-memory ndarray is returned
            for them.

        Raises
        ------
        KeyError
            If ``name`` is not stored in the file.
        ValueError
            If ``mode`` would truncate the container.
        """
        if name not in self.arrays_info:
            raise KeyError(f"Array '{name}' not found in GNZ file")
        if mode in self._TRUNCATING_MODES:
            raise ValueError(
                f"mode {mode!r} would truncate the GNZ container; use 'r', 'r+' or 'c'"
            )
        info = self.arrays_info[name]
        shape = tuple(info["shape"])
        dtype = np.dtype(info["dtype"])
        order = info.get("order", "C")
        if 0 in shape:
            return np.empty(shape, dtype=dtype, order=order)
        return np.memmap(
            self.fpath,
            dtype=dtype,
            mode=mode,
            offset=info["offset"],
            shape=shape,
            order=order,
        )

    def get_metadata(self) -> dict:
        """Return the header's metadata dict (the stored object, not a copy)."""
        return self.metadata

    def keys(self) -> t.List[str]:
        """Return the names of all arrays stored in the container."""
        return list(self.arrays_info.keys())