# Source code for gunz_cm.reconstructions.third_party.h3dg

# -*- coding: utf-8 -*-

"""
Module.

Examples
--------
"""
__version__ = "1.0.0"
__author__ = "Yeremia Gunawan Adhisantoso"
__license__ = "Clear BSD"
__email__ = "adhisant@tnt.uni-hannover.de"

import typing as t
from pydantic import validate_call
import os
import io
import re
import glob
import numpy as np
import pandas as pd
from scipy import stats
from ... import loaders as cm_loaders
from ..mds.mds_numpy import comp_edm_from_p
from ...converters import convert_to_cm_coo

COO_DELIM = "\t"

PDB_COL_NAMES = [
    "ATOM1",
    "ATOM2",
    "ATOM3",
    "ATOM4",
    "ATOM_SERIAL_NUMBER",
    "X", "Y", "Z",
    "A", "B"
]
PDB_USE_COLS = [
    # "ATOM_SERIAL_NUMBER",
    "X", "Y", "Z",
]

coord_pattern = re.compile(r'[-+]?\d*\.\d+|[-+]?\d+')

POINTS_FFORMAT = ".pdb"
MAPPING_PATTERN = "_coordinate_mapping.txt"

[docs] @validate_call def gen_h3dg_coo( chr_region: str, resolution: int, balancing: str, input_fpath: str, output_fpath: str, overwrite: bool = False, ) -> None: """ Generates COO format files for H3DG from a contact matrix file. Notes ----- - This function generates both raw and normalized COO files. - The raw counts file is saved with the suffix `.raw`. - The normalized counts file is saved with the suffix specified by the `balancing` parameter. Parameters ---------- chr_region : str The chromosome region to process. resolution : int The resolution of the contact matrix. balancing : str The balancing method to use for normalization. input_fpath : str The file path to the input contact matrix data. output_fpath : str The base file path for the output COO files. overwrite : bool, optional Whether to overwrite existing files (default is False). Returns ------- None Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ #? Generate raw counts convert_to_cm_coo( input_fpath, f"{output_fpath}.raw", chr_region, resolution, "NONE", output_delimiter=COO_DELIM, overwrite=overwrite, ) #? Generate normalized counts convert_to_cm_coo( input_fpath, f"{output_fpath}.{balancing}", chr_region, resolution, balancing, output_delimiter=COO_DELIM, overwrite=overwrite, )
[docs] @validate_call def gen_h3dg_domain( input_fpath: str, output_fpath: str, overwrite: bool = False, only_autosome: bool = False, ) -> None: """ Converts an arrowhead file to an H3DG domain file. Notes ----- This function reads an input file containing domain information, filters for intra-chromosomal interactions, and optionally filters for autosomes only. It then writes the processed data to an output file in H3DG format. If `overwrite` is False and the output file already exists, the function will raise a FileExistsError. Parameters ---------- input_fpath : str The file path to the input arrowhead file. output_fpath : str The file path to the output H3DG domain file. overwrite : bool, optional If True, allows overwriting the output file if it already exists. Default is False. only_autosome : bool, optional If True, filters the domains to include only autosomes. Default is False. Returns ------- None Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ #? Check if the output file exists and handle accordingly if os.path.exists(output_fpath) and not overwrite: raise FileExistsError(f"File already exists: {output_fpath}") #? Read the input file domain_df = pd.read_table(input_fpath) #? Filter for intra-chromosomal interactions intra_chrom_mask = domain_df['chr1'] == domain_df['chr2'] domain_df = domain_df[intra_chrom_mask] #? Select only the necessary columns domain_df = domain_df[['chr1', 'x1', 'x2']] #? Optionally filter for autosomes if only_autosome: autosome_mask = domain_df['chr1'].str.replace('chr', '').str.isnumeric() domain_df = domain_df[autosome_mask] #? Write the processed data to the output file domain_df.to_csv(output_fpath, sep='\t', header=False, index=False) raise NotImplementedError("Not finish yet")
[docs] @validate_call def find_h3dg_points_fpath( path: str ) -> str: """ Finds the most recent points file in the specified directory. Notes ----- This function searches for files with the `.pdb` extension and returns the most recent one. Parameters ---------- path : str The directory path to search for points files. Returns ------- str The file path to the most recent points file. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ points_fpath = sorted(glob.glob(f"{path}/*{POINTS_FFORMAT}"))[-1] return points_fpath
[docs] @validate_call def parse_h3dg_points( points_fpath: str, parser: str = 'regex', ) -> np.ndarray: """ Parses 3D points from a PDB file using a specified parser. Notes ----- - The function supports two parsers: 'regex' and 'pandas'. - The 'regex' parser uses regular expressions to extract coordinates. - The 'pandas' parser uses pandas to read the file and extract coordinates. Parameters ---------- points_fpath : str The file path to the PDB file containing the points. parser : str, optional The parser to use ('regex' or 'pandas'), default is 'regex'. Returns ------- np.ndarray A 2D numpy array of shape (N, 3) containing the parsed points. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ if parser == 'regex': atoms = [] with open(points_fpath, 'r') as f: for line in f: if line.startswith('ATOM'): coords = coord_pattern.findall(line) atoms.append([float(x) for x in coords[2:2+3]]) num_points = len(atoms) points = np.empty((num_points, 3)) for i, atom in enumerate(atoms): points[i, :] = atom elif parser == 'pandas': points_lines = [] with open(points_fpath) as f: for line in f: if line.startswith('ATOM'): points_lines.append(line) f = io.StringIO("".join(points_lines)) points = ( pd.read_csv( f, # sep=r"\s+", #? Delimiter is all possible delim_whitespace=True, #? Delimiter is all possible whitespace header=None, names=PDB_COL_NAMES, usecols=PDB_USE_COLS, index_col=None, ) .to_numpy() ) return points
[docs] @validate_call def find_h3dg_mapping_fpath( path: str ) -> str: """ Finds the coordinate mapping file in the specified directory. Notes ----- This function searches for files with the `_coordinate_mapping.txt` extension and returns the first one found. Parameters ---------- path : str The directory path to search for mapping files. Returns ------- str The file path to the coordinate mapping file. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ mapping_fpath = glob.glob(f"{path}/*{MAPPING_PATTERN}")[-1] return mapping_fpath
[docs] @validate_call def parse_h3dg_mapping( mapping_fpath: str, resolution: t.Optional[int] = None, ) -> np.ndarray: """ Parses the coordinate mapping file and returns a mapping matrix. Notes ----- - If `resolution` is not provided, it is inferred from the differences in the loci. - The function normalizes the loci by the resolution. Parameters ---------- mapping_fpath : str The file path to the coordinate mapping file. resolution : Optional[int], optional The resolution to use for normalization, default is None. Returns ------- np.ndarray A 2D numpy array of shape (N, 2) containing the parsed mapping. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ #? This is the mapping of points to loci (result of simulation) mapping_mat = pd.read_csv( mapping_fpath, sep="\t", header=None, names=['loci', 'id'], index_col=None, ).to_numpy() if resolution is None: resolution = np.diff(mapping_mat[:, 0]).min() mapping_mat[:, 0] //= resolution return mapping_mat
[docs] @validate_call def load_h3dg_points( res_path: str, points_fpath: t.Optional[str] = None, parser: str = 'regex', mapping_fpath: t.Optional[str] = None, resolution: t.Optional[int] = None, num_bins: t.Optional[int] = None, rc_ids: t.Optional[t.List] = None, def_coor: float = np.nan, ) -> np.ndarray: """ Loads and processes 3D points from a PDB file and a coordinate mapping file. Notes ----- - If `points_fpath` is not provided, the function finds the most recent points file in `res_path`. - If `mapping_fpath` is not provided, the function finds the coordinate mapping file in `res_path`. - The function handles missing data and invalid loci by setting them to `def_coor`. Parameters ---------- res_path : str The directory path containing the points and mapping files. points_fpath : Optional[str], optional The file path to the PDB file containing the points, default is None. parser : str, optional The parser to use ('regex' or 'pandas'), default is 'regex'. mapping_fpath : Optional[str], optional The file path to the coordinate mapping file, default is None. resolution : Optional[int], optional The resolution to use for normalization, default is None. num_bins : Optional[int], optional The number of bins, default is None. rc_ids : Optional[List]], optional Row and column IDs to filter valid loci, default is None. def_coor : float, optional The default coordinate value for missing or invalid loci, default is np.nan. Returns ------- np.ndarray A 2D numpy array of shape (N, 3) containing the processed points. Examples -------- Authors ------- - Yeremia G. 
Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ if points_fpath is None: points_fpath = find_h3dg_points_fpath(res_path) points = parse_h3dg_points( points_fpath, parser=parser, ) if mapping_fpath is None: mapping_fpath = find_h3dg_mapping_fpath(res_path) mapping_mat = parse_h3dg_mapping( mapping_fpath, resolution=resolution ) if num_bins is None: num_bins = mapping_mat[:, 0].max() + 1 new_points = np.full((num_bins, 3), def_coor) new_points[mapping_mat[:, 0], :] = points[mapping_mat[:, 1], :] points = new_points if rc_ids is not None: #? Create mapping from row/col ids to points #? Reason: not all loci are valid, yet the points contains only valid points row_ids, col_ids = rc_ids unique_data_ids = np.unique([row_ids, col_ids]) mask = np.zeros(num_bins, dtype=bool) mask[unique_data_ids] = True points[~mask, :] = def_coor return points
[docs] @validate_call def comp_h3dg_obj_perf( region1: str, resolution: int, balancing: str, cm_fpath: str, points_fpath: str, mappings_fpath: str, region2: t.Optional[str] = None, ) -> dict: """ Computes the performance metrics (Spearman and Pearson correlation) for Euclidean distances predicted by Hierarchical 3D Genome. Notes ----- This function computes the Spearman correlation between contact counts and Euclidean distances derived from 3D points. It handles edge cases such as invalid loci and missing data. Parameters ---------- region1 : str The chromosome region for the first chromosome. resolution : int The resolution of the data. balancing : str The balancing method to use. cm_fpath : str The file path to the input contact matrix data. points_fpath : str The file path to the points data (pdb or xyz). mappings_fpath : str The file path to the mappings data. region2 : Optional[str], optional The chromosome region for the second chromosome (default is None). Returns ------- dict A dictionary containing the region, correlation, and data ratio. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 Coder 32B - 6.5bpw Examples -------- """ if not os.path.exists(points_fpath): raise FileNotFoundError(f"Points file {points_fpath} not found.") count_df = cm_loaders.load_cm_data( cm_fpath, region1, resolution, balancing=balancing, region2=region2, ret_df=True ) row_ids = count_df[cm_loaders.ROW_IDS_COLNAME].to_numpy() col_ids = count_df[cm_loaders.COL_IDS_COLNAME].to_numpy() counts = count_df[cm_loaders.COUNTS_COLNAME].to_numpy() #? Create mapping from row/col ids to points #? Reason: not all loci are valid, yet the points contains only valid points unique_data_ids = np.unique([row_ids, col_ids]) points_lines = [] with open(points_fpath) as f: for line in f: if line.startswith('ATOM'): points_lines.append(line) f = io.StringIO("".join(points_lines)) points = ( pd.read_csv( f, # sep=r"\s+", #? 
Delimiter is all possible delim_whitespace=True, #? Delimiter is all possible whitespace header=None, names=PDB_COL_NAMES, usecols=PDB_USE_COLS, index_col=None, ) .to_numpy() ) #? This is the mapping of points to loci (result of simulation) mapping_df = pd.read_csv( mappings_fpath, sep="\t", header=None, names=['loci', 'id'], index_col=None, ).to_numpy() mapping_df[:, 0] //= resolution #? Filter out data that got removed during simulation unique_data_ids_mask = np.isin(unique_data_ids, mapping_df[:, 0]) removed_data_ids = unique_data_ids[~unique_data_ids_mask] valid_data_mask = ~np.isin(row_ids, removed_data_ids) valid_data_mask &= ~np.isin(col_ids, removed_data_ids) row_ids = row_ids[valid_data_mask] col_ids = col_ids[valid_data_mask] counts = counts[valid_data_mask] unique_data_ids = unique_data_ids[unique_data_ids_mask] mapping = np.searchsorted(unique_data_ids, np.arange(unique_data_ids.max()+1)) new_row_ids = mapping[row_ids] new_col_ids = mapping[col_ids] edm = comp_edm_from_p( points, new_row_ids, new_col_ids ) res = stats.spearmanr(counts, edm) spearman_r = res.correlation res = stats.pearsonr(counts, edm) pearson_r = res.correlation data_ratio = valid_data_mask.sum() / valid_data_mask.size output_dict = { 'region': region1, 'spearman_r': spearman_r, 'pearson_r': pearson_r, 'data_ratio': data_ratio, } return output_dict