# -*- coding: utf-8 -*-
"""
This module provides a unified interface to parse various contact matrix
file formats and load them into memory.
It acts as a facade, dispatching calls to the appropriate format-specific
loader (e.g., for .hic, .cool, .csv) while providing a consistent API to the user.
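Functions:
    load_cm_data: Load a contact matrix from a file into memory.
    get_chrom_infos: Query chromosome names and lengths from a file.
    get_resolutions: List the available resolutions in a file.
    get_bins: Get the bin table ('chrom', 'start', 'end') of a file at a resolution.
    get_balancing: List available balancing methods for a specific region.
    is_file_standard_cm: Check whether a file's format can be recognized.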
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__copyright__ = "Institut für Informationsverarbeitung"
__license__ = "Clear BSD"
__version__ = "2.5.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib
import typing as t
import warnings
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from pydantic import validate_call
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from ..consts import Balancing, BpFrag, Counts, DataStructure, Format, GenomeBuild, DataFrameSpecs, Backend
from ..exceptions import LoaderError, FormatError, DataResolutionError
from ..matrix import ContactMatrix
# Import all available loader modules from the directory
from . import (
cool_loader,
csv_loader,
ginteractions_loader,
hic_loader,
memmap_loader,
pickle_loader,
)
from .utils import Region
from .memmap_loader import load_memmap, gen_memmap_fpaths, is_memmap_exists
from .third_party.straw import check_straw_available
# =============================================================================
# PUBLIC API
# =============================================================================
# Defines the public-facing API of the loaders package
__all__ = [
"load_cm_data",
"get_chrom_infos",
"get_resolutions",
"get_balancing",
"get_bins",
"load_memmap",
"gen_memmap_fpaths",
"is_memmap_exists",
"is_file_standard_cm",
"Region",
# Re-exporting key enums and constants for user convenience
"Format",
"DataStructure",
"Balancing",
"Counts",
"GenomeBuild",
"BpFrag",
"DataFrameSpecs",
]
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _get_format(
    fpath: pathlib.Path,
    fformat: Format | None = None,
) -> Format:
    """
    Determines the file format from its extension or an explicit argument.

    Parameters
    ----------
    fpath : pathlib.Path
        Path whose file name is inspected when `fformat` is not given.
    fformat : Format, optional
        Explicit format. When truthy it takes precedence over the file
        extension. Defaults to None.

    Returns
    -------
    Format
        The resolved file format.

    Raises
    ------
    FormatError
        If the extension is neither a known alias nor resolvable through
        `Format.from_fuzzy_string`.
    """
    if fformat:
        return Format(fformat)  # Leverage enum's own validation and aliasing

    # Strip only a *trailing* ".gz" so compound extensions such as ".csv.gz"
    # resolve to "csv". Splitting on ".gz" would also cut names that merely
    # contain that substring (e.g. "a.gz.csv" or "sample.gzipped.hic").
    extension = fpath.name.removesuffix(".gz").split(".")[-1]

    # Check aliases first
    if extension in Format.__ALIASES__:
        return Format(Format.__ALIASES__[extension])

    try:
        return Format.from_fuzzy_string(extension)
    except (ValueError, KeyError) as e:
        raise FormatError(
            f"Unsupported file format '{extension}'. Please specify a valid "
            "format using the 'fformat' argument."
        ) from e
[docs]
def is_file_standard_cm(fpath: str) -> bool:
    """
    Checks if the file is a standard contact matrix file format.

    Parameters
    ----------
    fpath : str
        The path to the file to inspect.

    Returns
    -------
    bool
        True when `_get_format` can resolve a format for the path, False
        when it raises `ValueError` or `FormatError`.
    """
    try:
        _get_format(pathlib.Path(fpath))
    except (ValueError, FormatError):
        # The format could not be resolved from the file name.
        return False
    else:
        return True
# =============================================================================
# DISPATCH DICTIONARIES (Architectural Pattern)
# =============================================================================
# This pattern replaces complex if/elif chains, making the code more modular.
# --- Dispatchers for metadata functions ---
# Maps a file format to a callable that receives the file path and returns the
# list of resolutions for that file. Looked up by `get_resolutions`, which
# raises FormatError for any format that is not registered here.
_RESOLUTION_GETTERS: t.Mapping[Format, t.Callable[[str], t.List[int]]] = {
    Format.HIC: hic_loader.get_resolutions,
    Format.COOLER: cool_loader.get_resolutions,
    # Formats without intrinsic multi-resolution support return a default
    # (the path argument is ignored and the placeholder list `[1]` is returned).
    Format.CSV: lambda _: [1],
    Format.TSV: lambda _: [1],
    Format.MCSV: lambda _: [1],
    Format.MEMMAP: lambda _: [1],
    Format.GINTERACTIONS: lambda _: [1],
    Format.PICKLE: lambda _: [1],
    # NOTE(review): this table registers `Format.MCSV`, while `_DATA_LOADERS`
    # registers `Format.COO`, `Format.MCOO` and `Format.NPY` instead — confirm
    # that the difference between the two tables is intentional.
}
# Maps a file format to a callable that receives the file path and returns the
# chromosome information for that file. Only the two formats below are
# registered; `get_chrom_infos` raises FormatError for everything else.
_CHROM_INFO_GETTERS: t.Mapping[Format, t.Callable[[str], t.Dict[str, int]]] = {
    Format.HIC: hic_loader.get_chrom_infos,
    Format.COOLER: cool_loader.get_chrom_infos,
}
# Maps a file format to a callable that receives the file path and a
# resolution and returns the bin table. Only the two formats below are
# registered; `get_bins` raises FormatError for everything else.
_BIN_GETTERS: t.Mapping[Format, t.Callable[[str, int], pd.DataFrame]] = {
    Format.HIC: hic_loader.get_bins,
    Format.COOLER: cool_loader.get_bins,
}
# --- Dispatcher for data loading functions ---
# Each lambda defines how to call its specific loader, decoupling the main
# `load_cm_data` function from the details of each loader's signature.
# Maps a file format to the callable that `load_cm_data` invokes with its
# consolidated keyword arguments (`loader_func(**all_args)`). Every entry is a
# keyword-only lambda, so the loader attribute is looked up at call time.
_DATA_LOADERS: t.Mapping[Format, t.Callable[..., t.Any]] = {
    Format.HIC: lambda **args: hic_loader.load_hic(**args),
    Format.COOLER: lambda **args: cool_loader.load_cooler(**args),
    # CSV/TSV pin the delimiter; a caller-supplied `delimiter` keyword would
    # therefore collide with the one fixed here.
    Format.CSV: lambda **args: csv_loader.load_csv(delimiter=',', **args),
    Format.TSV: lambda **args: csv_loader.load_csv(delimiter='\t', **args),
    # COO/MCOO reuse the CSV loader without fixing a delimiter here.
    Format.COO: lambda **args: csv_loader.load_csv(**args),
    Format.MCOO: lambda **args: csv_loader.load_csv(**args),
    Format.GINTERACTIONS: lambda **args: ginteractions_loader.load_ginteractions(**args),
    Format.PICKLE: lambda **args: pickle_loader.load_pickle(**args),
    # Loaders with simple signatures
    # (only `fpath` is used; every other keyword argument is discarded).
    # NOTE(review): `allow_pickle=True` lets `np.load` unpickle arbitrary
    # objects — only safe for trusted files; confirm this is required.
    Format.NPY: lambda fpath, **_: np.load(fpath, allow_pickle=True),
    Format.MEMMAP: lambda fpath, **_: memmap_loader.load_memmap(fpath),
}
# =============================================================================
# PUBLIC FUNCTIONS
# =============================================================================
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_resolutions(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
) -> list[int]:
    """
    Gets the resolutions available in a contact matrix file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file. Path objects are accepted for
        consistency with `get_bins` and are converted to `str` before being
        handed to the format-specific getter.

    Returns
    -------
    list[int]
        A list of available resolutions.

    Raises
    ------
    FormatError
        If the format cannot be determined from the file name, or if no
        resolution getter is registered for the detected format.
    """
    fpath = str(fpath)  # Ensure string for internal dispatch
    fformat = _get_format(pathlib.Path(fpath))

    getter = _RESOLUTION_GETTERS.get(fformat)
    if getter is None:
        raise FormatError(f"Cannot get resolutions for format: {fformat.value}")
    return getter(fpath)
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_chrom_infos(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
) -> dict[str, int]:
    """
    Queries chromosome names and lengths from a .hic or .cool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file. Path objects are accepted for
        consistency with `get_bins` and are converted to `str` before being
        handed to the format-specific getter.

    Returns
    -------
    dict[str, int]
        A mapping of chromosome names to their lengths.

    Raises
    ------
    FormatError
        If the format cannot be determined from the file name, or if the
        detected format has no chromosome-info getter registered.
    """
    fpath = str(fpath)  # Ensure string for internal dispatch
    fformat = _get_format(pathlib.Path(fpath))

    getter = _CHROM_INFO_GETTERS.get(fformat)
    if getter is None:
        raise FormatError(
            "Chromosome info is only available for .hic or .cool formats, "
            f"not '{fformat.value}'."
        )
    return getter(fpath)
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_bins(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Options ---
    resolution: int,
) -> pd.DataFrame:
    """
    Returns the bin table of a .hic or .cool file at a given resolution.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Location of the contact matrix file.
    resolution : int
        Resolution at which the bins are requested.

    Returns
    -------
    pd.DataFrame
        A DataFrame with columns: 'chrom', 'start', 'end'.

    Raises
    ------
    FormatError
        If the format cannot be determined from the file name, or if the
        detected format has no bin getter registered.
    """
    # The format-specific getters are called with a plain string path.
    path_str = str(fpath)
    detected = _get_format(pathlib.Path(path_str))

    bin_getter = _BIN_GETTERS.get(detected)
    if not bin_getter:
        raise FormatError(
            "Bin info is only available for .hic or .cool formats, "
            f"not '{detected.value}'."
        )
    return bin_getter(path_str, resolution)
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_balancing(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    chrom: str,
) -> list[str]:
    """
    Gets available balancing methods for a region in a .hic or .cool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file. Path objects are accepted for
        consistency with `get_bins` and are converted to `str` before being
        handed to the format-specific loader.
    resolution : int
        The resolution of the contact matrix.
    chrom : str
        The chromosome of interest (e.g., 'chr1').

    Returns
    -------
    list[str]
        A list of available balancing methods (e.g., ['KR', 'VC_SQRT']).

    Raises
    ------
    FormatError
        If the format cannot be determined from the file name, or if the
        detected format is neither .hic nor .cool.
    """
    fpath = str(fpath)  # Ensure string for internal dispatch
    fformat = _get_format(pathlib.Path(fpath))

    if fformat == Format.HIC:
        return hic_loader.get_balancing(fpath, resolution, chrom)
    if fformat == Format.COOLER:
        return cool_loader.get_balancing(fpath, resolution, chrom)

    raise FormatError(
        "Balancing info is only available for .hic or .cool formats, "
        f"not '{fformat.value}'."
    )
[docs]
def _resolve_backend(
    resolved_fformat: Format,
    backend: Backend | None,
    use_fast_hic: bool,
) -> Backend | None:
    """Selects the default backend for a format and validates an explicit one."""
    # The legacy `use_fast_hic` flag is documented as .hic-only, so it must
    # not force a STRAW backend onto any other format.
    if backend is None and use_fast_hic and resolved_fformat == Format.HIC:
        backend = Backend.STRAW

    # Handle default for HIC format: Prefer STRAW (fast) if available
    if resolved_fformat == Format.HIC and backend is None:
        if check_straw_available():
            return Backend.STRAW
        warnings.warn(
            "straw CLI tool not found in PATH. Falling back to HICSTRAW (slower). "
            "Install straw CLI for faster loading.",
            RuntimeWarning,
            # Attribute the warning to `load_cm_data`, the caller of this helper.
            stacklevel=2,
        )
        return Backend.HICSTRAW

    if backend is None:
        return None

    if resolved_fformat == Format.COOLER:
        if backend not in (Backend.COOLER, Backend.HICTK):
            raise FormatError(
                f"Backend '{backend}' is invalid for format '{resolved_fformat}'. "
                f"Use one of: {Backend.COOLER}, {Backend.HICTK}"
            )
    elif resolved_fformat == Format.HIC:
        if backend not in (Backend.HICSTRAW, Backend.HICTK, Backend.STRAW):
            raise FormatError(
                f"Backend '{backend}' is invalid for format '{resolved_fformat}'. "
                f"Use one of: {Backend.HICSTRAW}, {Backend.HICTK}, {Backend.STRAW}"
            )
        # If explicitly requested STRAW but missing, raise error
        if backend == Backend.STRAW and not check_straw_available():
            raise FileNotFoundError(
                "straw CLI tool not found in PATH. Please install it to use Backend.STRAW."
            )

    # Backends given for any other format are passed through unchecked.
    return backend


def _with_raw_balancing(
    balancing: Balancing | list[Balancing] | None,
) -> list[Balancing]:
    """Returns the balancing selection as a new list that contains Balancing.NONE."""
    if balancing is None:
        return [Balancing.NONE]
    # Copy list input so the list object passed in is never mutated.
    methods = list(balancing) if isinstance(balancing, list) else [balancing]
    if Balancing.NONE not in methods:
        methods.append(Balancing.NONE)
    return methods


@validate_call(config=dict(arbitrary_types_allowed=True))
def load_cm_data(
    #? --- Input Files ---
    fpath: pathlib.Path,
    resolution: int,
    #? --- Regions ---
    region1: str | None = None,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = None,
    output_format: DataStructure = DataStructure.DF,
    fformat: Format | None = None,
    backend: Backend | None = None,
    use_fast_hic: bool = False,
    return_raw_counts: bool = False,
    #? --- Extras ---
    **kwargs,
) -> pd.DataFrame | tuple[np.ndarray, ...] | np.ndarray | tuple[t.Any, ...]:
    """
    Loads contact matrix data from various file formats.

    This function acts as a dispatcher, routing the call to the appropriate
    format-specific loader based on the file's extension or the `fformat`
    argument.

    Parameters
    ----------
    fpath : pathlib.Path
        Path to the contact matrix file.
    resolution : int
        Resolution of the contact matrix to load.
    region1 : str, optional
        First genomic region (e.g., 'chr1'). Defaults to None.
    region2 : str, optional
        Second genomic region. If None, loads intra-chromosomal data for
        `region1`. Defaults to None.
    balancing : Balancing | list[Balancing], optional
        Balancing (normalization) method(s) to apply. Defaults to None.
    output_format : DataStructure, optional
        Desired output format ('df' or 'coo'). Defaults to DataStructure.DF.
    fformat : Format, optional
        Explicitly specify file format, otherwise inferred from extension.
        Defaults to None.
    backend : Backend, optional
        Select the underlying backend library for loading.
        For COOLER: 'cooler', 'hictk'.
        For HIC: 'hicstraw', 'hictk', 'straw'.
        Defaults to None. For HIC files this selects 'straw' when the straw
        CLI tool is available and 'hicstraw' otherwise; for other formats
        no backend is passed to the loader.
    use_fast_hic : bool, optional
        If True and file is .hic, use the faster `fast_hic_loader`.
        Equivalent to setting backend='straw'. Ignored for other formats
        and when `backend` is given explicitly.
        Defaults to False.
    return_raw_counts : bool, optional
        If True, return raw counts alongside the primary (balanced) counts.
        For COOLER files this is done by adding `Balancing.NONE` to the
        balancing list; for HIC files the flag is forwarded to the loader.
        Defaults to False.
    **kwargs :
        Additional keyword arguments passed to the specific loader,
        (e.g., `encoding` for CSV files).

    Returns
    -------
    pd.DataFrame | tuple[np.ndarray, ...] | np.ndarray | tuple[t.Any, ...]
        The loaded contact matrix data in the specified output format. If
        the loader returns a `ContactMatrix`, its `data` attribute is
        returned instead.

    Raises
    ------
    FormatError
        If the file format is not recognized or supported, or if an invalid
        backend is selected for the format.
    FileNotFoundError
        If the STRAW backend is requested for a .hic file but the straw CLI
        tool is not available.
    LoaderError
        If no data loader is registered for the resolved format.
    NotImplementedError
        If `return_raw_counts` is True for unsupported formats.

    Warns
    -----
    RuntimeWarning
        If a .hic file is loaded without an explicit backend and the straw
        CLI tool is not available (falls back to HICSTRAW).
    """
    resolved_fformat = _get_format(fpath, fformat)

    # --- Backend Resolution & Validation ---
    backend = _resolve_backend(resolved_fformat, backend, use_fast_hic)

    # --- Dispatcher Selection ---
    loader_func = _DATA_LOADERS.get(resolved_fformat)
    if loader_func is None:
        raise LoaderError(f"No data loader available for format: '{resolved_fformat.value}'")

    # --- Argument Consolidation ---
    all_args = {
        "fpath": str(fpath),
        "resolution": resolution,
        "region1": region1,
        "region2": region2,
        "balancing": balancing,
        "output_format": output_format,
        **kwargs,
    }

    # Only forward a backend when one was selected or requested.
    if backend is not None:
        all_args["backend"] = backend

    # Handle return_raw_counts by mapping to list[Balancing]
    if return_raw_counts:
        if resolved_fformat == Format.COOLER:
            # `return_raw_counts` itself is not passed down for this format;
            # the Balancing.NONE entry in the list requests the raw counts.
            all_args["balancing"] = _with_raw_balancing(balancing)
        elif resolved_fformat == Format.HIC:
            all_args["return_raw_counts"] = True
        else:
            raise NotImplementedError(
                f"return_raw_counts=True is not yet supported for format: '{resolved_fformat.value}'"
            )

    # Execute the selected loader
    result = loader_func(**all_args)

    # Unwrap ContactMatrix results so callers receive the underlying data.
    if isinstance(result, ContactMatrix):
        return result.data
    return result