Source code for gunz_cm.converters.coo

# -*- coding: utf-8 -*-
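"""
Module for converting contact matrix data from standard formats (like .hic or
.cool) into a tabular, sparse COO (Coordinate List) text format.

Examples
--------
Convert one intra-chromosomal matrix to a tab-separated COO file (the file
paths and the region name are placeholders; the region must exist in the
input file)::

    import pathlib
    from gunz_cm.converters.coo import convert_to_cm_coo

    convert_to_cm_coo(
        input_fpath=pathlib.Path("sample.hic"),
        output_fpath=pathlib.Path("out/chr1.coo"),
        region1="chr1",
        resolution=10000,
        balancing=None,
    )
"""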

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "2.0.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import pathlib

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from pydantic import ConfigDict, validate_call

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from .. import loaders
from ..consts import Balancing, DataFrameSpecs, DataStructure
from ..exceptions import ConversionFailedError
from ..utils.logger import logger

# =============================================================================
# CONVERSION FUNCTIONS
# =============================================================================

def _converter_error(message: str) -> Exception:
    """Build a ``ConverterError`` carrying ``message``.

    NOTE(review): ``ConverterError`` is not among this module's visible
    top-level imports, so referring to the bare name would fail with
    ``NameError``. It is resolved lazily here on the assumption that it is
    defined next to ``ConversionFailedError`` in ``..exceptions`` -- confirm.
    """
    from ..exceptions import ConverterError

    return ConverterError(message)


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def convert_to_cm_coo(
    input_fpath: pathlib.Path,
    output_fpath: pathlib.Path,
    region1: str,
    resolution: int,
    balancing: Balancing | None,
    region2: str | None = None,
    overwrite: bool = False,
    exist_ok: bool = False,
    res_to_one: bool = False,
    to_mcoo: bool = False,
    gen_pseudo_weights: bool = False,
    output_delimiter: str = "\t",
    columns_order: list[str] | None = None,
) -> None:
    """
    Converts contact matrix data to a COO format and saves it to a file.

    This function loads data using the main loader, optionally creating a
    "modified COO" (mCOO) format with both raw and normalized counts, and
    saves the result to a specified text file (no header, no index column).

    Parameters
    ----------
    input_fpath : pathlib.Path
        Path to the input contact matrix file (e.g., .hic, .cool).
    output_fpath : pathlib.Path
        Path where the output COO text file will be saved. Missing parent
        directories are created.
    region1 : str
        The identifier for the first region/chromosome.
    resolution : int
        The resolution for binning the contact matrix.
    balancing : Balancing, optional
        The balancing method to apply. Required if `to_mcoo` is True.
    region2 : str, optional
        The identifier for the second region, if applicable. Defaults to None.
    overwrite : bool, optional
        If True, overwrite the output file if it exists. Takes precedence
        over `exist_ok`. Defaults to False.
    exist_ok : bool, optional
        If True, do nothing if a non-empty output file already exists.
        Defaults to False.
    res_to_one : bool, optional
        If True, keep the row/column IDs as bin indices. If False, the IDs
        are multiplied by `resolution` before writing. Defaults to False.
    to_mcoo : bool, optional
        If True, create a modified COO with raw and normalized counts.
        Defaults to False.
    gen_pseudo_weights : bool, optional
        If True, also write a ``.weights`` file (same path as `output_fpath`
        with the suffix replaced) containing one weight of 1.0 per bin, from
        bin 0 up to the largest bin index present in the data.
        Defaults to False.
    output_delimiter : str, optional
        The delimiter for the output text file. Defaults to a tab.
    columns_order : list[str], optional
        The specific order (and subset) of columns for the output file.
        Defaults to None.

    Raises
    ------
    FileExistsError
        If a non-empty output file exists and neither `overwrite` nor
        `exist_ok` is True.
    ConverterError
        If `to_mcoo` is True but `balancing` is not provided, or if
        `columns_order` names a column that is not present in the data.
    """
    # Ensure output directory exists
    output_fpath.parent.mkdir(parents=True, exist_ok=True)

    # Handle existing output file (an empty file is treated as absent)
    if output_fpath.exists() and output_fpath.stat().st_size > 0:
        if overwrite:
            pass  # Proceed to overwrite
        elif exist_ok:
            logger.info(f"Output file exists and is not empty. Skipping: {output_fpath}")
            return  # Skip and do nothing
        else:
            raise FileExistsError(f"File already exists: {output_fpath}")

    if to_mcoo:
        if not balancing:
            raise _converter_error(
                "Balancing method must be specified when `to_mcoo` is True."
            )

        # Load normalized counts
        norm_count_df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=balancing,
            region2=region2,
            output_format=DataStructure.DF,
        ).rename(columns={DataFrameSpecs.COUNTS: DataFrameSpecs.NORM_COUNTS})

        # Load raw counts
        raw_count_df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=None,  # No balancing for raw counts
            region2=region2,
            output_format=DataStructure.DF,
        ).rename(columns={DataFrameSpecs.COUNTS: DataFrameSpecs.RAW_COUNTS})

        # Merge raw and normalized dataframes (joins on the shared columns)
        df = pd.merge(norm_count_df, raw_count_df, how="left")
    else:
        # Load standard COO data
        df = loaders.load_cm_data(
            fpath=input_fpath,
            region1=region1,
            resolution=resolution,
            balancing=balancing,
            region2=region2,
            output_format=DataStructure.DF,
        )

    # Count bins now, while the IDs are still bin indices and both ID columns
    # are guaranteed to be present. Doing it after scaling/reordering gave a
    # wrong count for `res_to_one=True` and a KeyError when `columns_order`
    # dropped an ID column. An empty frame yields zero weights instead of NaN.
    num_weights = 0
    if gen_pseudo_weights and not df.empty:
        max_bin = max(
            df[DataFrameSpecs.ROW_IDS].max(),
            df[DataFrameSpecs.COL_IDS].max(),
        )
        num_weights = int(max_bin) + 1

    if not res_to_one:
        # Scale bin IDs back to genomic coordinates if not normalizing to resolution 1
        df[[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS]] *= resolution

    if columns_order:
        # Ensure all specified columns exist before reordering
        missing_cols = [col for col in columns_order if col not in df.columns]
        if missing_cols:
            raise _converter_error(
                f"Columns not found in data: {', '.join(missing_cols)}"
            )
        df = df[columns_order]

    df.to_csv(
        output_fpath,
        header=False,
        index=False,
        sep=output_delimiter,
    )

    if gen_pseudo_weights:
        weight_fpath = output_fpath.with_suffix(".weights")
        weights_df = pd.DataFrame({"weights": np.ones(num_weights)})
        weights_df.to_csv(
            weight_fpath,
            header=False,
            index=False,
            sep="\t",
        )
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def convert_all_intra_to_cm_coo(
    input_fpath: pathlib.Path,
    output_dpath: pathlib.Path,
    resolution: int,
    balancing: Balancing | None,
    overwrite: bool = False,
    res_to_one: bool = False,
    to_mcoo: bool = False,
    gen_pseudo_weights: bool = False,
    output_delimiter: str = "\t",
    columns_order: list[str] | None = None,
    n_jobs: int = 1,
) -> None:
    """
    Converts all intra-chromosomal matrices in a file to COO format.

    The chromosome names are read from the input file; for each one the
    intra-chromosomal contact matrix is converted with `convert_to_cm_coo`
    and written to ``<output_dpath>/<chromosome>.coo`` (or ``.mcoo`` when
    `to_mcoo` is True).

    Parameters
    ----------
    input_fpath : pathlib.Path
        Path to the input contact matrix file.
    output_dpath : pathlib.Path
        Directory where the output COO files will be saved. It is created
        (including parents) if it does not exist.
    resolution : int
        The resolution of the contact matrices.
    balancing : Balancing, optional
        The balancing method to apply.
    overwrite : bool, optional
        If True, overwrite existing output files. Defaults to False.
    res_to_one : bool, optional
        If True, normalize bin coordinates. Defaults to False.
    to_mcoo : bool, optional
        If True, convert to modified COO format. Defaults to False.
    gen_pseudo_weights : bool, optional
        If True, generate corresponding .weights files. Defaults to False.
    output_delimiter : str, optional
        Delimiter for the output files. Defaults to a tab.
    columns_order : list[str], optional
        The specific order of columns for the output files. Defaults to None.
    n_jobs : int, optional
        The number of jobs to run in parallel. Defaults to 1.

    Raises
    ------
    ConversionFailedError
        If converting any chromosome fails; the original exception is
        attached as the cause.
    """
    # Chromosome names come from the loader's per-file chromosome info
    chrom_infos = loaders.get_chrom_infos(str(input_fpath))
    chrom_names = list(chrom_infos.keys())

    output_dpath.mkdir(parents=True, exist_ok=True)

    # Everything except the region and its output path is identical per call.
    file_ext = "mcoo" if to_mcoo else "coo"
    shared_kwargs = {
        "input_fpath": input_fpath,
        "resolution": resolution,
        "balancing": balancing,
        "overwrite": overwrite,
        "exist_ok": True,  # Allow skipping existing files in a batch job
        "res_to_one": res_to_one,
        "to_mcoo": to_mcoo,
        "gen_pseudo_weights": gen_pseudo_weights,
        "output_delimiter": output_delimiter,
        "columns_order": columns_order,
    }

    def _convert_one(chrom_name: str) -> None:
        """Convert a single chromosome, wrapping any failure.

        Parameters
        ----------
        chrom_name : str
            Name of the chromosome; used as `region1` and as the stem of
            the output file name.

        Raises
        ------
        ConversionFailedError
            If `convert_to_cm_coo` raises any exception for this chromosome.
        """
        output_fpath = output_dpath / f"{chrom_name}.{file_ext}"
        logger.info(f"Processing chromosome: {chrom_name} -> {output_fpath}")

        try:
            convert_to_cm_coo(
                output_fpath=output_fpath,
                region1=chrom_name,
                **shared_kwargs,
            )
        except Exception as e:
            logger.error(f"Failed to process {chrom_name}: {e}")
            raise ConversionFailedError(region=chrom_name, message=str(e)) from e

    # Lazily wrap each chromosome as a joblib task and run them all.
    make_task = delayed(_convert_one)
    Parallel(n_jobs=n_jobs)(map(make_task, chrom_names))