# -*- coding: utf-8 -*-
"""
Module for computing the HiCSpector reproducibility score.
This module provides functions to calculate the HiCSpector score, a metric
based on the spectral properties of the normalized Laplacian matrix of
Hi-C contact maps.
References
----------
.. [1] Yan, C., et al. (2017). "HiCSpector: a tool for assessing the
reproducibility of Hi-C data." Bioinformatics, 33(14), 2197-2199.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
from gunz_cm.utils.logger import logger
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from gunz_cm.exceptions import MetricError
import numpy as np
from scipy import sparse as sp
from scipy.sparse.linalg import eigsh
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from gunz_cm.preprocs import (
comp_single_graph_adj_mat,
filter_common_empty_rowcols,
filter_empty_rowcols,
mirror_upper_to_lower_triangle,
)
[docs]
def comp_norm_laplacian_mat_coo(
cm_coo: sp.coo_matrix,
single_graph_laplacian: bool = False,
with_loop: bool = True,
is_triu_sym: bool = True,
) -> sp.coo_matrix:
"""
Calculate the normalized Laplacian matrix from a COO matrix.
This function assumes the input matrix is symmetric and that only the
upper triangular part (including the diagonal) is stored.
Parameters
----------
cm_coo : sp.coo_matrix
The input adjacency matrix in COO format.
single_graph_laplacian : bool, optional
Whether to treat the input as a single graph, which affects how the
adjacency matrix is derived. By default False.
with_loop : bool, optional
Whether to include the diagonal in the adjacency matrix derivation.
By default True.
is_triu_sym : bool, optional
Whether the input is a symmetric matrix stored in upper-triangular
form. By default True.
Returns
-------
sp.coo_matrix
The normalized Laplacian matrix in COO format.
Examples
--------
"""
#? Create the adjacency matrix in COO format
if single_graph_laplacian:
adj_coo = comp_single_graph_adj_mat(cm_coo, with_loop=True)
else:
if not with_loop:
raise NotImplementedError("Not implemented for multi-graph no loop mode!")
adj_coo = cm_coo
#? Assume that adj_coo stores only the upper triangular part + diagonal
if is_triu_sym:
# NOTE: adj_coo.sum(0) is equivalent to adj_coo.T.sum(1) but avoids
# creating the transposed COO matrix object.
node_degrees = adj_coo.sum(1).A1 + adj_coo.sum(0).A1 - adj_coo.diagonal()
else:
raise NotImplementedError()
n = len(node_degrees)
D_pow_min_half = 1/np.sqrt(node_degrees)
#? Calculate the Normalized Laplacian matrix
# Optimization: I - D^-1/2 * A * D^-1/2
# This avoids constructing the sparse degree matrix (degree_coo) and
# the unnormalized Laplacian (degree_coo - cm_coo), saving memory and time.
# Calculate D^-1/2 * A * D^-1/2
# Reshape vectors for broadcasting: column vector (N, 1) and row vector (1, N)
normalized_A = (
cm_coo
.multiply(D_pow_min_half.reshape(-1, 1))
.multiply(D_pow_min_half.reshape(1, -1))
)
# Calculate I - normalized_A
norm_laplacian = sp.eye(n, format="dia") - normalized_A
# Ensure the result is in COO format as expected by callers
if not sp.isspmatrix_coo(norm_laplacian):
norm_laplacian_coo = norm_laplacian.tocoo()
else:
norm_laplacian_coo = norm_laplacian
return norm_laplacian_coo
[docs]
def vec_comp_ipr4(eig_vecs: np.ndarray) -> np.ndarray:
"""
Compute the Inverse Participation Ratio (IPR) to the 4th power in a vectorized manner.
Parameters
----------
eig_vecs : np.ndarray
A 2D array of eigenvectors (each row is an eigenvector).
Returns
-------
np.ndarray
The IPR4 values for each eigenvector.
Examples
--------
"""
# Sum over the nodes (axis 1) for each eigenvector
return 1.0 / np.sum(eig_vecs ** 4, axis=1)
[docs]
def vec_calc_eig_vec_dist(vecs1: np.ndarray, vecs2: np.ndarray) -> np.ndarray:
"""
Calculate the minimum Euclidean distance between corresponding rows of two matrices of eigenvectors.
This function accounts for the directional ambiguity of eigenvectors by
computing the distance between `vecs1` and `vecs2`, as well as `vecs1` and
`-vecs2`, and returning the minimum of the two for each pair of rows.
Parameters
----------
vecs1 : np.ndarray
The first set of eigenvectors (2D array, rows are eigenvectors).
vecs2 : np.ndarray
The second set of eigenvectors (2D array, rows are eigenvectors).
Returns
-------
np.ndarray
The minimum Euclidean distance for each pair of eigenvectors.
Examples
--------
"""
diff_same = vecs1 - vecs2
diff_opp = vecs1 + vecs2
dist_same = np.sum(diff_same * diff_same, axis=1)
dist_opp = np.sum(diff_opp * diff_opp, axis=1)
return np.sqrt(np.minimum(dist_same, dist_opp))
[docs]
def comp_hic_spector_coo(
coo_cm1: sp.coo_matrix,
coo_cm2: sp.coo_matrix,
with_loop: bool = True,
num_eig_vecs: int = 20,
ipr_cutoff: float = 5,
#? Additional parameters
op:str="union",
single_graph_laplacian: bool = False,
intersect_valid_eigvecs: bool = False,
#? Verbose
verbose: bool = False,
) -> float:
"""
Calculate the HiCSpector reproducibility score between two sparse matrices.
Parameters
----------
coo_cm1 : sp.coo_matrix
First contact matrix in COO format.
coo_cm2 : sp.coo_matrix
Second contact matrix in COO format.
with_loop : bool, optional
If True, include loops in the calculation. By default True.
num_eig_vecs : int, optional
Number of eigenvectors to consider. By default 20.
ipr_cutoff : float, optional
Cutoff value for the IPR filter. If None, no filter is applied.
By default 5.
op : str, optional
The operation for filtering common rows/columns ('union' or 'intersection').
By default "union".
single_graph_laplacian : bool, optional
Flag to treat the matrix as a single graph. By default False.
intersect_valid_eigvecs : bool, optional
If True, intersect valid eigenvector IDs from both matrices based on
the IPR cutoff. By default False.
verbose : bool, optional
Enable verbose logging output. By default False.
Returns
-------
float
The HiCSpector reproducibility score.
Notes
-----
This function computes the HiCSpector metric using spectral decomposition.
It assumes input matrices are in COO Upper Triangle format.
Examples
--------
"""
if not sp.isspmatrix_coo(coo_cm1):
raise MetricError("Input sparse matrix 1 type must be COO!")
if not sp.isspmatrix_coo(coo_cm2):
raise MetricError("Input sparse matrix 2 type must be COO!")
#? Expected COO Uppper Triangle matrix (without main diagonal)
if np.any(coo_cm1.row > coo_cm1.col):
raise MetricError("Input sparse matrix 1 must be upper triangular!")
if np.any(coo_cm2.row > coo_cm2.col):
raise MetricError("Input sparse matrix 2 must be upper triangular!")
if not np.issubdtype(coo_cm1.dtype, np.integer):
raise MetricError("Input sparse matrix 1 dtype must be integer!")
if not np.issubdtype(coo_cm2.dtype, np.integer):
raise MetricError("Input sparse matrix 2 dtype must be integer!")
# coo_cm1 = coo_cm1.astype(int)
# coo_cm2 = coo_cm2.astype(int)
#? Find union
out = filter_common_empty_rowcols(
coo_cm1, coo_cm2,
op=op,
ret_unique_ids=False,
)
union_filtered_coo1, union_filtered_coo2 = out
#? Because of union, there are some additional empty regions
filtered_coo1, unique_ids1 = filter_empty_rowcols(
union_filtered_coo1,
ret_unique_ids=True
)
filtered_coo2, unique_ids2 = filter_empty_rowcols(
union_filtered_coo2,
ret_unique_ids=True
)
#? Calculate the Laplacian matrices
norm_lap_coo1 = comp_norm_laplacian_mat_coo(
filtered_coo1,
single_graph_laplacian=single_graph_laplacian,
with_loop=with_loop
)
norm_lap_coo2 = comp_norm_laplacian_mat_coo(
filtered_coo2,
single_graph_laplacian=single_graph_laplacian,
with_loop=with_loop
)
if verbose is True:
logger.opt(lazy=True).debug("union_filtered_coo1.shape: {v}", v=lambda: union_filtered_coo1.shape)
logger.opt(lazy=True).debug("union_filtered_coo2.shape: {v}", v=lambda: union_filtered_coo2.shape)
logger.opt(lazy=True).debug("coo_cm1: {v}", v=lambda: coo_cm1)
logger.opt(lazy=True).debug("coo_cm2: {v}", v=lambda: coo_cm2)
logger.opt(lazy=True).debug("filtered_coo1: {v}", v=lambda: filtered_coo1)
logger.opt(lazy=True).debug("filtered_coo2: {v}", v=lambda: filtered_coo2)
logger.opt(lazy=True).debug("norm_lap_coo1: {v}", v=lambda: norm_lap_coo1)
logger.opt(lazy=True).debug("norm_lap_coo2: {v}", v=lambda: norm_lap_coo2)
#? Mirror the Triu part because the EVD requires full matrix
norm_lap_coo1 = mirror_upper_to_lower_triangle(norm_lap_coo1)
norm_lap_coo2 = mirror_upper_to_lower_triangle(norm_lap_coo2)
#? Calculate the eigenvalues and eigenvectors of the Laplacian matrices
#? Ordered by eigenvalues from lowest to greatest
_, eig_vecs1 = eigsh(norm_lap_coo1, k=num_eig_vecs, which="SM")
_, eig_vecs2 = eigsh(norm_lap_coo2, k=num_eig_vecs, which="SM")
#? Extend the eigenvectors if there is a unique unaligned region in coo1
num_union_nodes = union_filtered_coo1.shape[0]
if num_union_nodes > norm_lap_coo1.shape[0]:
new_eig_vecs1 = np.zeros((num_union_nodes, num_eig_vecs))
new_eig_vecs1[unique_ids1, :] = eig_vecs1
eig_vecs1 = new_eig_vecs1
#? Extend the eigenvectors if there is a unique unaligned region in coo2
if num_union_nodes > len(unique_ids2):
new_eig_vecs2 = np.zeros((num_union_nodes, num_eig_vecs))
new_eig_vecs2[unique_ids2, :] = eig_vecs2
eig_vecs2 = new_eig_vecs2
#? Change to (num_eig_vecs, num_union_nodes)
eig_vecs1 = eig_vecs1.T
eig_vecs2 = eig_vecs2.T
#? Handle IPR Cutoff
if ipr_cutoff is not None:
ipr_vals1 = vec_comp_ipr4(eig_vecs1)
ipr_vals2 = vec_comp_ipr4(eig_vecs2)
#? Default behaviour based on the original implementation
if intersect_valid_eigvecs:
valid_eig_vec1_ids = np.where(ipr_vals1 > ipr_cutoff)[0]
valid_eig_vec2_ids = np.where(ipr_vals2 > ipr_cutoff)[0]
valid_eig_vec_ids = np.intersect1d(
valid_eig_vec1_ids,
valid_eig_vec2_ids
)
eig_vecs1 = eig_vecs1[valid_eig_vec_ids, :]
eig_vecs2 = eig_vecs2[valid_eig_vec_ids, :]
else:
eig_vecs1 = eig_vecs1[ipr_vals1 > ipr_cutoff, :]
eig_vecs2 = eig_vecs2[ipr_vals2 > ipr_cutoff, :]
num_ipr_filtered_eig_vecs = np.min(
[eig_vecs1.shape[0], eig_vecs2.shape[0]]
)
eig_vecs1 = eig_vecs1[:num_ipr_filtered_eig_vecs, :]
eig_vecs2 = eig_vecs2[:num_ipr_filtered_eig_vecs, :]
num_eig_vecs_used = eig_vecs1.shape[0]
#? Compute Euclidean distance of eigenvectors
dists = vec_calc_eig_vec_dist(eig_vecs1, eig_vecs2)
dist_sum = dists.sum()
l = np.sqrt(2)
score = np.abs(l - dist_sum / num_eig_vecs_used) / l
if verbose is True:
logger.opt(lazy=True).debug("num_eig_vecs_used: {v}", v=lambda: num_eig_vecs_used)
logger.opt(lazy=True).debug("eig_vecs1: {v}", v=lambda: eig_vecs1)
logger.opt(lazy=True).debug("eig_vecs2: {v}", v=lambda: eig_vecs2)
logger.opt(lazy=True).debug("dists: {v}", v=lambda: dists)
logger.opt(lazy=True).debug("dist_sum: {v}", v=lambda: dist_sum)
logger.opt(lazy=True).debug("score: {v}", v=lambda: score)
return score