# -*- coding: utf-8 -*-
"""
Reader and Writer for the .gnz unified container format.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import json
import pathlib
import struct
import typing as t
import numpy as np
GNZ_MAGIC = b"GNZ1"
ALIGNMENT = 4096
class GnzWriter:
    """Writer for the .gnz unified container format.

    File layout: 4-byte magic, 4-byte little-endian header length, a JSON
    header, zero padding up to the first ALIGNMENT boundary past the header,
    then each array's raw bytes, each starting on an ALIGNMENT boundary.

    Parameters
    ----------
    fpath : str or pathlib.Path
        Destination path of the container file.
    overwrite : bool, optional
        Allow replacing an existing file (default ``False``).

    Raises
    ------
    FileExistsError
        If *fpath* already exists and *overwrite* is ``False``.

    Examples
    --------
    >>> w = GnzWriter("out.gnz", overwrite=True)   # doctest: +SKIP
    >>> w.add_array("x", np.arange(4))             # doctest: +SKIP
    >>> w.write()                                  # doctest: +SKIP

    Notes
    -----
    Arrays are written C-ordered. ``write()`` must be called before
    ``get_array_writable()`` so the on-disk layout is finalized.
    """

    def __init__(self, fpath: t.Union[str, pathlib.Path], overwrite: bool = False):
        """Record the target path and initialize empty registries.

        Raises
        ------
        FileExistsError
            If the file exists and *overwrite* is ``False``.
        """
        self.fpath = pathlib.Path(fpath)
        if self.fpath.exists() and not overwrite:
            raise FileExistsError(f"File exists: {self.fpath}")
        self.metadata = {}            # free-form, JSON-serializable user metadata
        self.arrays_info = {}         # name -> {"offset", "shape", "dtype", "order"}
        self._pending_arrays = {}     # name -> (data, dtype)
        self._streaming_arrays = {}   # name -> (shape, dtype)  (fixed: old comment claimed a 3-tuple)
        self._file_handle = None      # reserved; kept for interface compatibility

    def add_array(self, name: str, data: np.ndarray,
                  dtype: t.Optional[t.Union[str, np.dtype]] = None):
        """Register a complete array to be written. Data is not written until
        write() is called.

        Parameters
        ----------
        name : str
            Key under which the array is stored in the container.
        data : np.ndarray
            Array contents; cast to *dtype* at write time.
        dtype : str or np.dtype, optional
            Target on-disk dtype; defaults to ``data.dtype``.
        """
        self._pending_arrays[name] = (data, dtype or data.dtype)

    def init_streaming_array(self, name: str, shape: t.Tuple[int, ...],
                             dtype: t.Union[str, np.dtype]):
        """Reserve space for an array to be written incrementally.

        After ``write()``, use :meth:`get_array_writable` to fill it.

        Parameters
        ----------
        name : str
            Key under which the array is stored in the container.
        shape : tuple of int
            Full shape of the reserved region.
        dtype : str or np.dtype
            On-disk element type.
        """
        self._streaming_arrays[name] = (tuple(shape), np.dtype(dtype))

    def write(self):
        """Finalize the layout, write the header, and flush pending arrays.

        Streaming declarations override pending ones with the same name
        (insertion order of the combined registry, matching the original
        dict-update behavior).
        """
        # 1. Collect (shape, dtype) for every declared array.
        all_arrays = {}
        for name, (data, dtype) in self._pending_arrays.items():
            all_arrays[name] = (tuple(data.shape), dtype)
        for name, (shape, dtype) in self._streaming_arrays.items():
            all_arrays[name] = (tuple(shape), dtype)

        def _layout(data_start):
            """Place each array on an ALIGNMENT boundary starting at *data_start*.

            Returns (arrays_info, total_file_size_including_trailing_pad).
            """
            info, offset = {}, data_start
            for name, (shape, dtype) in all_arrays.items():
                dt = np.dtype(dtype)
                # int64 product: the old float64 np.prod could lose precision
                # for very large shapes.
                nbytes = int(np.prod(shape, dtype=np.int64)) * dt.itemsize
                info[name] = {
                    "offset": offset,
                    "shape": shape,
                    "dtype": dt.str,
                    "order": "C",
                }
                end = offset + nbytes
                offset = end + (ALIGNMENT - end % ALIGNMENT) % ALIGNMENT
            return info, offset

        # 2. Fixed-point search for the header size: shifting offsets to make
        #    room for a multi-page header lengthens the JSON (offsets gain
        #    digits), so re-encode until it fits in the reserved pages.  The
        #    previous single re-encode could overflow by a few bytes right at
        #    a page boundary.
        data_start = ALIGNMENT
        while True:
            self.arrays_info, total_size = _layout(data_start)
            header_json = json.dumps(
                {"version": 1, "metadata": self.metadata, "arrays": self.arrays_info},
                sort_keys=True,
            ).encode("utf-8")
            header_len = len(header_json)
            # magic (4 bytes) + length field (4 bytes) precede the JSON
            needed = -(-(header_len + 8) // ALIGNMENT) * ALIGNMENT
            if needed <= data_start:
                break
            data_start = needed

        # 3. Write header, pad to the data region, and pre-allocate the full
        #    file size to avoid fragmentation.
        with open(self.fpath, "wb") as f:
            f.write(GNZ_MAGIC)
            f.write(struct.pack("<I", header_len))
            f.write(header_json)
            f.write(b"\x00" * (data_start - f.tell()))
            f.truncate(total_size)

        # 4. Write pending arrays through per-array memmaps.
        for name, (data, dtype) in self._pending_arrays.items():
            mm = self.get_array_writable(name)
            mm[:] = data.astype(dtype)
            mm.flush()

    def get_array_writable(self, name: str) -> np.memmap:
        """Return a writable memmap for a specific array in the container.

        Parameters
        ----------
        name : str
            Array key previously registered and laid out by ``write()``.

        Raises
        ------
        KeyError
            If ``write()`` has not been called or *name* is unknown.
        """
        if name not in self.arrays_info:
            raise KeyError(f"Array '{name}' not found. Call write() first to finalize layout.")
        info = self.arrays_info[name]
        return np.memmap(
            self.fpath,
            dtype=np.dtype(info["dtype"]),
            mode='r+',
            offset=info["offset"],
            shape=tuple(info["shape"])
        )
class GnzReader:
    """Reader for the .gnz unified container format.

    Parses the JSON header eagerly on construction and exposes each stored
    array as a lazily-loaded ``np.memmap``.

    Parameters
    ----------
    fpath : str or pathlib.Path
        Path of an existing .gnz container.

    Raises
    ------
    FileNotFoundError
        If *fpath* does not exist.
    ValueError
        If the file does not start with the GNZ magic bytes.

    Examples
    --------
    >>> r = GnzReader("out.gnz")        # doctest: +SKIP
    >>> r.keys()                        # doctest: +SKIP
    >>> r.get_array("x")                # doctest: +SKIP
    """

    def __init__(self, fpath: t.Union[str, pathlib.Path]):
        """Validate the path and parse the container header."""
        self.fpath = pathlib.Path(fpath)
        if not self.fpath.exists():
            raise FileNotFoundError(f"File not found: {self.fpath}")
        self._parse_header()

    def _parse_header(self):
        """Read magic, header length, and JSON header; populate
        ``self.header``, ``self.metadata``, and ``self.arrays_info``.

        Raises
        ------
        ValueError
            If the magic bytes do not match.
        """
        with open(self.fpath, "rb") as fh:
            if fh.read(4) != GNZ_MAGIC:
                raise ValueError("Invalid GNZ file format")
            (n_header,) = struct.unpack("<I", fh.read(4))
            raw = fh.read(n_header)
        self.header = json.loads(raw.decode('utf-8'))
        self.metadata = self.header.get("metadata", {})
        self.arrays_info = self.header.get("arrays", {})

    def get_array(self, name: str, mode: str = "r") -> np.memmap:
        """Return a memmap view of the named array.

        Parameters
        ----------
        name : str
            Array key from the container header.
        mode : str, optional
            ``np.memmap`` mode (default ``"r"`` — read-only).

        Raises
        ------
        KeyError
            If *name* is not in the container.
        """
        if name not in self.arrays_info:
            raise KeyError(f"Array '{name}' not found in GNZ file")
        spec = self.arrays_info[name]
        return np.memmap(
            self.fpath,
            dtype=np.dtype(spec["dtype"]),
            mode=mode,
            offset=spec["offset"],
            shape=tuple(spec["shape"]),
            order=spec.get("order", "C"),
        )

    def keys(self) -> t.List[str]:
        """Return the names of all arrays stored in the container."""
        return [*self.arrays_info]