# -*- coding: utf-8 -*-
"""
Module for converting various contact matrix formats into a memory-mapped
(memmap) file for efficient, on-disk matrix operations.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "2.0.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import functools
import json
import pathlib
import typing as t
import warnings
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import numpy.typing as npt
import pandas as pd
from pydantic import ConfigDict, validate_call
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from .. import loaders
from ..exceptions import ConverterError
from ..consts import Balancing, DataStructure, DataFrameSpecs
# =============================================================================
# CORE CONVERSION LOGIC (SINGLE DISPATCH)
# =============================================================================
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def convert_to_memmap(
    data: t.Union[pathlib.Path, pd.DataFrame, t.Tuple[npt.ArrayLike, ...]],
    output_fpath: pathlib.Path,
    # Additional arguments are passed via kwargs to the registered functions
    **kwargs,
) -> None:
    """
    Convert contact matrix data to a NumPy memory-mapped file (memmap).

    This is a polymorphic function that dispatches to the appropriate
    implementation based on the type of the `data` argument.

    Parameters
    ----------
    data : pathlib.Path or pd.DataFrame or tuple
        The input data to convert. Can be:

        - A path to a standard contact matrix file (.hic, .cool, etc.).
        - A pandas DataFrame in COO format.
        - A tuple of (rows, cols, values) arrays.
    output_fpath : pathlib.Path
        The base path for the output memmap file.
    **kwargs :
        Additional arguments specific to the conversion type, such as
        `resolution`, `balancing`, `output_full_matrix`, etc.

    Raises
    ------
    NotImplementedError
        If no implementation is registered for the type of ``data``.
    """
    # Fallback for unregistered types; concrete implementations are attached
    # below via ``convert_to_memmap.register``.
    raise NotImplementedError(f"No conversion implementation for type: {type(data)}")
@convert_to_memmap.register(pd.DataFrame)
def _convert_df_to_memmap(
    data: pd.DataFrame,
    output_fpath: pathlib.Path,
    **kwargs,
) -> None:
    """Dispatched implementation for converting a COO-format DataFrame.

    Pulls the row/column/count columns named by ``DataFrameSpecs`` out of
    the frame and forwards them to the array-based implementation.
    """
    coo_triplet = (
        data[DataFrameSpecs.ROW_IDS].to_numpy(),
        data[DataFrameSpecs.COL_IDS].to_numpy(),
        data[DataFrameSpecs.COUNTS].to_numpy(),
    )
    # Delegate to the array-based implementation
    _convert_coo_to_memmap(coo_triplet, output_fpath, **kwargs)
@convert_to_memmap.register(tuple)
def _convert_coo_to_memmap(
    data: t.Tuple[npt.ArrayLike, npt.ArrayLike, npt.ArrayLike],
    output_fpath: pathlib.Path,
    output_full_matrix: bool = True,
    dtype: t.Optional[npt.DTypeLike] = None,
    shape: t.Optional[t.Tuple[int, int]] = None,
    check_output: bool = True,
    overwrite: bool = False,
    metadata: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs,  # Consume any extra arguments
) -> None:
    """Dispatched implementation for converting COO arrays to memmap.

    Parameters
    ----------
    data : tuple of array-like
        ``(row_ids, col_ids, counts)`` triplet describing the sparse matrix.
    output_fpath : pathlib.Path
        Base path for the output; binary and metadata paths are derived via
        ``loaders.gen_memmap_fpaths``.
    output_full_matrix : bool
        If True, mirror off-diagonal entries so the stored matrix is symmetric.
    dtype : numpy dtype-like, optional
        Storage dtype; defaults to the dtype of ``counts``.
    shape : (int, int), optional
        Matrix shape; inferred from the maximum row/column index when omitted.
    check_output : bool
        Re-load the written file and verify the stored values round-trip.
    overwrite : bool
        Allow replacing an existing memmap file.
    metadata : dict, optional
        Extra key/value pairs merged into the JSON metadata file.

    Raises
    ------
    FileExistsError
        If the output already exists and ``overwrite`` is False.
    ConverterError
        If the post-write integrity check fails.
    """
    if loaders.is_memmap_exists(output_fpath) and not overwrite:
        raise FileExistsError(f"Memmap file already exists: {output_fpath}")
    bin_fpath, meta_fpath = loaders.gen_memmap_fpaths(output_fpath)
    bin_fpath.parent.mkdir(parents=True, exist_ok=True)
    row_ids = np.asarray(data[0])
    col_ids = np.asarray(data[1])
    counts = np.asarray(data[2])
    # Replace NaNs/Infs with 0 to ensure they are treated as "empty" in sparse
    # usage and to allow safe quantization to integer types if needed.
    # BUGFIX: np.asarray does not copy, so writing through `counts` in place
    # would silently mutate the caller's array (e.g. the DataFrame column it
    # came from). Copy before zeroing.
    if np.issubdtype(counts.dtype, np.floating):
        mask_invalid = ~np.isfinite(counts)
        if np.any(mask_invalid):
            counts = counts.copy()
            counts[mask_invalid] = 0
    # `dtype or ...` would misfire on falsy dtype-like values; test for None.
    final_dtype = counts.dtype if dtype is None else dtype
    # Overflow protection for integer types: warn and clip rather than let
    # NumPy wrap silently on assignment.
    if np.issubdtype(final_dtype, np.integer):
        info = np.iinfo(final_dtype)
        if np.any(counts > info.max) or np.any(counts < info.min):
            warnings.warn(
                f"Data contains values outside the range of {final_dtype} "
                f"([{info.min}, {info.max}]). Values will be clipped. "
                "Consider using a larger dtype (e.g., int32) to avoid data loss.",
                RuntimeWarning
            )
            counts = np.clip(counts, info.min, info.max)
    if shape is None:
        # Square matrix sized by the largest index seen in either axis.
        n = max(np.max(row_ids), np.max(col_ids)) + 1
        final_shape = (int(n), int(n))
    else:
        final_shape = shape
    fp = np.memmap(bin_fpath, dtype=final_dtype, mode='w+', shape=final_shape)
    # Populate the memmap file
    fp[row_ids, col_ids] = counts
    if output_full_matrix:
        # Mirror off-diagonal entries to make the stored matrix symmetric.
        nondiag_mask = row_ids != col_ids
        fp[col_ids[nondiag_mask], row_ids[nondiag_mask]] = counts[nondiag_mask]
    fp.flush()  # Ensure data is written to disk
    # Write the companion metadata file (shape/dtype plus caller extras).
    meta_dict = {"shape": final_shape, "dtype": np.dtype(final_dtype).str}
    if metadata:
        meta_dict.update(metadata)
    with open(meta_fpath, 'w') as f:
        json.dump(meta_dict, f, indent=4, sort_keys=True)
    if check_output:
        recon_fp = loaders.load_memmap(output_fpath)
        # Cast original counts to the stored dtype to emulate quantization
        expected_counts = counts.astype(final_dtype)
        # Use allclose with equal_nan=True for floating point robustness
        if not np.allclose(recon_fp.data[row_ids, col_ids], expected_counts, equal_nan=True):
            raise ConverterError(
                "Memmap data integrity check failed: wrote vs read mismatch."
            )
        if output_full_matrix:
            if not np.allclose(recon_fp.data[col_ids, row_ids], expected_counts, equal_nan=True):
                raise ConverterError(
                    "Memmap data integrity check failed: full matrix mismatch."
                )
@convert_to_memmap.register(pathlib.Path)
def _convert_file_to_memmap(
    data: pathlib.Path,
    output_fpath: pathlib.Path,
    region1: str,
    resolution: int,
    balancing: t.Optional[Balancing],
    **kwargs,
) -> None:
    """Dispatched implementation for converting a source file to memmap.

    Loads the contact matrix at ``data`` as a COO DataFrame and delegates to
    the DataFrame-based implementation, attaching resolution/region metadata.

    Parameters
    ----------
    data : pathlib.Path
        Path to a standard contact matrix file (.hic, .cool, etc.).
    output_fpath : pathlib.Path
        Base path for the output memmap file.
    region1 : str
        Genomic region/chromosome to extract (intra-region by default).
    resolution : int
        Bin resolution to load.
    balancing : Balancing, optional
        Balancing scheme; ``None`` stores "NONE" in the metadata.
    **kwargs :
        Mix of memmap options (forwarded to the COO implementation) and
        loader options (forwarded to ``loaders.load_cm_data``).

    Raises
    ------
    FileNotFoundError
        If ``data`` does not exist.
    ConverterError
        If ``data`` is not a standard contact matrix format.
    """
    if not data.exists():
        raise FileNotFoundError(f"Input file not found: {data}")
    if not loaders.is_file_standard_cm(str(data)):
        raise ConverterError("Input file must be a standard contact matrix format.")
    # Extract memmap-specific arguments to prevent passing them to load_cm_data
    memmap_keys = ("dtype", "output_full_matrix", "shape", "check_output", "overwrite")
    memmap_kwargs = {key: kwargs.pop(key) for key in memmap_keys if key in kwargs}
    # BUGFIX: a caller-supplied `metadata` kwarg previously leaked into
    # load_cm_data; pop it here and merge it into the metadata written below.
    user_metadata = kwargs.pop("metadata", None)
    cm_df = loaders.load_cm_data(
        fpath=data,
        region1=region1,
        resolution=resolution,
        balancing=balancing,
        output_format=DataStructure.DF,
        **kwargs
    )
    # Base metadata; caller-supplied entries take precedence.
    meta = {
        "resolution": resolution,
        "chromosome1": region1,
        "chromosome2": region1,  # Default intra
        "balancing": balancing.value if balancing else "NONE"
    }
    if user_metadata:
        meta.update(user_metadata)
    # Delegate to the DataFrame-based implementation
    _convert_df_to_memmap(cm_df, output_fpath, metadata=meta, **memmap_kwargs)