# Source code for gunz_cm.loaders.cool_loader

from __future__ import annotations
# -*- coding: utf-8 -*-
"""
A module for reading and processing .cool and .mcool format files.

This module provides functions to extract metadata, chromosome information,
and contact matrix data from Cooler files, leveraging the `cooler` library.
It is designed to be consistent with the other parsers in this library.


Examples
--------
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib
import typing as t

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import cooler
import h5py
import numpy as np
import pandas as pd
from pydantic import ConfigDict, validate_call
from scipy import sparse as sp

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from ..exceptions import LoaderError
from ..consts import (
    DS,
    Balancing,
    DataFrameSpecs,
    Backend,
)
from ..matrix import ContactMatrix

# =============================================================================
# PUBLIC FUNCTIONS
# =============================================================================

@validate_call(config=dict(arbitrary_types_allowed=True))
def get_resolutions(
    fpath: str | pathlib.Path,
) -> t.List[int]:
    """Retrieve available resolutions from a .mcool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The file path to the multi-resolution cooler file (.mcool).

    Returns
    -------
    t.List[int]
        A list of integer resolutions available in the file, sorted
        in ascending order (finest bin size first).

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    LoaderError
        If the file has no 'resolutions' group (not a valid .mcool)
        or the group is empty.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")

    # .mcool files are HDF5 containers with one group per resolution
    with h5py.File(fpath, 'r') as h5_f:
        if 'resolutions' not in h5_f:
            raise LoaderError(f"'{fpath}' is not a valid multi-resolution .mcool file.")
        resolutions = [int(res) for res in h5_f['resolutions'].keys()]

    if not resolutions:
        raise LoaderError(f"Could not query any resolutions from {fpath}.")

    return sorted(resolutions)
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_chrom_infos(
    fpath: str | pathlib.Path,
) -> t.Dict[str, t.Dict[str, t.Union[str, int]]]:
    """Extract chromosome information from a cooler file.

    For .mcool files, the first (numerically smallest) resolution is
    opened.  Chromosome names and lengths are identical across all
    resolutions of a file, so the choice does not affect the result.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The file path to the cooler file (.cool or .mcool).

    Returns
    -------
    t.Dict[str, t.Dict[str, t.Union[str, int]]]
        A dictionary mapping each chromosome name to a dict with keys
        'name' (str) and 'length' (int, in base pairs).

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")

    uri = str(fpath)
    # Multi-resolution files need a group suffix on the URI; any
    # resolution works since chromsizes are shared.
    if cooler.is_mcool(uri):
        resolutions = get_resolutions(fpath)
        uri = f"{uri}::resolutions/{resolutions[0]}"

    c = cooler.Cooler(uri)
    return {
        name: {"name": name, "length": int(length)}
        for name, length in zip(c.chromnames, c.chromsizes)
    }
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_bins(
    fpath: str | pathlib.Path,
    resolution: int,
) -> pd.DataFrame:
    """Retrieve the binned genome index from a cooler file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution to use (ignored for single-resolution .cool files).

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: 'chrom', 'start', 'end', one row per bin.
    """
    fpath = pathlib.Path(fpath)
    uri = str(fpath)
    # Only .mcool files take a '::resolutions/<res>' suffix; a plain
    # .cool file is opened directly (consistent with get_chrom_infos).
    if cooler.is_mcool(uri):
        uri = f"{uri}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    # cooler's range selector requires a region for fetch(); slicing
    # with [:] materializes the whole bin table instead.
    return c.bins()[['chrom', 'start', 'end']][:]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_balancing_weights(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region: str,
    #? --- Options ---
    weight_name: str = 'weight',
) -> np.ndarray:
    """Extract a vector of balancing weights for a given region.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file (.cool or .mcool).
    resolution : int
        The resolution to use.
    region : str
        The chromosome or region to fetch balancing weights for
        (e.g., "chr1").
    weight_name : str, optional
        The name of the weight column in the bins table.
        Defaults to 'weight'.

    Returns
    -------
    np.ndarray
        A NumPy array of balancing weights for the specified region.
        Returns an empty array if the weight column does not exist.
    """
    uri = f"{pathlib.Path(fpath)}::resolutions/{resolution}"
    region_bins = cooler.Cooler(uri).bins().fetch(region)
    # Missing weight column -> empty vector rather than a KeyError
    if weight_name not in region_bins.columns:
        return np.array([])
    return region_bins[weight_name].to_numpy()
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_assembly(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
) -> str:
    """Retrieve the genome assembly information from a cooler file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file.
    resolution : int
        Resolution level to query from the file.

    Returns
    -------
    str
        The genome assembly name (e.g., "hg38"), or 'unknown' when the
        file metadata does not record one.
    """
    uri = f"{pathlib.Path(fpath)}::resolutions/{resolution}"
    clr = cooler.Cooler(uri)
    # 'genome-assembly' is an optional key in the cooler info metadata
    return clr.info.get('genome-assembly', 'unknown')
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_sparsity(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: t.Optional[str] = None,
) -> float:
    """Calculate the sparsity of a cooler matrix for the given regions.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file (.mcool).
    resolution : int
        The resolution to use.
    region1 : str
        First genomic region (e.g., "chr1").
    region2 : t.Optional[str], optional
        Second genomic region. Defaults to `region1` (intra-region).

    Returns
    -------
    float
        Fraction of entries that are NOT stored, in [0.0, 1.0].
        An empty (zero-sized) matrix is reported as 1.0.
    """
    fpath = pathlib.Path(fpath)
    if region2 is None:
        region2 = region1

    c = cooler.Cooler(f'{fpath}::/resolutions/{resolution}')
    matrix_view = c.matrix(balance=False, sparse=True).fetch(region1, region2)

    # np.product was removed in NumPy 2.0; np.prod is the supported spelling
    num_possible_entries = int(np.prod(matrix_view.shape))
    if num_possible_entries == 0:
        return 1.0  # An empty matrix is maximally sparse

    num_nonzero_entries = len(matrix_view.data)
    return 1.0 - (num_nonzero_entries / num_possible_entries)
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def _load_cooler_data(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = Balancing.NONE,
    output_format: DS = DS.DF,
    return_raw_counts: bool = False,
    backend: Backend = Backend.COOLER,
    chunksize: int | None = None,
) -> t.Any:
    """Internal function to load cooler data.

    Supports chunked streaming if chunksize is provided.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution level to load from the file.
    region1 : str
        First genomic region (e.g., "chr1" or "chr1:1,000,000-2,000,000").
    region2 : str | None, optional
        Second genomic region. Defaults to region1.
    balancing : Balancing | list[Balancing] | None, optional
        Balancing method(s) to apply.  A list triggers one recursive
        load per method, merged/collected below.
    output_format : DS, optional
        The desired output format (DS.DF, DS.COO or DS.RCV).
    return_raw_counts : bool, optional
        Whether to return raw counts alongside balanced counts.
    backend : Backend, optional
        Select the underlying backend library for loading.
    chunksize : int | None, optional
        If provided, returns an iterator that yields DataFrame chunks
        of at most `chunksize` pixel rows.

    Returns
    -------
    t.Any
        The loaded data, a tuple of results, or an iterator if
        chunksize is set.

    Raises
    ------
    LoaderError
        If `output_format` is not one of DS.COO / DS.RCV / DS.DF.
    ImportError
        If backend is HICTK but hictkpy is not installed.
    """
    if output_format not in [DS.COO, DS.RCV, DS.DF]:
        raise LoaderError(f"Invalid output_format: {output_format}!")
    if region2 is None:
        region2 = region1

    # --- Multiple balancing methods: recurse once per method ---
    if isinstance(balancing, list):
        results = []
        for b in balancing:
            res = _load_cooler_data(
                fpath=fpath,
                resolution=resolution,
                region1=region1,
                region2=region2,
                balancing=b,
                output_format=output_format,
                return_raw_counts=False,
                backend=backend
            )
            results.append(res)

        if output_format == DS.DF:
            # Rename each per-method counts column, then outer-merge on bin IDs
            renamed_dfs = []
            for i, df in enumerate(results):
                b = balancing[i]
                col_name = f"counts_{b.value.lower()}" if b != Balancing.NONE else "counts_raw"
                renamed_dfs.append(df.rename(columns={DataFrameSpecs.COUNTS: col_name}))
            merged_df = renamed_dfs[0]
            for df in renamed_dfs[1:]:
                merged_df = pd.merge(
                    merged_df, df,
                    on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
                    how='outer'
                ).fillna(0)
            # Append raw counts if requested and not already merged in
            if return_raw_counts and "counts_raw" not in merged_df.columns:
                df_raw = _load_cooler_data(
                    fpath=fpath,
                    resolution=resolution,
                    region1=region1,
                    region2=region2,
                    balancing=Balancing.NONE,
                    output_format=DS.DF,
                    backend=backend
                )
                df_raw = df_raw.rename(columns={DataFrameSpecs.COUNTS: "counts_raw"})
                merged_df = pd.merge(
                    merged_df, df_raw,
                    on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
                    how='outer'
                ).fillna(0)
            return merged_df

        # Non-DF formats: tuple of results, raw appended last if requested
        if return_raw_counts and Balancing.NONE not in balancing:
            res_raw = _load_cooler_data(
                fpath=fpath,
                resolution=resolution,
                region1=region1,
                region2=region2,
                balancing=Balancing.NONE,
                output_format=output_format,
                backend=backend
            )
            results.append(res_raw)
        return tuple(results)

    # --- Backend: HICTK ---
    if backend == Backend.HICTK:
        try:
            import hictkpy
        except ImportError:
            raise ImportError(
                "hictkpy is not installed. Please install it to use 'hictk' backend."
            )
        file = hictkpy.File(str(fpath), resolution)
        balance_arg: t.Union[bool, str] = False
        if balancing and balancing != Balancing.NONE:
            balance_arg = balancing.value
        selector = file.fetch(region1, region2, normalization=balance_arg)

        #? Implement chunked iteration if chunksize is set
        if chunksize is not None:
            def _hictk_iterator():
                """Yield the fetched pixel table in DataFrame chunks.

                hictkpy does not expose a chunked fetch, so the full
                pixel table is materialized once and then sliced; this
                bounds what the consumer holds, not peak loader memory.
                """
                full_data = selector.to_df()
                full_data.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS,
                                          'bin2_id': DataFrameSpecs.COL_IDS,
                                          'count': DataFrameSpecs.COUNTS}, inplace=True)
                for i in range(0, len(full_data), chunksize):
                    yield full_data.iloc[i:i + chunksize]
            return _hictk_iterator()

        if output_format == DS.COO:
            return selector.to_coo()
        if output_format == DS.DF:
            df = selector.to_df()
            df.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS,
                               'bin2_id': DataFrameSpecs.COL_IDS,
                               'count': DataFrameSpecs.COUNTS}, inplace=True)
            return df
        if output_format == DS.RCV:
            coo = selector.to_coo()
            return (coo.row, coo.col, coo.data)

    # --- Backend: COOLER (Standard) ---
    uri = f"{fpath}::resolutions/{resolution}"
    c = cooler.Cooler(uri)

    if chunksize is not None:
        # fetch(as_pixels=True) returns one DataFrame with absolute bin IDs.
        # BUGFIX: the previous code iterated that DataFrame directly, which
        # yields column *labels* (strings), not row chunks, and crashed on
        # chunk.rename. Slice the DataFrame explicitly, like the hictk path.
        pixels_df = c.matrix(
            balance=balancing.value if balancing and balancing != Balancing.NONE else False,
            as_pixels=True
        ).fetch(region1, region2)

        def _cooler_iterator():
            """Yield standardized DataFrame chunks with region-relative bin IDs."""
            # Offsets are loop-invariant: compute once, not per chunk
            offset1 = c.offset(region1)
            offset2 = c.offset(region2) if region2 else offset1
            pixels_df.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS,
                                      'bin2_id': DataFrameSpecs.COL_IDS,
                                      'count': DataFrameSpecs.COUNTS}, inplace=True)
            # Convert absolute bin IDs to 0-based region-relative indices
            pixels_df[DataFrameSpecs.ROW_IDS] -= offset1
            pixels_df[DataFrameSpecs.COL_IDS] -= offset2
            for i in range(0, len(pixels_df), chunksize):
                yield pixels_df.iloc[i:i + chunksize]

        return _cooler_iterator()

    if return_raw_counts:
        # 1. Fetch RAW data (as pixels DataFrame with absolute IDs);
        #    balance=False returns raw counts.
        matrix_df = c.matrix(balance=False, as_pixels=True).fetch(region1, region2)

        # 2. Convert to relative IDs (matching standard cooler.fetch behavior)
        offset1 = c.offset(region1)
        if region1 == region2:
            offset2 = offset1
        else:
            offset2 = c.offset(region2)
        # Subtract offsets to get 0-based relative indices
        row_ids_rel = matrix_df['bin1_id'].to_numpy() - offset1
        col_ids_rel = matrix_df['bin2_id'].to_numpy() - offset2
        raw_counts = matrix_df['count'].to_numpy()

        # 3. Handle balancing manually: count / (w1[row] * w2[col])
        balanced_counts = raw_counts.copy().astype(float)
        if balancing and balancing != Balancing.NONE:
            # Weights fetched per region are indexed by relative bin ID
            # (0 .. region length), so the relative indices map directly.
            w1 = c.bins().fetch(region1)[balancing.value].to_numpy()
            if region1 == region2:
                w2 = w1
            else:
                w2 = c.bins().fetch(region2)[balancing.value].to_numpy()
            denom = w1[row_ids_rel] * w2[col_ids_rel]
            with np.errstate(divide='ignore', invalid='ignore'):
                balanced_counts = balanced_counts / denom

        # Filter invalid bins (NaN or Inf results from masked weights)
        mask = np.isfinite(balanced_counts)
        if not mask.all():
            row_ids_rel = row_ids_rel[mask]
            col_ids_rel = col_ids_rel[mask]
            raw_counts = raw_counts[mask]
            balanced_counts = balanced_counts[mask]

        # 4. Return formatted output
        if output_format == DS.DF:
            return pd.DataFrame({
                DataFrameSpecs.ROW_IDS: row_ids_rel,
                DataFrameSpecs.COL_IDS: col_ids_rel,
                DataFrameSpecs.COUNTS: balanced_counts,
                DataFrameSpecs.RAW_COUNTS: raw_counts
            })
        elif output_format == DS.COO:
            # Determine shape of the region submatrix for the COO output
            extent1 = c.extent(region1)
            len1 = extent1[1] - extent1[0]
            if region1 == region2:
                len2 = len1
            else:
                extent2 = c.extent(region2)
                len2 = extent2[1] - extent2[0]
            shape = (len1, len2)
            bal_coo = sp.coo_matrix((balanced_counts, (row_ids_rel, col_ids_rel)), shape=shape)
            raw_coo = sp.coo_matrix((raw_counts, (row_ids_rel, col_ids_rel)), shape=shape)
            return (bal_coo, raw_coo)
        elif output_format == DS.RCV:
            return (
                (row_ids_rel, col_ids_rel, balanced_counts),
                (row_ids_rel, col_ids_rel, raw_counts)
            )

    # --- Legacy path: single balancing, return_raw_counts=False ---
    balance_arg: t.Union[bool, str] = False
    if balancing and balancing != Balancing.NONE:
        balance_arg = balancing.value
    matrix_coo = c.matrix(balance=balance_arg, sparse=True).fetch(region1, region2)
    if output_format == DS.COO:
        return matrix_coo

    row_ids = matrix_coo.row
    col_ids = matrix_coo.col
    counts = matrix_coo.data
    # Drop NaNs (masked bins under balancing) and explicit zeros
    mask = np.logical_and(~np.isnan(counts), counts != 0)
    if not mask.all():
        row_ids, col_ids, counts = row_ids[mask], col_ids[mask], counts[mask]

    if output_format == DS.RCV:
        return (row_ids, col_ids, counts)
    return pd.DataFrame({
        DataFrameSpecs.ROW_IDS: row_ids,
        DataFrameSpecs.COL_IDS: col_ids,
        DataFrameSpecs.COUNTS: counts,
    })
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def load_cooler(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = Balancing.NONE,
    output_format: DS = DS.DF,
    return_raw_counts: bool = False,
    backend: Backend = Backend.COOLER,
    chunksize: int | None = None,
) -> ContactMatrix:
    """Load contact matrix data from a cooler file lazily.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution level to load from the file.
    region1 : str
        First genomic region (e.g., "chr1" or "chr1:1,000,000-2,000,000").
    region2 : str, optional
        Second genomic region. Defaults to `region1` for intra-chromosomal.
    balancing : Balancing, optional
        Balancing method to apply. Defaults to `Balancing.NONE`.
    output_format : DS, optional
        The desired output format (`DS.DF`, `DS.RCV`, or `DS.COO`).
        Defaults to `DS.DF`.
    return_raw_counts : bool, optional
        Whether to return raw counts alongside balanced counts.
        Only supported for output_format=DS.DF. Defaults to False.
    backend : Backend, optional
        Select the underlying backend library for loading.
        Options: 'cooler' (default), 'hictk'.
    chunksize : int | None, optional
        If provided, the data will be loaded in chunks.

    Returns
    -------
    ContactMatrix
        A ContactMatrix object that can be used to load the data on demand.
    """
    # Chromosome names are the part of the region string before any ':'
    chrom1 = region1.split(":")[0]
    chrom2 = region2.split(":")[0] if region2 else chrom1

    # All loading is deferred: the kwargs are stored and _load_cooler_data
    # is invoked by the ContactMatrix only when data is actually requested.
    return ContactMatrix(
        chromosome1=chrom1,
        chromosome2=chrom2,
        resolution=resolution,
        loader_func=_load_cooler_data,
        loader_kwargs={
            "fpath": fpath,
            "resolution": resolution,
            "region1": region1,
            "region2": region2,
            "balancing": balancing,
            "output_format": output_format,
            "return_raw_counts": return_raw_counts,
            "backend": backend,
            "chunksize": chunksize,
        },
        metadata={"format": "cooler"}
    )