Source code for gunz_cm.metrics.ren.hic_spector

# -*- coding: utf-8 -*-
"""
Module for computing the HiCSpector reproducibility score.

This module provides functions to calculate the HiCSpector score, a metric
based on the spectral properties of the normalized Laplacian matrix of
Hi-C contact maps.

References
----------
.. [1] Yan, C., et al. (2017). "HiCSpector: a tool for assessing the
   reproducibility of Hi-C data." Bioinformatics, 33(14), 2197-2199.


Examples
--------
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
from gunz_cm.utils.logger import logger

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from gunz_cm.exceptions import MetricError
import numpy as np
from scipy import sparse as sp
from scipy.sparse.linalg import eigsh

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from gunz_cm.preprocs import (
    comp_single_graph_adj_mat,
    filter_common_empty_rowcols,
    filter_empty_rowcols,
    mirror_upper_to_lower_triangle,
)

[docs] def comp_norm_laplacian_mat_coo( cm_coo: sp.coo_matrix, single_graph_laplacian: bool = False, with_loop: bool = True, is_triu_sym: bool = True, ) -> sp.coo_matrix: """ Calculate the normalized Laplacian matrix from a COO matrix. This function assumes the input matrix is symmetric and that only the upper triangular part (including the diagonal) is stored. Parameters ---------- cm_coo : sp.coo_matrix The input adjacency matrix in COO format. single_graph_laplacian : bool, optional Whether to treat the input as a single graph, which affects how the adjacency matrix is derived. By default False. with_loop : bool, optional Whether to include the diagonal in the adjacency matrix derivation. By default True. is_triu_sym : bool, optional Whether the input is a symmetric matrix stored in upper-triangular form. By default True. Returns ------- sp.coo_matrix The normalized Laplacian matrix in COO format. Examples -------- """ #? Create the adjacency matrix in COO format if single_graph_laplacian: adj_coo = comp_single_graph_adj_mat(cm_coo, with_loop=True) else: if not with_loop: raise NotImplementedError("Not implemented for multi-graph no loop mode!") adj_coo = cm_coo #? Assume that adj_coo stores only the upper triangular part + diagonal if is_triu_sym: # NOTE: adj_coo.sum(0) is equivalent to adj_coo.T.sum(1) but avoids # creating the transposed COO matrix object. node_degrees = adj_coo.sum(1).A1 + adj_coo.sum(0).A1 - adj_coo.diagonal() else: raise NotImplementedError() n = len(node_degrees) D_pow_min_half = 1/np.sqrt(node_degrees) #? Calculate the Normalized Laplacian matrix # Optimization: I - D^-1/2 * A * D^-1/2 # This avoids constructing the sparse degree matrix (degree_coo) and # the unnormalized Laplacian (degree_coo - cm_coo), saving memory and time. # Calculate D^-1/2 * A * D^-1/2 # Reshape vectors for broadcasting: column vector (N, 1) and row vector (1, N) normalized_A = ( cm_coo .multiply(D_pow_min_half.reshape(-1, 1)) .multiply(D_pow_min_half.reshape(1, -1)) ) # Calculate I - normalized_A norm_laplacian = sp.eye(n, format="dia") - normalized_A # Ensure the result is in COO format as expected by callers if not sp.isspmatrix_coo(norm_laplacian): norm_laplacian_coo = norm_laplacian.tocoo() else: norm_laplacian_coo = norm_laplacian return norm_laplacian_coo
[docs] def vec_comp_ipr4(eig_vecs: np.ndarray) -> np.ndarray: """ Compute the Inverse Participation Ratio (IPR) to the 4th power in a vectorized manner. Parameters ---------- eig_vecs : np.ndarray A 2D array of eigenvectors (each row is an eigenvector). Returns ------- np.ndarray The IPR4 values for each eigenvector. Examples -------- """ # Sum over the nodes (axis 1) for each eigenvector return 1.0 / np.sum(eig_vecs ** 4, axis=1)
[docs] def vec_calc_eig_vec_dist(vecs1: np.ndarray, vecs2: np.ndarray) -> np.ndarray: """ Calculate the minimum Euclidean distance between corresponding rows of two matrices of eigenvectors. This function accounts for the directional ambiguity of eigenvectors by computing the distance between `vecs1` and `vecs2`, as well as `vecs1` and `-vecs2`, and returning the minimum of the two for each pair of rows. Parameters ---------- vecs1 : np.ndarray The first set of eigenvectors (2D array, rows are eigenvectors). vecs2 : np.ndarray The second set of eigenvectors (2D array, rows are eigenvectors). Returns ------- np.ndarray The minimum Euclidean distance for each pair of eigenvectors. Examples -------- """ diff_same = vecs1 - vecs2 diff_opp = vecs1 + vecs2 dist_same = np.sum(diff_same * diff_same, axis=1) dist_opp = np.sum(diff_opp * diff_opp, axis=1) return np.sqrt(np.minimum(dist_same, dist_opp))
[docs] def comp_hic_spector_coo( coo_cm1: sp.coo_matrix, coo_cm2: sp.coo_matrix, with_loop: bool = True, num_eig_vecs: int = 20, ipr_cutoff: float = 5, #? Additional parameters op:str="union", single_graph_laplacian: bool = False, intersect_valid_eigvecs: bool = False, #? Verbose verbose: bool = False, ) -> float: """ Calculate the HiCSpector reproducibility score between two sparse matrices. Parameters ---------- coo_cm1 : sp.coo_matrix First contact matrix in COO format. coo_cm2 : sp.coo_matrix Second contact matrix in COO format. with_loop : bool, optional If True, include loops in the calculation. By default True. num_eig_vecs : int, optional Number of eigenvectors to consider. By default 20. ipr_cutoff : float, optional Cutoff value for the IPR filter. If None, no filter is applied. By default 5. op : str, optional The operation for filtering common rows/columns ('union' or 'intersection'). By default "union". single_graph_laplacian : bool, optional Flag to treat the matrix as a single graph. By default False. intersect_valid_eigvecs : bool, optional If True, intersect valid eigenvector IDs from both matrices based on the IPR cutoff. By default False. verbose : bool, optional Enable verbose logging output. By default False. Returns ------- float The HiCSpector reproducibility score. Notes ----- This function computes the HiCSpector metric using spectral decomposition. It assumes input matrices are in COO Upper Triangle format. Examples -------- """ if not sp.isspmatrix_coo(coo_cm1): raise MetricError("Input sparse matrix 1 type must be COO!") if not sp.isspmatrix_coo(coo_cm2): raise MetricError("Input sparse matrix 2 type must be COO!") #? Expected COO Uppper Triangle matrix (without main diagonal) if np.any(coo_cm1.row > coo_cm1.col): raise MetricError("Input sparse matrix 1 must be upper triangular!") if np.any(coo_cm2.row > coo_cm2.col): raise MetricError("Input sparse matrix 2 must be upper triangular!") if not np.issubdtype(coo_cm1.dtype, np.integer): raise MetricError("Input sparse matrix 1 dtype must be integer!") if not np.issubdtype(coo_cm2.dtype, np.integer): raise MetricError("Input sparse matrix 2 dtype must be integer!") # coo_cm1 = coo_cm1.astype(int) # coo_cm2 = coo_cm2.astype(int) #? Find union out = filter_common_empty_rowcols( coo_cm1, coo_cm2, op=op, ret_unique_ids=False, ) union_filtered_coo1, union_filtered_coo2 = out #? Because of union, there are some additional empty regions filtered_coo1, unique_ids1 = filter_empty_rowcols( union_filtered_coo1, ret_unique_ids=True ) filtered_coo2, unique_ids2 = filter_empty_rowcols( union_filtered_coo2, ret_unique_ids=True ) #? Calculate the Laplacian matrices norm_lap_coo1 = comp_norm_laplacian_mat_coo( filtered_coo1, single_graph_laplacian=single_graph_laplacian, with_loop=with_loop ) norm_lap_coo2 = comp_norm_laplacian_mat_coo( filtered_coo2, single_graph_laplacian=single_graph_laplacian, with_loop=with_loop ) if verbose is True: logger.opt(lazy=True).debug("union_filtered_coo1.shape: {v}", v=lambda: union_filtered_coo1.shape) logger.opt(lazy=True).debug("union_filtered_coo2.shape: {v}", v=lambda: union_filtered_coo2.shape) logger.opt(lazy=True).debug("coo_cm1: {v}", v=lambda: coo_cm1) logger.opt(lazy=True).debug("coo_cm2: {v}", v=lambda: coo_cm2) logger.opt(lazy=True).debug("filtered_coo1: {v}", v=lambda: filtered_coo1) logger.opt(lazy=True).debug("filtered_coo2: {v}", v=lambda: filtered_coo2) logger.opt(lazy=True).debug("norm_lap_coo1: {v}", v=lambda: norm_lap_coo1) logger.opt(lazy=True).debug("norm_lap_coo2: {v}", v=lambda: norm_lap_coo2) #? Mirror the Triu part because the EVD requires full matrix norm_lap_coo1 = mirror_upper_to_lower_triangle(norm_lap_coo1) norm_lap_coo2 = mirror_upper_to_lower_triangle(norm_lap_coo2) #? Calculate the eigenvalues and eigenvectors of the Laplacian matrices #? Ordered by eigenvalues from lowest to greatest _, eig_vecs1 = eigsh(norm_lap_coo1, k=num_eig_vecs, which="SM") _, eig_vecs2 = eigsh(norm_lap_coo2, k=num_eig_vecs, which="SM") #? Extend the eigenvectors if there is a unique unaligned region in coo1 num_union_nodes = union_filtered_coo1.shape[0] if num_union_nodes > norm_lap_coo1.shape[0]: new_eig_vecs1 = np.zeros((num_union_nodes, num_eig_vecs)) new_eig_vecs1[unique_ids1, :] = eig_vecs1 eig_vecs1 = new_eig_vecs1 #? Extend the eigenvectors if there is a unique unaligned region in coo2 if num_union_nodes > len(unique_ids2): new_eig_vecs2 = np.zeros((num_union_nodes, num_eig_vecs)) new_eig_vecs2[unique_ids2, :] = eig_vecs2 eig_vecs2 = new_eig_vecs2 #? Change to (num_eig_vecs, num_union_nodes) eig_vecs1 = eig_vecs1.T eig_vecs2 = eig_vecs2.T #? Handle IPR Cutoff if ipr_cutoff is not None: ipr_vals1 = vec_comp_ipr4(eig_vecs1) ipr_vals2 = vec_comp_ipr4(eig_vecs2) #? Default behaviour based on the original implementation if intersect_valid_eigvecs: valid_eig_vec1_ids = np.where(ipr_vals1 > ipr_cutoff)[0] valid_eig_vec2_ids = np.where(ipr_vals2 > ipr_cutoff)[0] valid_eig_vec_ids = np.intersect1d( valid_eig_vec1_ids, valid_eig_vec2_ids ) eig_vecs1 = eig_vecs1[valid_eig_vec_ids, :] eig_vecs2 = eig_vecs2[valid_eig_vec_ids, :] else: eig_vecs1 = eig_vecs1[ipr_vals1 > ipr_cutoff, :] eig_vecs2 = eig_vecs2[ipr_vals2 > ipr_cutoff, :] num_ipr_filtered_eig_vecs = np.min( [eig_vecs1.shape[0], eig_vecs2.shape[0]] ) eig_vecs1 = eig_vecs1[:num_ipr_filtered_eig_vecs, :] eig_vecs2 = eig_vecs2[:num_ipr_filtered_eig_vecs, :] num_eig_vecs_used = eig_vecs1.shape[0] #? Compute Euclidean distance of eigenvectors dists = vec_calc_eig_vec_dist(eig_vecs1, eig_vecs2) dist_sum = dists.sum() l = np.sqrt(2) score = np.abs(l - dist_sum / num_eig_vecs_used) / l if verbose is True: logger.opt(lazy=True).debug("num_eig_vecs_used: {v}", v=lambda: num_eig_vecs_used) logger.opt(lazy=True).debug("eig_vecs1: {v}", v=lambda: eig_vecs1) logger.opt(lazy=True).debug("eig_vecs2: {v}", v=lambda: eig_vecs2) logger.opt(lazy=True).debug("dists: {v}", v=lambda: dists) logger.opt(lazy=True).debug("dist_sum: {v}", v=lambda: dist_sum) logger.opt(lazy=True).debug("score: {v}", v=lambda: score) return score