# -*- coding: utf-8 -*-
"""
Module.
Examples
--------
"""
__version__ = "1.0.0"
__author__ = "Yeremia Gunawan Adhisantoso"
__license__ = "Clear BSD"
__email__ = "adhisant@tnt.uni-hannover.de"
import typing as t
from pydantic import validate_call
import os
import io
import re
import glob
import numpy as np
import pandas as pd
from scipy import stats
from ... import loaders as cm_loaders
from ..mds.mds_numpy import comp_edm_from_p
from ...converters import convert_to_cm_coo
COO_DELIM = "\t"
PDB_COL_NAMES = [
"ATOM1",
"ATOM2",
"ATOM3",
"ATOM4",
"ATOM_SERIAL_NUMBER",
"X", "Y", "Z",
"A", "B"
]
PDB_USE_COLS = [
# "ATOM_SERIAL_NUMBER",
"X", "Y", "Z",
]
coord_pattern = re.compile(r'[-+]?\d*\.\d+|[-+]?\d+')
POINTS_FFORMAT = ".pdb"
MAPPING_PATTERN = "_coordinate_mapping.txt"
# [docs]
@validate_call
def gen_h3dg_coo(
    chr_region: str,
    resolution: int,
    balancing: str,
    input_fpath: str,
    output_fpath: str,
    overwrite: bool = False,
) -> None:
    """
    Generates COO format files for H3DG from a contact matrix file.
    Notes
    -----
    - Two COO files are written: raw counts (suffix ``.raw``, normalization
      ``"NONE"``) and normalized counts (suffix given by `balancing`).
    Parameters
    ----------
    chr_region : str
        The chromosome region to process.
    resolution : int
        The resolution of the contact matrix.
    balancing : str
        The balancing method to use for normalization.
    input_fpath : str
        The file path to the input contact matrix data.
    output_fpath : str
        The base file path for the output COO files.
    overwrite : bool, optional
        Whether to overwrite existing files (default is False).
    Returns
    -------
    None
    """
    #? Emit one COO file per normalization: raw counts first, then the
    #? requested balancing
    for suffix, norm_method in (("raw", "NONE"), (balancing, balancing)):
        convert_to_cm_coo(
            input_fpath,
            f"{output_fpath}.{suffix}",
            chr_region,
            resolution,
            norm_method,
            output_delimiter=COO_DELIM,
            overwrite=overwrite,
        )
# [docs]
@validate_call
def gen_h3dg_domain(
    input_fpath: str,
    output_fpath: str,
    overwrite: bool = False,
    only_autosome: bool = False,
) -> None:
    """
    Converts an arrowhead file to an H3DG domain file.
    Notes
    -----
    Reads a tab-separated arrowhead domain file, keeps only intra-chromosomal
    entries (``chr1 == chr2``), optionally restricts to autosomes, and writes
    the ``chr1``/``x1``/``x2`` columns tab-separated, without header or index.
    Parameters
    ----------
    input_fpath : str
        The file path to the input arrowhead file.
    output_fpath : str
        The file path to the output H3DG domain file.
    overwrite : bool, optional
        If True, allows overwriting the output file if it already exists. Default is False.
    only_autosome : bool, optional
        If True, filters the domains to include only autosomes. Default is False.
    Returns
    -------
    None
    Raises
    ------
    FileExistsError
        If `output_fpath` already exists and `overwrite` is False.
    """
    #? Refuse to clobber an existing output unless explicitly allowed
    if os.path.exists(output_fpath) and not overwrite:
        raise FileExistsError(f"File already exists: {output_fpath}")
    #? Read the input file
    domain_df = pd.read_table(input_fpath)
    #? Keep intra-chromosomal interactions only
    intra_chrom_mask = domain_df['chr1'] == domain_df['chr2']
    domain_df = domain_df[intra_chrom_mask]
    #? Select only the necessary columns
    domain_df = domain_df[['chr1', 'x1', 'x2']]
    #? Autosomes are the chromosomes with purely numeric names (e.g. chr1..chr22)
    if only_autosome:
        autosome_mask = domain_df['chr1'].str.replace('chr', '').str.isnumeric()
        domain_df = domain_df[autosome_mask]
    #? Write the processed data to the output file
    domain_df.to_csv(output_fpath, sep='\t', header=False, index=False)
    #? NOTE(review): removed the leftover unconditional `raise NotImplementedError`
    #? that fired AFTER the output file was already written — the function is complete.
# [docs]
@validate_call
def find_h3dg_points_fpath(
    path: str
) -> str:
    """
    Finds the most recent points file in the specified directory.
    Notes
    -----
    Searches `path` for files ending in ``.pdb`` and returns the last one in
    lexicographic order.
    Parameters
    ----------
    path : str
        The directory path to search for points files.
    Returns
    -------
    str
        The file path to the most recent points file.
    """
    #? Lexicographically last match is taken as the most recent one
    candidates = glob.glob(f"{path}/*{POINTS_FFORMAT}")
    candidates.sort()
    return candidates[-1]
# [docs]
@validate_call
def parse_h3dg_points(
    points_fpath: str,
    parser: str = 'regex',
) -> np.ndarray:
    """
    Parses 3D points from a PDB file using a specified parser.
    Notes
    -----
    - The function supports two parsers: 'regex' and 'pandas'.
    - The 'regex' parser uses regular expressions to extract coordinates.
    - The 'pandas' parser uses pandas to read the file and extract coordinates.
    - Only lines starting with ``ATOM`` are considered.
    Parameters
    ----------
    points_fpath : str
        The file path to the PDB file containing the points.
    parser : str, optional
        The parser to use ('regex' or 'pandas'), default is 'regex'.
    Returns
    -------
    np.ndarray
        A 2D numpy array of shape (N, 3) containing the parsed points.
    Raises
    ------
    ValueError
        If `parser` is neither 'regex' nor 'pandas'.
    """
    if parser == 'regex':
        atoms = []
        with open(points_fpath, 'r') as f:
            for line in f:
                if line.startswith('ATOM'):
                    coords = coord_pattern.findall(line)
                    #? Tokens 0-1 are serial-number fields; 2-4 are X, Y, Z
                    atoms.append([float(x) for x in coords[2:2+3]])
        #? Single vectorized conversion; reshape keeps shape (0, 3) for empty input
        points = np.asarray(atoms, dtype=float).reshape(-1, 3)
    elif parser == 'pandas':
        points_lines = []
        with open(points_fpath) as f:
            for line in f:
                if line.startswith('ATOM'):
                    points_lines.append(line)
        f = io.StringIO("".join(points_lines))
        points = (
            pd.read_csv(
                f,
                #? `delim_whitespace=True` is deprecated (removed in modern
                #? pandas); `sep=r"\s+"` is the supported equivalent
                sep=r"\s+",
                header=None,
                names=PDB_COL_NAMES,
                usecols=PDB_USE_COLS,
                index_col=None,
            )
            .to_numpy()
        )
    else:
        #? Previously an unknown parser fell through to an UnboundLocalError
        raise ValueError(f"Unknown parser: {parser!r} (expected 'regex' or 'pandas')")
    return points
# [docs]
@validate_call
def find_h3dg_mapping_fpath(
    path: str
) -> str:
    """
    Finds the coordinate mapping file in the specified directory.
    Notes
    -----
    Searches `path` for files ending in ``_coordinate_mapping.txt`` and returns
    the lexicographically last match (consistent with `find_h3dg_points_fpath`).
    Parameters
    ----------
    path : str
        The directory path to search for mapping files.
    Returns
    -------
    str
        The file path to the coordinate mapping file.
    Raises
    ------
    FileNotFoundError
        If no mapping file is found in `path`.
    """
    #? glob order is platform-dependent; sort for a deterministic pick
    matches = sorted(glob.glob(f"{path}/*{MAPPING_PATTERN}"))
    if not matches:
        #? Previously an empty match raised an uninformative IndexError
        raise FileNotFoundError(f"No '*{MAPPING_PATTERN}' file found in: {path}")
    return matches[-1]
# [docs]
@validate_call
def parse_h3dg_mapping(
    mapping_fpath: str,
    resolution: t.Optional[int] = None,
) -> np.ndarray:
    """
    Parses the coordinate mapping file and returns a mapping matrix.
    Notes
    -----
    - If `resolution` is not provided, it is inferred as the smallest gap
      between consecutive loci.
    - Loci are normalized (floor-divided) by the resolution.
    Parameters
    ----------
    mapping_fpath : str
        The file path to the coordinate mapping file.
    resolution : Optional[int], optional
        The resolution to use for normalization, default is None.
    Returns
    -------
    np.ndarray
        A 2D numpy array of shape (N, 2) containing the parsed mapping.
    """
    #? Tab-separated pairs of (genomic locus, point id) produced by the simulation
    mapping_df = pd.read_csv(
        mapping_fpath,
        sep="\t",
        header=None,
        names=['loci', 'id'],
        index_col=None,
    )
    mapping_mat = mapping_df.to_numpy()
    #? Infer the resolution from the smallest consecutive-locus gap if needed
    if resolution is None:
        resolution = np.diff(mapping_mat[:, 0]).min()
    #? Convert genomic loci to bin indices
    mapping_mat[:, 0] //= resolution
    return mapping_mat
# [docs]
@validate_call
def load_h3dg_points(
    res_path: str,
    points_fpath: t.Optional[str] = None,
    parser: str = 'regex',
    mapping_fpath: t.Optional[str] = None,
    resolution: t.Optional[int] = None,
    num_bins: t.Optional[int] = None,
    rc_ids: t.Optional[t.List] = None,
    def_coor: float = np.nan,
) -> np.ndarray:
    """
    Loads and processes 3D points from a PDB file and a coordinate mapping file.
    Notes
    -----
    - When `points_fpath`/`mapping_fpath` are omitted, they are discovered
      inside `res_path`.
    - Bins without a simulated point, and (if `rc_ids` is given) bins not
      appearing in the row/column ids, are filled with `def_coor`.
    Parameters
    ----------
    res_path : str
        The directory path containing the points and mapping files.
    points_fpath : Optional[str], optional
        The file path to the PDB file containing the points, default is None.
    parser : str, optional
        The parser to use ('regex' or 'pandas'), default is 'regex'.
    mapping_fpath : Optional[str], optional
        The file path to the coordinate mapping file, default is None.
    resolution : Optional[int], optional
        The resolution to use for normalization, default is None.
    num_bins : Optional[int], optional
        The number of bins, default is None.
    rc_ids : Optional[List], optional
        Row and column IDs to filter valid loci, default is None.
    def_coor : float, optional
        The default coordinate value for missing or invalid loci, default is np.nan.
    Returns
    -------
    np.ndarray
        A 2D numpy array of shape (num_bins, 3) containing the processed points.
    """
    #? Resolve input files when not given explicitly
    if points_fpath is None:
        points_fpath = find_h3dg_points_fpath(res_path)
    if mapping_fpath is None:
        mapping_fpath = find_h3dg_mapping_fpath(res_path)
    raw_points = parse_h3dg_points(points_fpath, parser=parser)
    mapping_mat = parse_h3dg_mapping(mapping_fpath, resolution=resolution)
    if num_bins is None:
        num_bins = mapping_mat[:, 0].max() + 1
    #? Scatter simulated points into bin order; unmapped bins keep def_coor
    points = np.full((num_bins, 3), def_coor)
    points[mapping_mat[:, 0], :] = raw_points[mapping_mat[:, 1], :]
    if rc_ids is not None:
        #? Blank out bins that never appear among the row/col ids
        #? Reason: not all loci are valid, yet the points contains only valid points
        row_ids, col_ids = rc_ids
        referenced_ids = np.unique([row_ids, col_ids])
        keep = np.zeros(num_bins, dtype=bool)
        keep[referenced_ids] = True
        points[~keep, :] = def_coor
    return points
# [docs]
@validate_call
def comp_h3dg_obj_perf(
    region1: str,
    resolution: int,
    balancing: str,
    cm_fpath: str,
    points_fpath: str,
    mappings_fpath: str,
    region2: t.Optional[str] = None,
) -> dict:
    """
    Computes the performance metrics (Spearman and Pearson correlation) for
    Euclidean distances predicted by Hierarchical 3D Genome.
    Notes
    -----
    Correlates contact counts against Euclidean distances derived from the
    simulated 3D points. Loci removed during simulation (absent from the
    mapping file) are filtered out before the correlation is computed.
    Parameters
    ----------
    region1 : str
        The chromosome region for the first chromosome.
    resolution : int
        The resolution of the data.
    balancing : str
        The balancing method to use.
    cm_fpath : str
        The file path to the input contact matrix data.
    points_fpath : str
        The file path to the points data (pdb or xyz).
    mappings_fpath : str
        The file path to the mappings data.
    region2 : Optional[str], optional
        The chromosome region for the second chromosome (default is None).
    Returns
    -------
    dict
        A dictionary with keys ``region``, ``spearman_r``, ``pearson_r``
        and ``data_ratio`` (fraction of contacts kept after filtering).
    Raises
    ------
    FileNotFoundError
        If `points_fpath` does not exist.
    """
    if not os.path.exists(points_fpath):
        raise FileNotFoundError(f"Points file {points_fpath} not found.")
    count_df = cm_loaders.load_cm_data(
        cm_fpath,
        region1,
        resolution,
        balancing=balancing,
        region2=region2,
        ret_df=True
    )
    row_ids = count_df[cm_loaders.ROW_IDS_COLNAME].to_numpy()
    col_ids = count_df[cm_loaders.COL_IDS_COLNAME].to_numpy()
    counts = count_df[cm_loaders.COUNTS_COLNAME].to_numpy()
    #? Create mapping from row/col ids to points
    #? Reason: not all loci are valid, yet the points contains only valid points
    unique_data_ids = np.unique([row_ids, col_ids])
    #? Reuse the shared parsers instead of duplicating the PDB/mapping reading
    #? logic (the old inline copy also used the deprecated `delim_whitespace`)
    points = parse_h3dg_points(points_fpath, parser='pandas')
    mapping_mat = parse_h3dg_mapping(mappings_fpath, resolution=resolution)
    #? Filter out data that got removed during simulation
    unique_data_ids_mask = np.isin(unique_data_ids, mapping_mat[:, 0])
    removed_data_ids = unique_data_ids[~unique_data_ids_mask]
    valid_data_mask = ~np.isin(row_ids, removed_data_ids)
    valid_data_mask &= ~np.isin(col_ids, removed_data_ids)
    row_ids = row_ids[valid_data_mask]
    col_ids = col_ids[valid_data_mask]
    counts = counts[valid_data_mask]
    unique_data_ids = unique_data_ids[unique_data_ids_mask]
    #? Dense lookup table: original bin id -> compacted point index
    mapping = np.searchsorted(unique_data_ids, np.arange(unique_data_ids.max() + 1))
    new_row_ids = mapping[row_ids]
    new_col_ids = mapping[col_ids]
    edm = comp_edm_from_p(
        points,
        new_row_ids,
        new_col_ids
    )
    #? Tuple unpacking works across scipy versions; modern `pearsonr` result
    #? objects expose `.statistic` rather than `.correlation`
    spearman_r, _ = stats.spearmanr(counts, edm)
    pearson_r, _ = stats.pearsonr(counts, edm)
    data_ratio = valid_data_mask.sum() / valid_data_mask.size
    output_dict = {
        'region': region1,
        'spearman_r': spearman_r,
        'pearson_r': pearson_r,
        'data_ratio': data_ratio,
    }
    return output_dict