# -*- coding: utf-8 -*-
"""
Module for converting contact matrix data from standard formats (like .hic or
.cool) into a tabular, sparse COO (Coordinate List) text format.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "2.0.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from pydantic import ConfigDict, validate_call
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from .. import loaders
from ..consts import Balancing, DataFrameSpecs, DataStructure
from ..exceptions import ConversionFailedError
from ..utils.logger import logger
# =============================================================================
# CONVERSION FUNCTIONS
# =============================================================================
# --- Single region -> COO / mCOO text file -----------------------------------
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def convert_to_cm_coo(
    input_fpath: pathlib.Path,
    output_fpath: pathlib.Path,
    region1: str,
    resolution: int,
    balancing: Balancing | None,
    region2: str | None = None,
    overwrite: bool = False,
    exist_ok: bool = False,
    res_to_one: bool = False,
    to_mcoo: bool = False,
    gen_pseudo_weights: bool = False,
    output_delimiter: str = "\t",
    columns_order: list[str] | None = None,
) -> None:
    """
    Convert contact matrix data to a COO text file.

    The data is loaded through ``loaders.load_cm_data`` as a DataFrame and
    written without header or index. With ``to_mcoo`` the data is loaded
    twice (balanced and unbalanced) and both count columns are written
    ("modified COO").

    Parameters
    ----------
    input_fpath : pathlib.Path
        Path to the input contact matrix file (e.g., .hic, .cool).
    output_fpath : pathlib.Path
        Path where the output COO text file will be saved. Missing parent
        directories are created.
    region1 : str
        The identifier for the first region/chromosome.
    resolution : int
        The resolution for binning the contact matrix.
    balancing : Balancing, optional
        The balancing method to apply. Required if `to_mcoo` is True.
    region2 : str, optional
        The identifier for the second region, if applicable.
    overwrite : bool, optional
        If True, overwrite the output file if it exists. Takes precedence
        over `exist_ok`.
    exist_ok : bool, optional
        If True, return without doing anything when a non-empty output file
        already exists.
    res_to_one : bool, optional
        If True, keep the row/column IDs as bin indices. If False (default),
        the IDs are multiplied by `resolution` before writing.
    to_mcoo : bool, optional
        If True, create a modified COO with raw and normalized counts.
    gen_pseudo_weights : bool, optional
        If True, also write ``<output stem>.weights`` containing one ``1.0``
        per bin, where the number of bins is the largest row/column bin
        index in the data plus one.
    output_delimiter : str, optional
        The delimiter for the output text file. Defaults to a tab.
    columns_order : list[str], optional
        Columns to write, in this order. Columns not listed are dropped.

    Raises
    ------
    FileExistsError
        If a non-empty output file exists and neither `overwrite` nor
        `exist_ok` is True.
    ConversionFailedError
        If `to_mcoo` is True but `balancing` is not provided, or if
        `columns_order` names a column that is not present in the data.
    """
    # Ensure output directory exists
    output_fpath.parent.mkdir(parents=True, exist_ok=True)

    # Handle an existing, non-empty output file (empty files are regenerated)
    if not overwrite and output_fpath.exists() and output_fpath.stat().st_size > 0:
        if exist_ok:
            logger.info(f"Output file exists and is not empty. Skipping: {output_fpath}")
            return
        raise FileExistsError(f"File already exists: {output_fpath}")

    if to_mcoo:
        if not balancing:
            raise ConversionFailedError(
                region=region1,
                message="Balancing method must be specified when `to_mcoo` is True.",
            )
        # Load normalized counts
        norm_count_df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=balancing,
            region2=region2,
            output_format=DataStructure.DF,
        ).rename(columns={DataFrameSpecs.COUNTS: DataFrameSpecs.NORM_COUNTS})
        # Load raw counts
        raw_count_df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=None,  # No balancing for raw counts
            region2=region2,
            output_format=DataStructure.DF,
        ).rename(columns={DataFrameSpecs.COUNTS: DataFrameSpecs.RAW_COUNTS})
        # Merge on the columns both frames share; keep every normalized entry
        df = pd.merge(norm_count_df, raw_count_df, how="left")
    else:
        # Load standard COO data
        df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=balancing,
            region2=region2,
            output_format=DataStructure.DF,
        )

    id_columns = [DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS]

    # Determine the number of pseudo weights NOW, while the IDs are still bin
    # indices and before `columns_order` can drop the ID columns. Deriving it
    # from the written coordinates would be wrong whenever `res_to_one` is set.
    num_weights = 0
    if gen_pseudo_weights:
        if df.empty:
            logger.warning(
                f"No contacts found for {region1}; writing an empty weights file."
            )
        else:
            num_weights = int(np.ceil(df[id_columns].max().max())) + 1

    if not res_to_one:
        # Scale bin IDs back to genomic coordinates if not normalizing to resolution 1
        df[id_columns] *= resolution

    if columns_order:
        # Ensure all specified columns exist before reordering
        missing_cols = [col for col in columns_order if col not in df.columns]
        if missing_cols:
            raise ConversionFailedError(
                region=region1,
                message=f"Columns not found in data: {', '.join(missing_cols)}",
            )
        df = df[columns_order]

    df.to_csv(
        output_fpath,
        header=False,
        index=False,
        sep=output_delimiter,
    )

    if gen_pseudo_weights:
        weight_fpath = output_fpath.with_suffix(".weights")
        weights_df = pd.DataFrame({"weights": np.ones(num_weights)})
        weights_df.to_csv(
            weight_fpath,
            header=False,
            index=False,
            sep="\t",
        )
# --- Batch: one COO / mCOO file per chromosome -------------------------------
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def convert_all_intra_to_cm_coo(
    input_fpath: pathlib.Path,
    output_dpath: pathlib.Path,
    resolution: int,
    balancing: Balancing | None,
    overwrite: bool = False,
    res_to_one: bool = False,
    to_mcoo: bool = False,
    gen_pseudo_weights: bool = False,
    output_delimiter: str = "\t",
    columns_order: list[str] | None = None,
    n_jobs: int = 1,
) -> None:
    """
    Write one COO file per chromosome listed in the input file.

    The chromosome names are the keys returned by
    ``loaders.get_chrom_infos``. Each one is passed as ``region1`` (with no
    second region) to :func:`convert_to_cm_coo`, and the result is stored as
    ``<output_dpath>/<chromosome>.coo`` (or ``.mcoo`` when `to_mcoo` is set).

    Parameters
    ----------
    input_fpath : pathlib.Path
        Path to the input contact matrix file.
    output_dpath : pathlib.Path
        Directory for the output files; created if it does not exist.
    resolution : int
        The resolution of the contact matrices.
    balancing : Balancing, optional
        The balancing method to apply.
    overwrite : bool, optional
        If True, overwrite existing output files. Defaults to False.
    res_to_one : bool, optional
        Forwarded to :func:`convert_to_cm_coo`. Defaults to False.
    to_mcoo : bool, optional
        If True, convert to modified COO format. Defaults to False.
    gen_pseudo_weights : bool, optional
        If True, generate corresponding .weights files. Defaults to False.
    output_delimiter : str, optional
        Delimiter for the output files. Defaults to a tab.
    columns_order : list[str], optional
        The specific order of columns for the output files. Defaults to None.
    n_jobs : int, optional
        Passed to ``joblib.Parallel``. Defaults to 1.

    Raises
    ------
    ConversionFailedError
        If converting any chromosome fails; the original exception is chained
        as the cause.
    """
    chromosomes = [*loaders.get_chrom_infos(str(input_fpath)).keys()]
    output_dpath.mkdir(parents=True, exist_ok=True)

    # Everything below is identical for every chromosome.
    suffix = "mcoo" if to_mcoo else "coo"
    shared_kwargs = {
        "input_fpath": input_fpath,
        "resolution": resolution,
        "balancing": balancing,
        "overwrite": overwrite,
        # Existing non-empty outputs are skipped instead of aborting the batch.
        "exist_ok": True,
        "res_to_one": res_to_one,
        "to_mcoo": to_mcoo,
        "gen_pseudo_weights": gen_pseudo_weights,
        "output_delimiter": output_delimiter,
        "columns_order": columns_order,
    }

    def _convert_one(chrom: str) -> None:
        """Convert the matrix of one chromosome, wrapping any failure."""
        target = output_dpath / f"{chrom}.{suffix}"
        logger.info(f"Processing chromosome: {chrom} -> {target}")
        try:
            convert_to_cm_coo(output_fpath=target, region1=chrom, **shared_kwargs)
        except Exception as err:
            logger.error(f"Failed to process {chrom}: {err}")
            raise ConversionFailedError(region=chrom, message=str(err)) from err

    jobs = (delayed(_convert_one)(chrom) for chrom in chromosomes)
    Parallel(n_jobs=n_jobs)(jobs)