# Source code for gunz_cm.loaders.csv_loader

from __future__ import annotations
# -*- coding: utf-8 -*-
"""
Provides a robust loader for contact matrix data from delimited text files.

This module is designed to parse tabular data (like CSV or TSV) representing
Hi-C contacts in a sparse (COO) format, converting it into usable in-memory
data structures.


Examples
--------
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.2.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import io
import typing as t

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import pandas as pd
from pydantic import validate_call

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from ..consts import Balancing, DataStructure, DataFrameSpecs
from ..exceptions import (
    InvalidRegionFormatError,
    UnsupportedLoaderFeatureError,
    FormatError,
)
from ..matrix import ContactMatrix
from ..preprocs.converters import to_coo_matrix


import pathlib

@validate_call(config={"arbitrary_types_allowed": True})
def _load_csv_data(
    fpath: str | pathlib.Path | io.BytesIO,
    region1: str,
    resolution: int,
    region2: str | None = None,
    balancing: Balancing | None = None,
    delimiter: str = r"\s+",
    encoding: str = "utf-8",
    output_format: DataStructure = DataStructure.DF,
    column_names: t.List[str] | None = None,
) -> t.Any:
    """
    Load a delimited contact file, bin its coordinates and sum duplicate bins.

    The file is parsed with ``pandas.read_csv`` (python engine). The row and
    column coordinate columns are floor-divided by ``resolution`` and all
    entries that fall into the same (row, column) bin are summed.

    Parameters
    ----------
    fpath : str | pathlib.Path | io.BytesIO
        The file path or an in-memory byte stream to read from. A stream is
        read starting at its current position.
    region1 : str
        The chromosome name (e.g., "chr1"). It is only validated here; the
        rows of the file are not filtered by it.
    resolution : int
        The bin size by which the coordinate columns are floor-divided.
    region2 : str | None, optional
        Must be None; inter-chromosomal loading is not supported.
    balancing : Balancing | None, optional
        The balancing reflected in the data. It is only used to select the
        column dtypes.
    delimiter : str, optional
        The separator handed to pandas as ``sep`` (a regular expression is
        accepted by the python engine).
    encoding : str, optional
        The character encoding.
    output_format : DataStructure, optional
        The desired output format.
    column_names : List[str], optional
        Explicit column names. If None, the first row is inspected: four
        columns select ``DataFrameSpecs.MCOO_COLUMN_NAMES``; any other count,
        or a failed inspection, selects ``DataFrameSpecs.COO_COLUMN_NAMES``.

    Returns
    -------
    t.Any
        For ``DataStructure.DF`` the binned ``pandas.DataFrame``. For
        ``DataStructure.RCV`` or ``DataStructure.COO`` a tuple
        ``(row, col, data)`` taken from the result of ``to_coo_matrix``.

    Raises
    ------
    InvalidRegionFormatError
        If ``region1`` contains a ``":"``.
    UnsupportedLoaderFeatureError
        If ``region2`` is not None.
    FormatError
        If ``output_format`` is none of DF, RCV or COO.
    """
    if ":" in region1:
        raise InvalidRegionFormatError(
            region=region1,
            message="Unsupported region format. Please provide only the chromosome name (e.g., 'chr1')."
        )
    if region2 is not None:
        raise UnsupportedLoaderFeatureError("Inter-chromosomal loading", "CSV loader")

    if isinstance(fpath, pathlib.Path):
        fpath = str(fpath)

    if column_names is None:
        # Peek at the first row to see how many columns the file has.
        # Peeking consumes part of an in-memory stream, so remember where it
        # started and rewind afterwards; otherwise the real read below would
        # start mid-stream (or at EOF) and lose data.
        stream_start = fpath.tell() if isinstance(fpath, io.BytesIO) else None
        try:
            sample_df = pd.read_csv(
                fpath,
                sep=delimiter,
                nrows=1,
                header=None,
                encoding=encoding,
                engine="python",
            )
        except Exception:
            # Deliberate best effort: fall back to the plain COO layout and
            # let the real read below report any genuine problem.
            column_names = DataFrameSpecs.COO_COLUMN_NAMES
        else:
            if len(sample_df.columns) == 4:
                column_names = DataFrameSpecs.MCOO_COLUMN_NAMES
            else:
                column_names = DataFrameSpecs.COO_COLUMN_NAMES
        finally:
            if stream_start is not None:
                fpath.seek(stream_start)

    if balancing is None or balancing == Balancing.NONE:
        # For MCOO, we use MCOO_DTYPES if names match
        if column_names == DataFrameSpecs.MCOO_COLUMN_NAMES:
            dtypes = DataFrameSpecs.MCOO_DTYPES
        else:
            dtypes = DataFrameSpecs.RAW_COO_DTYPES
    else:
        dtypes = DataFrameSpecs.NORM_COO_DTYPES

    df = pd.read_csv(
        fpath,
        names=column_names,
        dtype=dtypes,
        sep=delimiter,
        encoding=encoding,
        engine="python",
    )

    # Convert genomic coordinates to bin indices.
    df[DataFrameSpecs.ROW_IDS] //= resolution
    df[DataFrameSpecs.COL_IDS] //= resolution

    # Sum entries sharing a bin, preserving the order of the data columns.
    id_columns = [DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS]
    data_columns = [col for col in df.columns if col not in id_columns]
    df = df.groupby(id_columns, as_index=False)[data_columns].sum()

    if output_format == DataStructure.DF:
        return df
    if output_format in (DataStructure.RCV, DataStructure.COO):
        coo_matrix = to_coo_matrix(df, is_triu_sym=False)
        return (coo_matrix.row, coo_matrix.col, coo_matrix.data)

    raise FormatError(f"Invalid output format: {output_format}!")

@validate_call(config={"arbitrary_types_allowed": True})
def load_csv(
    fpath: str | pathlib.Path | io.BytesIO,
    region1: str,
    resolution: int,
    region2: str | None = None,
    balancing: Balancing | None = None,
    delimiter: str = r"\s+",
    encoding: str = "utf-8",
    output_format: DataStructure = DataStructure.DF,
    column_names: t.List[str] | None = None,
) -> ContactMatrix | t.Tuple[t.Any, t.Any, t.Any]:
    """
    Load contact data from a CSV-like file path or buffer.

    For ``DataStructure.DF`` no data is read here: the loader function
    ``_load_csv_data`` and its arguments are handed to a ``ContactMatrix``,
    which decides when to invoke it. For every other output format the data
    is loaded immediately by ``_load_csv_data``.

    Parameters
    ----------
    fpath : str | pathlib.Path | io.BytesIO
        The file path or an in-memory byte stream to read from.
    region1 : str
        The chromosome to load (e.g., "chr1").
    resolution : int
        The resolution (bin size) to apply to the coordinate columns.
    region2 : str | None, optional
        The second region for inter-chromosomal data. Currently not supported.
    balancing : Balancing | None, optional
        The balancing method reflected in the data.
    delimiter : str, optional
        The delimiter to use for parsing.
    encoding : str, optional
        The character encoding of the file.
    output_format : DataStructure, optional
        The desired output format.
    column_names : List[str], optional
        Explicit column names.

    Returns
    -------
    ContactMatrix | tuple
        A ``ContactMatrix`` wrapping the loader when ``output_format`` is
        ``DataStructure.DF``; otherwise whatever ``_load_csv_data`` returns
        for the requested format (a ``(row, col, data)`` tuple for RCV/COO).
    """
    # One argument set for both paths, so the eager path cannot drift from the
    # deferred one (it previously dropped ``column_names``).
    loader_kwargs = {
        "fpath": fpath,
        "region1": region1,
        "resolution": resolution,
        "region2": region2,
        "balancing": balancing,
        "delimiter": delimiter,
        "encoding": encoding,
        "output_format": output_format,
        "column_names": column_names,
    }

    if output_format == DataStructure.DF:
        return ContactMatrix(
            chromosome1=region1,
            chromosome2=region2,
            resolution=resolution,
            loader_func=_load_csv_data,
            loader_kwargs=loader_kwargs,
            metadata={"format": "csv"},
        )

    return _load_csv_data(**loader_kwargs)