# Source code for gunz_cm.loaders

# -*- coding: utf-8 -*-
"""
This module provides a unified interface to parse various contact matrix
file formats and load them into memory.

It acts as a facade, dispatching calls to the appropriate format-specific
loader (e.g., for .hic, .cool, .csv) while providing a consistent API to the user.

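Functions:
    load_cm_data: Load a contact matrix from a file into memory.
    get_chrom_infos: Query chromosome names and lengths from a file.
    get_resolutions: List the available resolutions in a file.
    get_bins: Get the bin index for a given resolution from a file.
    get_balancing: List available balancing methods for a specific region.
    is_file_standard_cm: Check whether a file has a recognized contact matrix format.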
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__copyright__ = "Institut für Informationsverarbeitung"
__license__ = "Clear BSD"
__version__ = "2.5.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib
import typing as t
import warnings

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from pydantic import validate_call

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from ..consts import Balancing, BpFrag, Counts, DataStructure, Format, GenomeBuild, DataFrameSpecs, Backend
from ..exceptions import LoaderError, FormatError, DataResolutionError
from ..matrix import ContactMatrix
# Import all available loader modules from the directory
from . import (
    cool_loader,
    csv_loader,
    ginteractions_loader,
    hic_loader,
    memmap_loader,
    pickle_loader,
)
from .utils import Region

from .memmap_loader import load_memmap, gen_memmap_fpaths, is_memmap_exists
from .third_party.straw import check_straw_available

# =============================================================================
# PUBLIC API
# =============================================================================
# Defines the public-facing API of the loaders package
__all__ = [
    "load_cm_data",
    "get_chrom_infos",
    "get_resolutions",
    "get_balancing",
    "get_bins",
    "load_memmap",
    "gen_memmap_fpaths",
    "is_memmap_exists",
    "is_file_standard_cm",
    "Region",
    # Re-exporting key enums and constants for user convenience
    "Format",
    "DataStructure",
    "Balancing",
    "Counts",
    "GenomeBuild",
    "BpFrag",
    "DataFrameSpecs",
]


# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _get_format(
    fpath: pathlib.Path,
    fformat: Format | None = None,
) -> Format:
    """
    Determine the file format from an explicit argument or the file extension.

    Parameters
    ----------
    fpath : pathlib.Path
        Path whose file name is inspected when `fformat` is not given.
    fformat : Format, optional
        Explicit format. When provided (truthy) it takes precedence over the
        extension and is normalized through the `Format` enum.

    Returns
    -------
    Format
        The resolved file format.

    Raises
    ------
    FormatError
        If the extension cannot be mapped to a known format.
    """
    if fformat:
        return Format(fformat)  # Leverage enum's own validation and aliasing

    # Drop only a *trailing* ".gz" so compound extensions such as ".csv.gz"
    # resolve to "csv". Splitting on ".gz" anywhere in the name would truncate
    # names like "sample.gzipped.hic" at the wrong place.
    fname = fpath.name.removesuffix(".gz")
    extension = fname.rpartition(".")[2]

    # Check aliases first
    if extension in Format.__ALIASES__:
        return Format(Format.__ALIASES__[extension])

    try:
        return Format.from_fuzzy_string(extension)
    except (ValueError, KeyError) as e:
        raise FormatError(
            f"Unsupported file format '{extension}'. Please specify a valid "
            "format using the 'fformat' argument."
        ) from e

def is_file_standard_cm(fpath: str) -> bool:
    """
    Report whether `fpath` has a recognized contact matrix file format.

    Only the file name is inspected (via `_get_format`); the file itself is
    not opened.

    Parameters
    ----------
    fpath : str
        The path to the candidate file.

    Returns
    -------
    bool
        True if a format could be resolved from the name, False otherwise.
    """
    candidate = pathlib.Path(fpath)
    try:
        _get_format(candidate)
    except (ValueError, FormatError):
        # An unresolvable extension simply means "not a standard file".
        return False
    else:
        return True
# =============================================================================
# DISPATCH DICTIONARIES (Architectural Pattern)
# =============================================================================
# Format -> callable lookup tables replace complex if/elif chains and keep the
# per-format handling modular.

# --- Dispatchers for metadata functions ---
_RESOLUTION_GETTERS: t.Mapping[Format, t.Callable[[str], t.List[int]]] = {
    Format.HIC: hic_loader.get_resolutions,
    Format.COOLER: cool_loader.get_resolutions,
    # Formats without intrinsic multi-resolution support return a default.
    # Each format gets its own lambda, and every call yields a fresh list.
    **{
        single_res_format: (lambda _: [1])
        for single_res_format in (
            Format.CSV,
            Format.TSV,
            Format.MCSV,
            Format.MEMMAP,
            Format.GINTERACTIONS,
            Format.PICKLE,
        )
    },
}

# Chromosome names/lengths can only be queried from these formats.
_CHROM_INFO_GETTERS: t.Mapping[Format, t.Callable[[str], t.Dict[str, int]]] = {
    Format.HIC: hic_loader.get_chrom_infos,
    Format.COOLER: cool_loader.get_chrom_infos,
}

# Bin tables are looked up by (path, resolution) for these formats.
_BIN_GETTERS: t.Mapping[Format, t.Callable[[str, int], pd.DataFrame]] = {
    Format.HIC: hic_loader.get_bins,
    Format.COOLER: cool_loader.get_bins,
}

# --- Dispatcher for data loading functions ---
# Each lambda defines how to call its specific loader, decoupling the main
# `load_cm_data` function from the details of each loader's signature.
_DATA_LOADERS: t.Mapping[Format, t.Callable[..., t.Any]] = {
    # Loaders that receive the full set of consolidated keyword arguments.
    # The lambdas resolve the loader attribute at call time.
    Format.HIC: lambda **loader_kwargs: hic_loader.load_hic(**loader_kwargs),
    Format.COOLER: lambda **loader_kwargs: cool_loader.load_cooler(**loader_kwargs),
    # Delimited text: the delimiter is fixed by the format where it is known.
    Format.CSV: lambda **loader_kwargs: csv_loader.load_csv(delimiter=',', **loader_kwargs),
    Format.TSV: lambda **loader_kwargs: csv_loader.load_csv(delimiter='\t', **loader_kwargs),
    Format.COO: lambda **loader_kwargs: csv_loader.load_csv(**loader_kwargs),
    Format.MCOO: lambda **loader_kwargs: csv_loader.load_csv(**loader_kwargs),
    Format.GINTERACTIONS: lambda **loader_kwargs: ginteractions_loader.load_ginteractions(**loader_kwargs),
    Format.PICKLE: lambda **loader_kwargs: pickle_loader.load_pickle(**loader_kwargs),
    # Loaders with simple signatures: only `fpath` is used, the rest is dropped.
    # NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which
    # can execute arbitrary code -- only load .npy files from trusted sources.
    Format.NPY: lambda fpath, **_: np.load(fpath, allow_pickle=True),
    Format.MEMMAP: lambda fpath, **_: memmap_loader.load_memmap(fpath),
}

# =============================================================================
# PUBLIC FUNCTIONS
# =============================================================================
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_resolutions(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
) -> list[int]:
    """
    Gets the resolutions available in a contact matrix file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file.

    Returns
    -------
    list[int]
        A list of available resolutions. Formats without intrinsic
        multi-resolution support report the default `[1]`.

    Raises
    ------
    FormatError
        If the format is unknown or has no resolution getter registered.
    """
    # Accept Path objects like `get_bins` does; the format-specific getters
    # are dispatched with a plain string path.
    fpath = str(fpath)
    fformat = _get_format(pathlib.Path(fpath))

    getter = _RESOLUTION_GETTERS.get(fformat)
    if getter is None:
        raise FormatError(f"Cannot get resolutions for format: {fformat.value}")
    return getter(fpath)
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_chrom_infos(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
) -> dict[str, int]:
    """
    Queries chromosome names and lengths from a .hic or .cool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file.

    Returns
    -------
    dict[str, int]
        A mapping of chromosome names to their lengths.

    Raises
    ------
    FormatError
        If the format is unknown or is not one that carries chromosome info.
    """
    # Accept Path objects like `get_bins` does; the format-specific getters
    # are dispatched with a plain string path.
    fpath = str(fpath)
    fformat = _get_format(pathlib.Path(fpath))

    getter = _CHROM_INFO_GETTERS.get(fformat)
    if getter is None:
        raise FormatError(
            "Chromosome info is only available for .hic or .cool formats, "
            f"not '{fformat.value}'."
        )
    return getter(fpath)
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_bins(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Options ---
    resolution: int,
) -> pd.DataFrame:
    """
    Fetches the bin table of a .hic or .cool file at a given resolution.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file.
    resolution : int
        The resolution to use for binnification.

    Returns
    -------
    pd.DataFrame
        A DataFrame with columns: 'chrom', 'start', 'end'.

    Raises
    ------
    FormatError
        If the format is unknown or has no bin getter registered.
    """
    # The format-specific getters are dispatched with a plain string path.
    path_str = str(fpath)
    detected_format = _get_format(pathlib.Path(path_str))

    bin_getter = _BIN_GETTERS.get(detected_format)
    if not bin_getter:
        raise FormatError(
            "Bin info is only available for .hic or .cool formats, "
            f"not '{detected_format.value}'."
        )
    return bin_getter(path_str, resolution)
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_balancing(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    chrom: str,
) -> list[str]:
    """
    Gets available balancing methods for a region in a .hic or .cool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The path to the contact matrix file.
    resolution : int
        The resolution of the contact matrix.
    chrom : str
        The chromosome of interest (e.g., 'chr1').

    Returns
    -------
    list[str]
        A list of available balancing methods (e.g., ['KR', 'VC_SQRT']).

    Raises
    ------
    FormatError
        If the format is unknown or is neither .hic nor .cool.
    """
    # Accept Path objects like `get_bins` does; the format-specific loaders
    # are called with a plain string path.
    fpath = str(fpath)
    fformat = _get_format(pathlib.Path(fpath))

    if fformat == Format.HIC:
        return hic_loader.get_balancing(fpath, resolution, chrom)
    if fformat == Format.COOLER:
        return cool_loader.get_balancing(fpath, resolution, chrom)

    raise FormatError(
        "Balancing info is only available for .hic or .cool formats, "
        f"not '{fformat.value}'."
    )
def _resolve_backend(
    resolved_fformat: Format,
    backend: Backend | None,
    use_fast_hic: bool,
) -> Backend | None:
    """Pick a default backend where needed and validate it against the format."""
    is_hic = resolved_fformat == Format.HIC

    # Legacy flag: `use_fast_hic` is documented as applying to .hic files only,
    # so it must not force the STRAW backend onto other formats.
    if backend is None and use_fast_hic and is_hic:
        backend = Backend.STRAW

    # Default for HIC: prefer STRAW (fast) if available, else fall back.
    if is_hic and backend is None:
        if check_straw_available():
            return Backend.STRAW
        warnings.warn(
            "straw CLI tool not found in PATH. Falling back to HICSTRAW (slower). "
            "Install straw CLI for faster loading.",
            RuntimeWarning
        )
        return Backend.HICSTRAW

    if backend is None:
        return None

    if resolved_fformat == Format.COOLER:
        allowed = (Backend.COOLER, Backend.HICTK)
    elif is_hic:
        allowed = (Backend.HICSTRAW, Backend.HICTK, Backend.STRAW)
    else:
        # No backend validation is defined for the remaining formats.
        return backend

    if backend not in allowed:
        choices = ", ".join(f"{item}" for item in allowed)
        raise FormatError(
            f"Backend '{backend}' is invalid for format '{resolved_fformat}'. "
            f"Use one of: {choices}"
        )

    # If explicitly requested STRAW but missing, raise error
    if is_hic and backend == Backend.STRAW and not check_straw_available():
        raise FileNotFoundError(
            "straw CLI tool not found in PATH. Please install it to use Backend.STRAW."
        )
    return backend


def _with_raw_balancing(
    balancing: Balancing | list[Balancing] | None,
) -> list[Balancing]:
    """Return a balancing list that is guaranteed to contain Balancing.NONE."""
    if balancing is None:
        return [Balancing.NONE]
    if isinstance(balancing, list):
        if Balancing.NONE in balancing:
            return balancing
        # Build a new list instead of appending, so the list object that was
        # passed in is never modified.
        return [*balancing, Balancing.NONE]
    # Single balancing provided
    if balancing != Balancing.NONE:
        return [balancing, Balancing.NONE]
    return [Balancing.NONE]


@validate_call(config=dict(arbitrary_types_allowed=True))
def load_cm_data(
    #? --- Input Files ---
    fpath: pathlib.Path,
    resolution: int,
    #? --- Regions ---
    region1: str | None = None,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = None,
    output_format: DataStructure = DataStructure.DF,
    fformat: Format | None = None,
    backend: Backend | None = None,
    use_fast_hic: bool = False,
    return_raw_counts: bool = False,
    #? --- Extras ---
    **kwargs,
) -> pd.DataFrame | tuple[np.ndarray, ...] | np.ndarray | tuple[t.Any, ...]:
    """
    Loads contact matrix data from various file formats.

    This function acts as a dispatcher, routing the call to the appropriate
    format-specific loader based on the file's extension or the `fformat`
    argument.

    Parameters
    ----------
    fpath : pathlib.Path
        Path to the contact matrix file.
    resolution : int
        Resolution of the contact matrix to load.
    region1 : str, optional
        First genomic region (e.g., 'chr1'). Defaults to None.
    region2 : str, optional
        Second genomic region. If None, loads intra-chromosomal data for
        `region1`. Defaults to None.
    balancing : Balancing | list[Balancing], optional
        Balancing (normalization) method(s) to apply. Defaults to None.
    output_format : DataStructure, optional
        Desired output format ('df' or 'coo'). Defaults to DataStructure.DF.
    fformat : Format, optional
        Explicitly specify file format, otherwise inferred from extension.
        Defaults to None.
    backend : Backend, optional
        Select the underlying backend library for loading.
        For COOLER: 'cooler', 'hictk'.
        For HIC: 'hicstraw', 'hictk', 'straw'.
        Defaults to None. For HIC files this selects 'straw' when the straw
        CLI is available and otherwise falls back to 'hicstraw' with a
        RuntimeWarning; for other formats no backend is passed to the loader.
    use_fast_hic : bool, optional
        Legacy flag. If True and the file is .hic, equivalent to setting
        backend='straw'. Ignored when `backend` is given. Defaults to False.
    return_raw_counts : bool, optional
        If True, return raw counts alongside the primary (balanced) counts.
        Defaults to False.
    **kwargs :
        Additional keyword arguments passed to the specific loader,
        (e.g., `encoding` for CSV files).

    Returns
    -------
    pd.DataFrame | tuple[np.ndarray, ...] | np.ndarray | tuple[t.Any, ...]
        The loaded contact matrix data in the specified output format. If the
        loader returns a `ContactMatrix`, its `.data` attribute is returned.

    Raises
    ------
    FormatError
        If the file format is not recognized or supported, or if an invalid
        backend is selected for the format.
    FileNotFoundError
        If the STRAW backend is requested for a .hic file but the straw CLI
        tool is not available.
    LoaderError
        If no data loader is registered for the resolved format.
    NotImplementedError
        If `return_raw_counts` is True for unsupported formats.
    """
    resolved_fformat = _get_format(fpath, fformat)

    # --- Backend Resolution & Validation ---
    backend = _resolve_backend(resolved_fformat, backend, use_fast_hic)

    # --- Dispatcher Selection ---
    loader_func = _DATA_LOADERS.get(resolved_fformat)
    if not loader_func:
        raise LoaderError(f"No data loader available for format: '{resolved_fformat.value}'")

    # --- Argument Consolidation ---
    all_args = {
        "fpath": str(fpath),
        "resolution": resolution,
        "region1": region1,
        "region2": region2,
        "balancing": balancing,
        "output_format": output_format,
        **kwargs,
    }

    # The backend is forwarded whenever it is set.
    # NOTE(review): nothing here checks that the selected loader accepts a
    # `backend` keyword -- confirm for formats other than HIC/COOLER.
    if backend is not None:
        all_args["backend"] = backend

    if return_raw_counts:
        if resolved_fformat == Format.COOLER:
            # Raw counts are requested by including Balancing.NONE in the
            # balancing list; `return_raw_counts` itself is not passed down.
            all_args["balancing"] = _with_raw_balancing(balancing)
        elif resolved_fformat == Format.HIC:
            all_args["return_raw_counts"] = True
        else:
            raise NotImplementedError(
                f"return_raw_counts=True is not yet supported for format: '{resolved_fformat.value}'"
            )

    # Execute the selected loader
    result = loader_func(**all_args)

    # Unwrap ContactMatrix results so callers always receive the raw data.
    if isinstance(result, ContactMatrix):
        return result.data
    return result