from __future__ import annotations
# -*- coding: utf-8 -*-
"""
A module for reading and processing .cool and .mcool format files.
This module provides functions to extract metadata, chromosome information,
and contact matrix data from Cooler files, leveraging the `cooler` library.
It is designed to be consistent with the other parsers in this library.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib
import typing as t
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import cooler
import h5py
import numpy as np
import pandas as pd
from pydantic import ConfigDict, validate_call
from scipy import sparse as sp
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from ..exceptions import LoaderError
from ..consts import (
DS,
Balancing,
DataFrameSpecs,
Backend,
)
from ..matrix import ContactMatrix
# =============================================================================
# PUBLIC FUNCTIONS
# =============================================================================
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_resolutions(
    fpath: str | pathlib.Path,
) -> t.List[int]:
    """Retrieve available resolutions from a .mcool file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The file path to the multi-resolution cooler file (.mcool).

    Returns
    -------
    t.List[int]
        A sorted (ascending) list of integer resolutions available in the file.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    LoaderError
        If the file has no 'resolutions' group, or the group is empty.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    # Read the HDF5 layout directly: a valid .mcool stores one subgroup per
    # resolution under the top-level 'resolutions' group.
    with h5py.File(fpath, 'r') as h5_f:
        if 'resolutions' not in h5_f:
            raise LoaderError(f"'{fpath}' is not a valid multi-resolution .mcool file.")
        resolutions = [int(res) for res in h5_f['resolutions'].keys()]
    if not resolutions:
        raise LoaderError(f"Could not query any resolutions from {fpath}.")
    return sorted(resolutions)
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_chrom_infos(
    fpath: str | pathlib.Path,
) -> t.Dict[str, t.Dict[str, t.Union[str, int]]]:
    """Extract chromosome information from a cooler file.

    For .mcool files, the chromosome table is read from the lowest
    resolution (largest bin-size number). The table is identical at every
    resolution, so the coarsest one is simply the cheapest to open.

    Parameters
    ----------
    fpath : str | pathlib.Path
        The file path to the cooler file (.cool or .mcool).

    Returns
    -------
    t.Dict[str, t.Dict[str, t.Union[str, int]]]
        A dictionary mapping chromosome names to their details,
        e.g. ``{"chr1": {"name": "chr1", "length": 248956422}}``.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    uri = str(fpath)
    # Multi-resolution files need an explicit group URI; use the coarsest
    # resolution (largest number) as documented above. get_resolutions()
    # returns the list sorted ascending, so that is the last element.
    if cooler.is_mcool(uri):
        resolutions = get_resolutions(fpath)
        uri = f"{uri}::resolutions/{resolutions[-1]}"
    c = cooler.Cooler(uri)
    return {
        name: {"name": name, "length": int(length)}
        for name, length in zip(c.chromnames, c.chromsizes)
    }
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_bins(
    fpath: str | pathlib.Path,
    resolution: int,
) -> pd.DataFrame:
    """Retrieve the binnified index from a cooler file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution to use. Ignored for single-resolution .cool files.

    Returns
    -------
    pd.DataFrame
        DataFrame with columns: 'chrom', 'start', 'end'.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    uri = str(fpath)
    # Plain .cool files have no 'resolutions' group, so only append the
    # resolution suffix for multi-resolution (.mcool) files.
    if cooler.is_mcool(uri):
        uri = f"{uri}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    return c.bins()[['chrom', 'start', 'end']].fetch()
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_balancing_weights(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region: str,
    #? --- Options ---
    weight_name: str = 'weight',
) -> np.ndarray:
    """Extract a vector of balancing weights for a given region.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file (.cool or .mcool).
    resolution : int
        The resolution to use. Ignored for single-resolution .cool files.
    region : str
        The chromosome or region to fetch balancing weights for (e.g., "chr1").
    weight_name : str, optional
        The name of the weight column in the bins table. Defaults to 'weight'.

    Returns
    -------
    np.ndarray
        A NumPy array of balancing weights for the specified region.
        Returns an empty array if the weight column does not exist.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    uri = str(fpath)
    # Only .mcool files use the '::resolutions/<res>' URI suffix.
    if cooler.is_mcool(uri):
        uri = f"{uri}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    bins_df = c.bins().fetch(region)
    if weight_name in bins_df.columns:
        return bins_df[weight_name].to_numpy()
    # A missing weight column is not an error here; callers can detect the
    # empty result and decide whether balancing is available.
    return np.array([])
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_assembly(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
) -> str:
    """Retrieve the genome assembly information from a cooler file.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file (.cool or .mcool).
    resolution : int
        Resolution level to query from the file.
        Ignored for single-resolution .cool files.

    Returns
    -------
    str
        The genome assembly name (e.g., "hg38"), or 'unknown' if the
        file does not record one.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    uri = str(fpath)
    # Only .mcool files use the '::resolutions/<res>' URI suffix.
    if cooler.is_mcool(uri):
        uri = f"{uri}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    return c.info.get('genome-assembly', 'unknown')
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_sparsity(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: t.Optional[str] = None,
) -> float:
    """Calculate the sparsity of a cooler matrix for given regions.

    Sparsity is defined as ``1 - nnz / total``, where ``nnz`` is the number
    of stored (non-zero) entries and ``total`` is the number of possible
    entries in the queried sub-matrix.

    Parameters
    ----------
    fpath : str | pathlib.Path
        File path of the cooler file (.cool or .mcool).
    resolution : int
        Resolution level to query. Ignored for single-resolution .cool files.
    region1 : str
        First genomic region (e.g., "chr1").
    region2 : str, optional
        Second genomic region. Defaults to `region1`.

    Returns
    -------
    float
        Sparsity in [0.0, 1.0]; an empty matrix is reported as 1.0.

    Raises
    ------
    FileNotFoundError
        If the specified `fpath` does not exist.
    """
    fpath = pathlib.Path(fpath)
    if not fpath.exists():
        raise FileNotFoundError(f"Cooler file not found at: {fpath}")
    if region2 is None:
        region2 = region1
    uri = str(fpath)
    # Only .mcool files use the '::resolutions/<res>' URI suffix.
    if cooler.is_mcool(uri):
        uri = f"{uri}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    matrix_view = c.matrix(balance=False, sparse=True).fetch(region1, region2)
    # np.product was removed in NumPy 2.0; np.prod is the supported spelling.
    num_possible_entries = np.prod(matrix_view.shape)
    if num_possible_entries == 0:
        return 1.0  # An empty matrix is maximally sparse
    num_nonzero_entries = len(matrix_view.data)
    sparsity = 1.0 - (num_nonzero_entries / num_possible_entries)
    return sparsity
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def _load_cooler_data(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = Balancing.NONE,
    output_format: DS = DS.DF,
    return_raw_counts: bool = False,
    backend: Backend = Backend.COOLER,
    chunksize: int | None = None,
) -> t.Any:
    """
    Internal function to load cooler data. Supports chunked streaming if
    `chunksize` is provided.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution level to load from the file.
    region1 : str
        First genomic region (e.g., "chr1" or "chr1:1,000,000-2,000,000").
    region2 : str | None, optional
        Second genomic region. Defaults to region1.
    balancing : Balancing | list[Balancing] | None, optional
        Balancing method(s) to apply. A list triggers one recursive load
        per method and merges the results.
    output_format : DS, optional
        The desired output format (DS.DF, DS.COO or DS.RCV).
    return_raw_counts : bool, optional
        Whether to return raw counts alongside balanced counts.
    backend : Backend, optional
        Select the underlying backend library for loading.
    chunksize : int | None, optional
        If provided, returns an iterator that yields DataFrame chunks.

    Returns
    -------
    t.Any
        The loaded data, or an iterator if chunksize is set.

    Raises
    ------
    LoaderError
        If `output_format` is not one of DS.COO, DS.RCV, DS.DF.
    ImportError
        If backend=HICTK is requested but hictkpy is not installed.
    """
    if output_format not in [DS.COO, DS.RCV, DS.DF]:
        raise LoaderError(f"Invalid output_format: {output_format}!")
    if region2 is None:
        region2 = region1
    # --- Multiple balancing methods: recurse once per method, then merge ---
    if isinstance(balancing, list):
        results = []
        for b in balancing:
            res = _load_cooler_data(
                fpath=fpath,
                resolution=resolution,
                region1=region1,
                region2=region2,
                balancing=b,
                output_format=output_format,
                return_raw_counts=False,
                backend=backend
            )
            results.append(res)
        if output_format == DS.DF:
            # One counts column per method, outer-joined on (row_id, col_id).
            renamed_dfs = []
            for i, df in enumerate(results):
                b = balancing[i]
                col_name = f"counts_{b.value.lower()}" if b != Balancing.NONE else "counts_raw"
                renamed_dfs.append(df.rename(columns={DataFrameSpecs.COUNTS: col_name}))
            merged_df = renamed_dfs[0]
            for df in renamed_dfs[1:]:
                merged_df = pd.merge(
                    merged_df, df,
                    on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
                    how='outer'
                ).fillna(0)
            # Append raw counts if requested and not already merged in.
            if return_raw_counts:
                if "counts_raw" not in merged_df.columns:
                    df_raw = _load_cooler_data(
                        fpath=fpath,
                        resolution=resolution,
                        region1=region1,
                        region2=region2,
                        balancing=Balancing.NONE,
                        output_format=DS.DF,
                        backend=backend
                    )
                    df_raw = df_raw.rename(columns={DataFrameSpecs.COUNTS: "counts_raw"})
                    merged_df = pd.merge(
                        merged_df, df_raw,
                        on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
                        how='outer'
                    ).fillna(0)
            return merged_df
        # Non-DF formats: return one result per method as a tuple.
        if return_raw_counts:
            if Balancing.NONE not in balancing:
                res_raw = _load_cooler_data(
                    fpath=fpath,
                    resolution=resolution,
                    region1=region1,
                    region2=region2,
                    balancing=Balancing.NONE,
                    output_format=output_format,
                    backend=backend
                )
                results.append(res_raw)
        return tuple(results)
    # --- Backend: HICTK ---
    if backend == Backend.HICTK:
        try:
            import hictkpy
        except ImportError:
            raise ImportError(
                "hictkpy is not installed. Please install it to use 'hictk' backend."
            )
        file = hictkpy.File(str(fpath), resolution)
        balance_arg: t.Union[bool, str] = False
        if balancing and balancing != Balancing.NONE:
            balance_arg = balancing.value
        selector = file.fetch(region1, region2, normalization=balance_arg)
        #? Implement chunked iteration if chunksize is set
        if chunksize is not None:
            def _hictk_iterator():
                """Yield the selected pixels as DataFrame chunks of `chunksize` rows.

                NOTE(review): hictkpy exposes no chunked fetch, so the full
                pixel table is materialized first and sliced afterwards —
                chunked in interface, not out-of-core.
                """
                full_data = selector.to_df()
                full_data.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS, 'bin2_id': DataFrameSpecs.COL_IDS, 'count': DataFrameSpecs.COUNTS}, inplace=True)
                for i in range(0, len(full_data), chunksize):
                    yield full_data.iloc[i:i + chunksize]
            return _hictk_iterator()
        if output_format == DS.COO:
            return selector.to_coo()
        if output_format == DS.DF:
            df = selector.to_df()
            df.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS, 'bin2_id': DataFrameSpecs.COL_IDS, 'count': DataFrameSpecs.COUNTS}, inplace=True)
            return df
        if output_format == DS.RCV:
            coo = selector.to_coo()
            return (coo.row, coo.col, coo.data)
    # --- Backend: COOLER (Standard) ---
    uri = f"{fpath}::resolutions/{resolution}"
    c = cooler.Cooler(uri)
    if chunksize is not None:
        # .fetch(..., as_pixels=True) materializes a pixels DataFrame (it is
        # not a lazily iterable selector), so slice it into row chunks, the
        # same strategy as the hictk backend above.
        pixels_df = c.matrix(balance=balancing.value if balancing and balancing != Balancing.NONE else False, as_pixels=True).fetch(region1, region2)
        def _cooler_iterator():
            """Yield pixel chunks as DataFrames with region-relative bin IDs."""
            # Offsets are loop-invariant: compute them once up front.
            offset1 = c.offset(region1)
            offset2 = c.offset(region2) if region2 else offset1
            pixels_df.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS, 'bin2_id': DataFrameSpecs.COL_IDS, 'count': DataFrameSpecs.COUNTS}, inplace=True)
            pixels_df[DataFrameSpecs.ROW_IDS] -= offset1
            pixels_df[DataFrameSpecs.COL_IDS] -= offset2
            for i in range(0, len(pixels_df), chunksize):
                yield pixels_df.iloc[i:i + chunksize]
        return _cooler_iterator()
    if return_raw_counts:
        # 1. Fetch RAW data (as pixels DataFrame with absolute IDs)
        # Note: balance=False returns raw counts
        matrix_df = c.matrix(balance=False, as_pixels=True).fetch(region1, region2)
        # 2. Convert to Relative IDs (matching standard cooler.fetch behavior)
        offset1 = c.offset(region1)
        if region1 == region2:
            offset2 = offset1
        else:
            offset2 = c.offset(region2)
        # Subtract offsets to get 0-based relative indices
        row_ids_rel = matrix_df['bin1_id'].to_numpy() - offset1
        col_ids_rel = matrix_df['bin2_id'].to_numpy() - offset2
        raw_counts = matrix_df['count'].to_numpy()
        # 3. Handle Balancing
        balanced_counts = raw_counts.copy().astype(float)
        if balancing and balancing != Balancing.NONE:
            # Fetch weights manually for the queried regions.
            # These weight arrays are already indexed by relative bin ID.
            w1 = c.bins().fetch(region1)[balancing.value].to_numpy()
            if region1 == region2:
                w2 = w1
            else:
                w2 = c.bins().fetch(region2)[balancing.value].to_numpy()
            # cooler stores *multiplicative* balancing weights: the balanced
            # value is count * w[i] * w[j], which is exactly what
            # c.matrix(balance=True) computes. Multiplying here keeps this
            # path consistent with the legacy path below.
            balanced_counts = balanced_counts * w1[row_ids_rel] * w2[col_ids_rel]
            # Bad bins carry NaN weights; drop the resulting non-finite pixels
            # from all parallel arrays so they stay aligned.
            mask = np.isfinite(balanced_counts)
            if not mask.all():
                row_ids_rel = row_ids_rel[mask]
                col_ids_rel = col_ids_rel[mask]
                raw_counts = raw_counts[mask]
                balanced_counts = balanced_counts[mask]
        # 4. Return formatted output
        if output_format == DS.DF:
            return pd.DataFrame({
                DataFrameSpecs.ROW_IDS: row_ids_rel,
                DataFrameSpecs.COL_IDS: col_ids_rel,
                DataFrameSpecs.COUNTS: balanced_counts,
                DataFrameSpecs.RAW_COUNTS: raw_counts
            })
        elif output_format == DS.COO:
            # Determine shape for COO matrix from the bin extents.
            extent1 = c.extent(region1)
            len1 = extent1[1] - extent1[0]
            if region1 == region2:
                len2 = len1
            else:
                extent2 = c.extent(region2)
                len2 = extent2[1] - extent2[0]
            shape = (len1, len2)
            bal_coo = sp.coo_matrix((balanced_counts, (row_ids_rel, col_ids_rel)), shape=shape)
            raw_coo = sp.coo_matrix((raw_counts, (row_ids_rel, col_ids_rel)), shape=shape)
            return (bal_coo, raw_coo)
        elif output_format == DS.RCV:
            return (
                (row_ids_rel, col_ids_rel, balanced_counts),
                (row_ids_rel, col_ids_rel, raw_counts)
            )
    # --- Legacy path for return_raw_counts=False ---
    balance_arg: t.Union[bool, str] = False
    if balancing and balancing != Balancing.NONE:
        balance_arg = balancing.value
    matrix_coo = c.matrix(balance=balance_arg, sparse=True).fetch(region1, region2)
    if output_format == DS.COO:
        return matrix_coo
    row_ids = matrix_coo.row
    col_ids = matrix_coo.col
    counts = matrix_coo.data
    # Drop NaN (bad-bin) and explicit-zero entries from the sparse output.
    mask = np.logical_and(~np.isnan(counts), counts != 0)
    if not mask.all():
        row_ids, col_ids, counts = row_ids[mask], col_ids[mask], counts[mask]
    if output_format == DS.RCV:
        return (row_ids, col_ids, counts)
    return pd.DataFrame({
        DataFrameSpecs.ROW_IDS: row_ids,
        DataFrameSpecs.COL_IDS: col_ids,
        DataFrameSpecs.COUNTS: counts,
    })
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def load_cooler(
    #? --- Input Files ---
    fpath: str | pathlib.Path,
    #? --- Query Info ---
    resolution: int,
    region1: str,
    region2: str | None = None,
    #? --- Options ---
    balancing: Balancing | list[Balancing] | None = Balancing.NONE,
    output_format: DS = DS.DF,
    return_raw_counts: bool = False,
    backend: Backend = Backend.COOLER,
    chunksize: int | None = None,
) -> ContactMatrix:
    """Load contact matrix data from a cooler file lazily.

    Parameters
    ----------
    fpath : str | pathlib.Path
        Path to the cooler file (.cool or .mcool).
    resolution : int
        The resolution level to load from the file.
    region1 : str
        First genomic region (e.g., "chr1" or "chr1:1,000,000-2,000,000").
    region2 : str, optional
        Second genomic region. Defaults to `region1` for intra-chromosomal.
    balancing : Balancing | list[Balancing] | None, optional
        Balancing method(s) to apply. Defaults to `Balancing.NONE`.
    output_format : DS, optional
        The desired output format (`DS.DF`, `DS.RCV`, or `DS.COO`). Defaults to `DS.DF`.
    return_raw_counts : bool, optional
        Whether to return raw counts alongside balanced counts.
        Only supported for output_format=DS.DF. Defaults to False.
    backend : Backend, optional
        Select the underlying backend library for loading.
        Options: 'cooler' (default), 'hictk'.
    chunksize : int | None, optional
        If provided, the data will be loaded in chunks.

    Returns
    -------
    ContactMatrix
        A ContactMatrix object that can be used to load the data on demand.
    """
    # No I/O happens here: all arguments are captured and handed to the
    # ContactMatrix, which invokes _load_cooler_data on demand.
    loader_kwargs = {
        "fpath": fpath,
        "resolution": resolution,
        "region1": region1,
        "region2": region2,
        "balancing": balancing,
        "output_format": output_format,
        "return_raw_counts": return_raw_counts,
        "backend": backend,
        "chunksize": chunksize,
    }
    # Chromosome names are the part of a region string before the ':'.
    chrom1 = region1.split(":")[0]
    chrom2 = region2.split(":")[0] if region2 else chrom1
    return ContactMatrix(
        chromosome1=chrom1,
        chromosome2=chrom2,
        resolution=resolution,
        loader_func=_load_cooler_data,
        loader_kwargs=loader_kwargs,
        metadata={"format": "cooler"}
    )