# -*- coding: utf-8 -*-
"""
Module for computing the HiCRep reproducibility score for Hi-C contact matrices.
This module implements the HiCRep algorithm, a framework for assessing the
reproducibility of Hi-C data. It includes functions for preprocessing,
variance stabilization, and calculating the stratum-adjusted correlation
coefficient (SCC).
References
----------
.. [1] T. Yang, F. Zhang, G. G. Yardimci, F. Song, R. C. Hardison,
   W. S. Noble, F. Yue, and Q. Li, "HiCRep: assessing the reproducibility
   of Hi-C data using a stratum-adjusted correlation coefficient,"
   Genome Research, vol. 27, no. 11, pp. 1939-1949, 2017.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import typing as t
from contextlib import suppress
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from gunz_cm.exceptions import MetricError
from gunz_cm.utils.logger import logger
import numpy as np
import scipy.sparse as sp
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from gunz_cm import preprocs as cm_preprocs
from gunz_cm.preprocs import create_triu_matrix
#TODO: Move or use from preprocs
def resample_coo_mat(
    mat: sp.coo_matrix,
    target_sum: int
) -> sp.coo_matrix:
    """
    Resample a sparse matrix to a new total sum.

    Counts are redistributed by drawing ``target_sum`` samples with
    replacement from the nonzero positions of ``mat``, each position
    weighted proportionally to its value. The output therefore always
    sums exactly to ``target_sum``.

    Parameters
    ----------
    mat : sp.coo_matrix
        The input matrix to be resampled. Must contain at least one
        nonzero entry with a positive total sum.
    target_sum : int
        The desired total sum of the output matrix.

    Returns
    -------
    sp.coo_matrix
        The resampled matrix with integer counts summing to ``target_sum``.

    Notes
    -----
    Sampling uses NumPy's global random state; seed with
    ``np.random.seed`` for reproducible results.

    Examples
    --------
    """
    #? Number of candidate (row, col) positions to draw from
    num_elements = mat.data.size
    #? Selection probability of each position is proportional to its count
    probabilities = mat.data / mat.data.sum()
    #? Draw target_sum positions with replacement
    samples = np.random.choice(num_elements, size=target_sum, p=probabilities)
    #? New count of each position = number of times it was drawn
    sampled_data = np.bincount(samples, minlength=num_elements)
    #? Rebuild a COO matrix at the original coordinates
    resampled_matrix = sp.coo_matrix((sampled_data, (mat.row, mat.col)), shape=mat.shape)
    #? Drop positions that were never drawn
    resampled_matrix.eliminate_zeros()
    return resampled_matrix
#TODO: Move or use from preprocs
def mean_filter_coo_mat(
    mat: sp.coo_matrix,
    half_win_size: int
) -> sp.coo_matrix:
    """
    Apply a mean filter to a sparse COO matrix.

    This function convolves the input with a square kernel of constant
    entries to perform smoothing. The convolution is implemented as two
    sparse banded (Toeplitz) matrix products, which keeps the whole
    operation sparse.

    Parameters
    ----------
    mat : sp.coo_matrix
        The input matrix to be filtered.
    half_win_size : int
        The half-size of the filter window (h). The full kernel will be
        (2*h + 1) x (2*h + 1).

    Returns
    -------
    sp.coo_matrix
        The filtered (smoothed) matrix.

    Raises
    ------
    MetricError
        If `half_win_size` is not positive, if the input matrix is not a
        SciPy COO matrix, or if it is not square.

    Notes
    -----
    The filter is a square matrix of constant 1s. Edge effects are handled
    by normalizing each element by the number of in-bounds window cells,
    counting the cells available on each side of the element separately.

    Examples
    --------
    """
    #? Validate filter size to prevent invalid memory access or logic errors
    if half_win_size <= 0:
        raise MetricError("Half-size of the filter must be greater than 0")
    #? Ensure input is a valid sparse COO matrix
    if not (sp.issparse(mat) and mat.getformat() == 'coo'):
        raise MetricError("Input must be a SciPy COO matrix")
    #? Ensure matrix is square for valid convolution
    if mat.shape[0] != mat.shape[1]:
        raise MetricError("Cannot handle non-square matrix")
    filter_size = 2 * half_win_size + 1
    output_shape = np.array(mat.shape) + filter_size - 1
    #? Create a banded Toeplitz matrix so T @ mat @ T.T is a full 2-D
    #? convolution with a (filter_size x filter_size) kernel of ones
    toeplitz_matrix = sp.diags(np.ones(filter_size), np.arange(-filter_size+1, 1), shape=(output_shape[1], mat.shape[1]), format='csr')
    #? Perform the convolution
    result = sp.coo_matrix((toeplitz_matrix @ mat) @ toeplitz_matrix.T)
    #? Crop the "full" convolution output back to the input shape
    result_no_edge = result.tocsr()[half_win_size:(half_win_size+mat.shape[0]), half_win_size:(half_win_size+mat.shape[1])].tocoo()
    #? Count in-bounds window cells per element, one side at a time.
    #? (Counting only the distance to the *nearest* edge would assume the
    #? far side of the window is always full, which overcounts whenever a
    #? dimension is smaller than the full window size.)
    rows_above = np.minimum(result_no_edge.row, half_win_size)
    rows_below = np.minimum(result_no_edge.shape[0] - 1 - result_no_edge.row, half_win_size)
    num_neighbors_dim1 = rows_above + 1 + rows_below
    cols_left = np.minimum(result_no_edge.col, half_win_size)
    cols_right = np.minimum(result_no_edge.shape[1] - 1 - result_no_edge.col, half_win_size)
    num_neighbors_dim2 = cols_left + 1 + cols_right
    num_neighbors = num_neighbors_dim1 * num_neighbors_dim2
    #? Normalize sums into means
    result_no_edge.data /= num_neighbors
    return result_no_edge
def _compute_scc_score(
    coo_cm1: sp.coo_matrix,
    coo_cm2: sp.coo_matrix,
) -> float:
    """
    Compute the HiCRep Stratum-Adjusted Correlation Coefficient (SCC).

    This is a helper function that performs the core SCC calculation:
    a Pearson correlation coefficient is computed for every diagonal
    (stratum) of the two preprocessed contact matrices, and the
    per-stratum coefficients are combined with variance-stabilizing
    weights.

    Parameters
    ----------
    coo_cm1 : sp.coo_matrix
        First preprocessed input contact matrix.
    coo_cm2 : sp.coo_matrix
        Second preprocessed input contact matrix.

    Returns
    -------
    float
        The HiCRep SCC score. Strata whose statistics are undefined
        (e.g. empty strata or zero variance) receive zero weight and
        do not contribute to the score.
    """
    logger.debug(f"Computing SCC score for matrices of shape {coo_cm1.shape}")
    #? Convert each diagonal to one row of a csr_matrix to compute diagonal-wise correlation
    diag_mat_coo1 = diag_transform_coo(coo_cm1)
    diag_mat_coo2 = diag_transform_coo(coo_cm2)
    #? Sample size per stratum: positions nonzero in either matrix
    sample_sizes_diagonal = (diag_mat_coo1 + diag_mat_coo2).getnnz(axis=1)
    #? Row (stratum) sums for each matrix
    row_sums_mat1 = diag_mat_coo1.sum(axis=1).A1
    row_sums_mat2 = diag_mat_coo2.sum(axis=1).A1
    #? Ignore zero-division warnings because the corresponding elements in the
    #? output don't contribute to the SCC scores.
    with np.errstate(divide='ignore', invalid='ignore'):
        #? Compute covariance for each diagonal
        covariance_diagonal = diag_mat_coo1.multiply(diag_mat_coo2).sum(axis=1).A1 - row_sums_mat1 * row_sums_mat2 / sample_sizes_diagonal
        #? Compute correlation coefficient for each diagonal
        correlation_coefficient_diagonal = covariance_diagonal / np.sqrt(
            (diag_mat_coo1.power(2).sum(axis=1).A1 - np.square(row_sums_mat1) / sample_sizes_diagonal) *
            (diag_mat_coo2.power(2).sum(axis=1).A1 - np.square(row_sums_mat2) / sample_sizes_diagonal)
        )
        #? Compute variance stabilizing weights for each diagonal
        weights_diagonal = sample_sizes_diagonal * variance_stabilizing_transform_variance(
            sample_sizes_diagonal
        )
        #? Replace NaN and Inf values with zeros so invalid strata drop out
        weights_diagonal_nan_to_zero = np.nan_to_num(weights_diagonal, copy=True, posinf=0.0, neginf=0.0)
        correlation_coefficient_diagonal_nan_to_zero = np.nan_to_num(correlation_coefficient_diagonal, copy=True, posinf=0.0, neginf=0.0)
        #? Final SCC score: weighted average of per-stratum correlations
        scc_score = correlation_coefficient_diagonal_nan_to_zero @ weights_diagonal_nan_to_zero / weights_diagonal_nan_to_zero.sum()
        output_scc_score = np.nan_to_num(scc_score, copy=True, posinf=0.0, neginf=0.0)
    return output_scc_score
def preprocess_matrices_coo(
    cm_coo1: sp.coo_matrix,
    cm_coo2: sp.coo_matrix,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = True,
    downsample: bool = False,
    half_win_size: t.Optional[int] = None,
) -> t.Tuple[sp.coo_matrix, sp.coo_matrix]:
    """
    Preprocess contact matrices for HiCRep SCC score computation.

    The preprocessing steps include:
    1. Trimming diagonals to save computation time.
    2. Down-sampling matrices if necessary (currently not supported).
    3. Normalizing matrices by total number of contacts.
    4. Applying smoothing if necessary.

    Parameters
    ----------
    cm_coo1 : sp.coo_matrix
        First contact matrix in COO format.
    cm_coo2 : sp.coo_matrix
        Second contact matrix in COO format.
    max_k : int, optional
        The maximum genomic distance (in bins) to consider. Diagonals beyond
        this distance will be excluded. If None, the entire matrix is used.
    remove_main_diag : bool, optional
        Whether to remove the main diagonal, by default True. Only True
        is currently supported.
    downsample : bool, optional
        Whether to down-sample the matrices to have the same total count,
        by default False. Currently raises NotImplementedError if True.
    half_win_size : int, optional
        The half-size of the smoothing window. If None, no smoothing is
        applied.

    Returns
    -------
    tuple[sp.coo_matrix, sp.coo_matrix]
        The preprocessed contact matrices.

    Raises
    ------
    MetricError
        If `half_win_size` is not a positive integer.
    NotImplementedError
        If `remove_main_diag` is False or `downsample` is True.
    """
    logger.debug(f"Preprocessing matrices: max_k={max_k}, half_win_size={half_win_size}")
    if not remove_main_diag:
        raise NotImplementedError("Not yet implemented for the next step")
    if half_win_size is not None:
        #? Validate half_win_size type and value
        if not (isinstance(half_win_size, int) and half_win_size > 0):
            raise MetricError("half_win_size must be a positive integer!")
    #? Trim diagonals to save computation time
    trimmed_cm1_coo = create_triu_matrix(
        cm_coo1,
        max_k=max_k,
        remove_main_diag=remove_main_diag
    )
    trimmed_cm2_coo = create_triu_matrix(
        cm_coo2,
        max_k=max_k,
        remove_main_diag=remove_main_diag
    )
    #? Downsample matrices if necessary
    if downsample:
        #TODO: Test the functionality with downsample
        raise NotImplementedError("Functionality is not yet tested")
        #NOTE: intended implementation below — unreachable until the
        #NOTE: downsample path has been tested and the raise removed
        size1 = trimmed_cm1_coo.sum()
        size2 = trimmed_cm2_coo.sum()
        if size1 > size2:
            trimmed_cm1_coo = resample_coo_mat(trimmed_cm1_coo, size2).astype(float)
        elif size2 > size1:
            trimmed_cm2_coo = resample_coo_mat(trimmed_cm2_coo, size1).astype(float)
    else:
        #? Normalize matrices by total number of contacts
        total_contacts1 = cm_coo1.sum()
        total_contacts2 = cm_coo2.sum()
        trimmed_cm1_coo = trimmed_cm1_coo.astype(float) / total_contacts1
        trimmed_cm2_coo = trimmed_cm2_coo.astype(float) / total_contacts2
    #? Apply smoothing if necessary
    if half_win_size is not None:
        trimmed_cm1_coo = mean_filter_coo_mat(trimmed_cm1_coo, half_win_size)
        trimmed_cm2_coo = mean_filter_coo_mat(trimmed_cm2_coo, half_win_size)
    return trimmed_cm1_coo, trimmed_cm2_coo
def comp_hicrep_coo(
    cm1_coo: sp.coo_matrix,
    cm2_coo: sp.coo_matrix,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = True,
    downsample: bool = False,
    half_win_size: t.Optional[int] = None,
    ena_common_region: bool = True,
    ena_reshaping: bool = True,
) -> float:
    """
    Compute the HiCRep score for two contact matrices.

    This is the main entry point for calculating the HiCRep score between
    two sparse contact matrices: the inputs are validated, optionally
    restricted to their common non-empty region, preprocessed, and passed
    to the SCC computation.

    Parameters
    ----------
    cm1_coo : sp.coo_matrix
        First contact matrix in COO format.
    cm2_coo : sp.coo_matrix
        Second contact matrix in COO format.
    max_k : int, optional
        The maximum genomic distance (in bins) to consider.
    remove_main_diag : bool, optional
        Whether to exclude the main diagonal, by default True.
    downsample : bool, optional
        Whether to down-sample the matrices, by default False.
    half_win_size : int, optional
        The half-size of the smoothing window. If None, no smoothing.
    ena_common_region : bool, optional
        Whether to filter for common non-empty rows/columns, by default True.
    ena_reshaping : bool, optional
        Whether to enforce a square shape on matrices, by default True.

    Returns
    -------
    float
        The final HiCRep SCC score.

    Raises
    ------
    MetricError
        If inputs are not COO matrices, are empty, are non-square, or
        have different shapes.

    Notes
    -----
    Both input matrices must be in `scipy.sparse.coo_matrix` format.
    """
    logger.info(f"Computing HiCRep score with half_win_size={half_win_size}, max_k={max_k}")
    #? Ensure both inputs are strictly COO matrices
    if not (isinstance(cm1_coo, sp.coo_matrix) and isinstance(cm2_coo, sp.coo_matrix)):
        raise MetricError(
            "Both input matrices must be scipy.sparse.coo_matrix in COO format"
        )
    if ena_common_region:
        #? Restrict both matrices to the union of their non-empty rows/cols
        out = cm_preprocs.filter_common_empty_rowcols_coo(
            cm1_coo,
            cm2_coo,
            op="union",
            is_triu_sym=True,
            axis=None,
        )
        cm1_coo, cm2_coo = out
    #? Do reshaping to enforce square shape
    if ena_reshaping:
        if ena_common_region:
            #? with ena_common_region, the matrices is automatically square
            pass
        else:
            cm_coo1_shape = cm_preprocs.infer_mat_shape_coo(
                cm1_coo,
                is_triu_sym=True
            )
            cm1_coo = cm1_coo.reshape(cm_coo1_shape)
            cm_coo2_shape = cm_preprocs.infer_mat_shape_coo(
                cm2_coo,
                is_triu_sym=True
            )
            cm2_coo = cm2_coo.reshape(cm_coo2_shape)
    #? Check input matrices for validity
    if cm1_coo.size <= 0:
        raise MetricError("Contact matrix 1 is empty")
    if cm1_coo.shape[0] != cm1_coo.shape[1]:
        raise MetricError("Contact matrix 1 is not square")
    if cm2_coo.size <= 0:
        raise MetricError("Contact matrix 2 is empty")
    if cm2_coo.shape[0] != cm2_coo.shape[1]:
        raise MetricError("Contact matrix 2 is not square")
    if cm1_coo.shape != cm2_coo.shape:
        raise MetricError("Contact matrices have different shapes")
    #? Trim, normalize, and optionally smooth both matrices
    cm1_coo, cm2_coo = preprocess_matrices_coo(
        cm1_coo,
        cm2_coo,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
        downsample=downsample,
        half_win_size=half_win_size,
    )
    #? Compute SCC matrix
    score = _compute_scc_score(
        cm1_coo,
        cm2_coo
    )
    return score