Source code for gunz_cm.metrics.ren.hicrep

# -*- coding: utf-8 -*-
"""
Module for computing the HiCRep reproducibility score for Hi-C contact matrices.

This module implements the HiCRep algorithm, a framework for assessing the
reproducibility of Hi-C data. It includes functions for preprocessing,
variance stabilization, and calculating the stratum-adjusted correlation
coefficient (SCC).

References
----------
.. [1] T. Yang, F. Zhang, G. G. Yardimci, F. Song, R. C. Hardison,
   W. S. Noble, F. Yue, and Q. Li, "HiCRep: assessing the reproducibility
   of Hi-C data using a stratum-adjusted correlation coefficient,"
   Genome Research, vol. 27, no. 11, pp. 1939-1949, 2017.


Examples
--------
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import typing as t
from contextlib import suppress

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from gunz_cm.exceptions import MetricError
from gunz_cm.utils.logger import logger
import numpy as np
import scipy.sparse as sp

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from gunz_cm import preprocs as cm_preprocs
from gunz_cm.preprocs import create_triu_matrix

#TODO: Move or use from preprocs
[docs] def diag_transform_coo( cm_coo: sp.coo_matrix, ) -> sp.csr_matrix: """ Convert a sparse COO matrix into a sparse CSR matrix of its diagonals. Each row in the output CSR matrix represents a diagonal from the upper triangle of the input matrix. Parameters ---------- cm_coo : sp.coo_matrix Input sparse matrix in COO format. Must be a square matrix. Returns ------- sp.csr_matrix A sparse CSR matrix where each row `i` contains the elements of the `i+1`-th diagonal of the input matrix. Examples -------- """ num_diags = cm_coo.shape[0] diag_ids = cm_coo.col - cm_coo.row mask = (diag_ids > 0) valid_indices = np.where(mask) diagonal_indices = diag_ids[valid_indices] row_indices = diagonal_indices - 1 column_indices = cm_coo.col[valid_indices] - diagonal_indices data = cm_coo.data[valid_indices] csr_matrix = sp.csr_matrix((data, (row_indices, column_indices)), shape=(num_diags - 1, cm_coo.shape[1]), dtype=cm_coo.dtype) csr_matrix.eliminate_zeros() return csr_matrix
#TODO: Move or use from preprocs
def resample_coo_mat(
    mat: sp.coo_matrix,
    target_sum: int,
    rng: t.Optional[np.random.Generator] = None,
) -> sp.coo_matrix:
    """
    Resample a sparse matrix to a new total sum.

    This function uses sampling with replacement to adjust the total number
    of counts in a sparse matrix to a target sum.

    Parameters
    ----------
    mat : sp.coo_matrix
        The input matrix to be resampled.
    target_sum : int
        The desired total sum of the output matrix.
    rng : np.random.Generator, optional
        Random generator used for sampling. If None (the default), the
        legacy global NumPy random state is used, preserving the previous
        behavior. Pass a seeded generator for reproducible results.

    Returns
    -------
    sp.coo_matrix
        The resampled matrix with total sum equal to `target_sum`.
    """
    #? Get the total number of stored elements in the input matrix
    num_elements = mat.data.size

    #? Sampling probability of each stored element, proportional to its count
    probabilities = mat.data / mat.data.sum()

    #? Draw target_sum samples with replacement over the stored positions
    if rng is None:
        #? Backward-compatible default: legacy global NumPy random state
        samples = np.random.choice(num_elements, size=target_sum, p=probabilities)
    else:
        samples = rng.choice(num_elements, size=target_sum, p=probabilities)

    #? Count how often each stored position was drawn
    sampled_data = np.bincount(samples, minlength=num_elements)

    #? Rebuild the matrix on the original sparsity pattern with resampled counts
    resampled_matrix = sp.coo_matrix((sampled_data, (mat.row, mat.col)),
                                     shape=mat.shape)

    #? Positions that were never drawn become explicit zeros; remove them
    resampled_matrix.eliminate_zeros()
    return resampled_matrix
#TODO: Move or use from preprocs
def mean_filter_coo_mat(
    mat: sp.coo_matrix,
    half_win_size: int
) -> sp.coo_matrix:
    """
    Apply a mean filter to a sparse COO matrix.

    This function convolves the input with a square kernel of constant
    entries to perform smoothing.

    Parameters
    ----------
    mat : sp.coo_matrix
        The input matrix to be filtered.
    half_win_size : int
        The half-size of the filter window (h). The full kernel will be
        (2*h + 1) x (2*h + 1).

    Returns
    -------
    sp.coo_matrix
        The filtered (smoothed) matrix.

    Raises
    ------
    MetricError
        If `half_win_size` is not positive, if the input matrix is not a
        SciPy COO matrix, or if the input matrix is not square.

    Notes
    -----
    The filter is a square matrix of constant 1s. Edge effects are handled
    by adjusting the normalization factor based on the number of positions
    inside the window for each element.

    NOTE(review): the normalization uses the distance to the *nearest*
    border only, so it assumes 2*half_win_size + 1 <= matrix dimension
    (a window never touches both borders at once) -- TODO confirm.
    """
    #? Validate filter size to prevent invalid memory access or logic errors
    if half_win_size <= 0:
        raise MetricError("Half-size of the filter must be greater than 0")
    #? Ensure input is a valid sparse COO matrix
    if not (sp.issparse(mat) and mat.getformat() == 'coo'):
        raise MetricError("Input must be a SciPy COO matrix")
    #? Ensure matrix is square for valid convolution
    if mat.shape[0] != mat.shape[1]:
        raise MetricError("Cannot handle non-square matrix")

    filter_size = 2 * half_win_size + 1
    #? Shape of the full (un-cropped) 2-D convolution output
    output_shape = np.array(mat.shape) + filter_size - 1

    #? Create a banded Toeplitz matrix T of 1s; T @ mat sums each column over a
    #? sliding window of length filter_size, and (...) @ T.T does the same over
    #? rows, yielding the 2-D box-filter sums.
    toeplitz_matrix = sp.diags(np.ones(filter_size),
                               np.arange(-filter_size+1, 1),
                               shape=(output_shape[1], mat.shape[1]),
                               format='csr')

    #? Perform the convolution
    result = sp.coo_matrix((toeplitz_matrix @ mat) @ toeplitz_matrix.T)

    #? Crop the convolution padding so the output has the input's shape
    result_no_edge = result.tocsr()[half_win_size:(half_win_size+mat.shape[0]),
                                    half_win_size:(half_win_size+mat.shape[1])].tocoo()

    #? Per-element window size along the row dimension: a full window spans
    #? 2h+1 positions; near a border it shrinks to h + 1 + distance_to_border.
    row_distance_to_edge = np.minimum(result_no_edge.row,
                                      result_no_edge.shape[0] - 1 - result_no_edge.row)
    num_neighbors_dim1 = half_win_size + 1 + np.minimum(row_distance_to_edge, half_win_size)
    #? Same adjustment along the column dimension
    col_distance_to_edge = np.minimum(result_no_edge.col,
                                      result_no_edge.shape[1] - 1 - result_no_edge.col)
    num_neighbors_dim2 = half_win_size + 1 + np.minimum(col_distance_to_edge, half_win_size)
    #? Total in-bounds window area used to turn window sums into means
    num_neighbors = num_neighbors_dim1 * num_neighbors_dim2
    result_no_edge.data /= num_neighbors
    return result_no_edge
def variance_stabilizing_transform_variance(
    sample_size: t.Union[int, np.ndarray]
) -> t.Union[float, np.ndarray]:
    """
    Calculate the variance for variance-stabilizing transformation.

    Parameters
    ----------
    sample_size : int | np.ndarray
        The size(s) of the input data.

    Returns
    -------
    float | np.ndarray
        The variance of the ranked input data, with Bessel's correction.
        Entries with ``sample_size < 2`` yield ``NaN`` (the variance is
        undefined for fewer than two samples).

    Notes
    -----
    The variance-stabilizing transform turns input data into ranks. The
    variance of these ranks is a function of only the sample size `n`:
    `var = (1 + 1/n) / 12` (with Bessel's correction). See [1] for more
    details.

    Examples
    --------
    >>> float(variance_stabilizing_transform_variance(2))
    0.125
    """
    #? Cast to a float ndarray so a scalar int of 0 cannot raise
    #? ZeroDivisionError eagerly (the previous implementation silently
    #? returned None in that case via contextlib.suppress).
    n = np.asarray(sample_size, dtype=float)
    with np.errstate(divide='ignore', invalid='ignore'):
        #? Undefined for n < 2; NaNs are zeroed by the SCC weighting later
        return np.where(n < 2, np.nan, (1 + 1.0 / n) / 12.0)
def _compute_scc_score(
    coo_cm1: sp.coo_matrix,
    coo_cm2: sp.coo_matrix,
) -> float:
    """
    Compute the HiCRep Stratum-Adjusted Correlation Coefficient (SCC).

    This is a helper function that performs the core SCC calculation on two
    preprocessed contact matrices.

    Parameters
    ----------
    coo_cm1 : sp.coo_matrix
        First preprocessed input contact matrix.
    coo_cm2 : sp.coo_matrix
        Second preprocessed input contact matrix.

    Returns
    -------
    float
        The HiCRep SCC score: the variance-stabilized, weighted average of
        the per-diagonal (per-stratum) Pearson correlations.
    """
    logger.debug(f"Computing SCC score for matrices of shape {coo_cm1.shape}")

    #? Convert each diagonal to one row of a csr_matrix so the per-stratum
    #? statistics can be computed with vectorized row reductions
    diag_mat_coo1 = diag_transform_coo(coo_cm1)
    diag_mat_coo2 = diag_transform_coo(coo_cm2)

    #? Sample size per stratum: positions non-zero in either matrix
    sample_sizes_diagonal = (diag_mat_coo1 + diag_mat_coo2).getnnz(axis=1)

    #? Compute row sums for each diagonal
    row_sums_mat1 = diag_mat_coo1.sum(axis=1).A1
    row_sums_mat2 = diag_mat_coo2.sum(axis=1).A1

    #? Ignore zero-division warnings because the corresponding elements in the
    #? output don't contribute to the SCC scores.
    with np.errstate(divide='ignore', invalid='ignore'):
        #? Unnormalized covariance for each diagonal
        covariance_diagonal = (
            diag_mat_coo1.multiply(diag_mat_coo2).sum(axis=1).A1
            - row_sums_mat1 * row_sums_mat2 / sample_sizes_diagonal
        )
        #? Pearson correlation coefficient for each diagonal
        correlation_coefficient_diagonal = covariance_diagonal / np.sqrt(
            (diag_mat_coo1.power(2).sum(axis=1).A1
             - np.square(row_sums_mat1) / sample_sizes_diagonal)
            * (diag_mat_coo2.power(2).sum(axis=1).A1
               - np.square(row_sums_mat2) / sample_sizes_diagonal)
        )
        #? Variance-stabilizing weight for each diagonal
        weights_diagonal = sample_sizes_diagonal * variance_stabilizing_transform_variance(
            sample_sizes_diagonal
        )

    #? Strata with too few samples produce NaN/Inf; zero them so they do not
    #? contribute to the weighted average (np.nan_to_num maps NaN to 0 too)
    weights_diagonal_nan_to_zero = np.nan_to_num(
        weights_diagonal, copy=True, posinf=0.0, neginf=0.0
    )
    correlation_coefficient_diagonal_nan_to_zero = np.nan_to_num(
        correlation_coefficient_diagonal, copy=True, posinf=0.0, neginf=0.0
    )

    #? Final SCC score: weighted average of per-stratum correlations
    scc_score = (
        correlation_coefficient_diagonal_nan_to_zero @ weights_diagonal_nan_to_zero
        / weights_diagonal_nan_to_zero.sum()
    )
    output_scc_score = np.nan_to_num(scc_score, copy=True, posinf=0.0, neginf=0.0)
    return output_scc_score
def preprocess_matrices_coo(
    cm_coo1: sp.coo_matrix,
    cm_coo2: sp.coo_matrix,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = True,
    downsample: bool = False,
    half_win_size: t.Optional[int] = None,
) -> t.Tuple[sp.coo_matrix, sp.coo_matrix]:
    """
    Preprocess contact matrices for HiCRep SCC score computation.

    The preprocessing steps include:

    1. Trimming diagonals to save computation time.
    2. Down-sampling matrices if necessary.
    3. Normalizing matrices by total number of contacts.
    4. Applying smoothing if necessary.

    Parameters
    ----------
    cm_coo1 : sp.coo_matrix
        First contact matrix in COO format.
    cm_coo2 : sp.coo_matrix
        Second contact matrix in COO format.
    max_k : int, optional
        The maximum genomic distance (in bins) to consider. Diagonals
        beyond this distance will be excluded. If None, the entire matrix
        is used.
    remove_main_diag : bool, optional
        Whether to remove the main diagonal, by default True.
    downsample : bool, optional
        Whether to down-sample the matrices to have the same total count,
        by default False.
    half_win_size : int, optional
        The half-size of the smoothing window. If None, no smoothing is
        applied.

    Returns
    -------
    tuple[sp.coo_matrix, sp.coo_matrix]
        The preprocessed contact matrices.

    Raises
    ------
    MetricError
        If `half_win_size` is not a positive integer.
    NotImplementedError
        If `remove_main_diag` is False, or if `downsample` is True (the
        down-sampling path is not yet tested).
    """
    logger.debug(f"Preprocessing matrices: max_k={max_k}, half_win_size={half_win_size}")

    if not remove_main_diag:
        raise NotImplementedError("Not yet implemented for the next step")

    if half_win_size is not None:
        #? Validate half_win_size type and value before any heavy work
        if not (isinstance(half_win_size, int) and half_win_size > 0):
            raise MetricError("half_win_size must be a positive integer!")

    #? Trim diagonals to save computation time
    trimmed_cm1_coo = create_triu_matrix(
        cm_coo1,
        max_k=max_k,
        remove_main_diag=remove_main_diag
    )
    trimmed_cm2_coo = create_triu_matrix(
        cm_coo2,
        max_k=max_k,
        remove_main_diag=remove_main_diag
    )

    #? Downsample matrices if necessary
    if downsample:
        #TODO: Test the functionality with downsample
        raise NotImplementedError("Functionality is not yet tested")
        #? Unreachable until the path above is validated: bring the larger
        #? matrix down to the smaller one's total count
        size1 = trimmed_cm1_coo.sum()
        size2 = trimmed_cm2_coo.sum()
        if size1 > size2:
            trimmed_cm1_coo = resample_coo_mat(trimmed_cm1_coo, size2).astype(float)
        elif size2 > size1:
            trimmed_cm2_coo = resample_coo_mat(trimmed_cm2_coo, size1).astype(float)
    else:
        #? Normalize matrices by the total number of contacts of the
        #? ORIGINAL (untrimmed) matrices
        total_contacts1 = cm_coo1.sum()
        total_contacts2 = cm_coo2.sum()
        trimmed_cm1_coo = trimmed_cm1_coo.astype(float) / total_contacts1
        trimmed_cm2_coo = trimmed_cm2_coo.astype(float) / total_contacts2

    #? Apply smoothing if necessary
    if half_win_size is not None:
        trimmed_cm1_coo = mean_filter_coo_mat(trimmed_cm1_coo, half_win_size)
        trimmed_cm2_coo = mean_filter_coo_mat(trimmed_cm2_coo, half_win_size)

    return trimmed_cm1_coo, trimmed_cm2_coo
def comp_hicrep_coo(
    cm1_coo: sp.coo_matrix,
    cm2_coo: sp.coo_matrix,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = True,
    downsample: bool = False,
    half_win_size: t.Optional[int] = None,
    ena_common_region: bool = True,
    ena_reshaping: bool = True,
) -> np.ndarray:
    """
    Compute the HiCRep score for two contact matrices.

    This is the main entry point for calculating the HiCRep score between
    two sparse contact matrices.

    Parameters
    ----------
    cm1_coo : sp.coo_matrix
        First contact matrix in COO format.
    cm2_coo : sp.coo_matrix
        Second contact matrix in COO format.
    max_k : int, optional
        The maximum genomic distance (in bins) to consider.
    remove_main_diag : bool, optional
        Whether to exclude the main diagonal, by default True.
    downsample : bool, optional
        Whether to down-sample the matrices, by default False.
    half_win_size : int, optional
        The half-size of the smoothing window. If None, no smoothing.
    ena_common_region : bool, optional
        Whether to filter for common non-empty rows/columns, by default True.
    ena_reshaping : bool, optional
        Whether to enforce a square shape on matrices, by default True.

    Returns
    -------
    np.ndarray
        The final HiCRep SCC score (a scalar value).

    Raises
    ------
    MetricError
        If inputs are not COO matrices, are empty, non-square, or have
        different shapes.

    Notes
    -----
    Both input matrices must be in `scipy.sparse.coo_matrix` format.
    """
    logger.info(f"Computing HiCRep score with half_win_size={half_win_size}, max_k={max_k}")

    #? Ensure both inputs are strictly COO matrices
    if not (isinstance(cm1_coo, sp.coo_matrix) and isinstance(cm2_coo, sp.coo_matrix)):
        raise MetricError(
            "Both input matrices must be scipy.sparse.coo_matrix in COO format"
        )

    if ena_common_region:
        #? Restrict both matrices to the union of their non-empty rows/columns
        out = cm_preprocs.filter_common_empty_rowcols_coo(
            cm1_coo,
            cm2_coo,
            op="union",
            is_triu_sym=True,
            axis=None,
        )
        cm1_coo, cm2_coo = out

    #? Do reshaping to enforce square shape
    if ena_reshaping:
        if ena_common_region:
            #? with ena_common_region, the matrices is automatically square
            pass
        else:
            cm_coo1_shape = cm_preprocs.infer_mat_shape_coo(
                cm1_coo,
                is_triu_sym=True
            )
            cm1_coo = cm1_coo.reshape(cm_coo1_shape)
            cm_coo2_shape = cm_preprocs.infer_mat_shape_coo(
                cm2_coo,
                is_triu_sym=True
            )
            cm2_coo = cm2_coo.reshape(cm_coo2_shape)

    #? Check input matrices for validity
    if cm1_coo.size <= 0:
        raise MetricError("Contact matrix 1 is empty")
    if cm1_coo.shape[0] != cm1_coo.shape[1]:
        raise MetricError("Contact matrix 1 is not square")
    if cm2_coo.size <= 0:
        raise MetricError("Contact matrix 2 is empty")
    if cm2_coo.shape[0] != cm2_coo.shape[1]:
        raise MetricError("Contact matrix 2 is not square")
    if cm1_coo.shape != cm2_coo.shape:
        raise MetricError("Contact matrices have different shapes")

    cm1_coo, cm2_coo = preprocess_matrices_coo(
        cm1_coo,
        cm2_coo,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
        downsample=downsample,
        half_win_size=half_win_size,
    )

    #? Compute SCC matrix score
    score = _compute_scc_score(
        cm1_coo,
        cm2_coo
    )
    return score