# -*- coding: utf-8 -*-
"""
Optimized mirror operations for genomic contact matrices.
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.1"
__license__ = "Clear BSD"
__email__ = "adhisant@tnt.uni-hannover.de"
import functools
import typing as t
import numpy as np
import pandas as pd
from scipy import sparse as sp
from numba import njit
from gunz_cm.exceptions import PreprocError
from .. import consts as cm_consts
@njit(cache=True)
def _fast_mirror_arrays(
row_ids: np.ndarray,
col_ids: np.ndarray,
data: np.ndarray,
remove_diag: bool,
double_diag: bool,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Numba-accelerated helper to compute mirrored arrays for a symmetric matrix.
"""
n = len(row_ids)
# First pass: count elements to allocate memory accurately
upper_count = 0
diag_count = 0
for i in range(n):
r = row_ids[i]
c = col_ids[i]
if r < c:
upper_count += 1
elif r == c:
diag_count += 1
# Calculate output size
out_size = 2 * upper_count
if not remove_diag:
if double_diag:
out_size += 2 * diag_count
else:
out_size += diag_count
# Allocate output arrays
out_rows = np.empty(out_size, dtype=row_ids.dtype)
out_cols = np.empty(out_size, dtype=col_ids.dtype)
out_data = np.empty(out_size, dtype=data.dtype)
# Second pass: fill arrays
idx = 0
for i in range(n):
r = row_ids[i]
c = col_ids[i]
d = data[i]
if r < c:
# Add original upper triangle element
out_rows[idx] = r
out_cols[idx] = c
out_data[idx] = d
idx += 1
# Add mirrored lower triangle element
out_rows[idx] = c
out_cols[idx] = r
out_data[idx] = d
idx += 1
elif r == c and not remove_diag:
# Add diagonal element
out_rows[idx] = r
out_cols[idx] = c
out_data[idx] = d
idx += 1
if double_diag:
# Add duplicate diagonal element
out_rows[idx] = r
out_cols[idx] = c
out_data[idx] = d
idx += 1
return out_rows, out_cols, out_data
[docs]
@functools.singledispatch
def mirror_upper_to_lower_triangle(
mat: t.Any,
remove_diag: bool = False,
double_diag: bool = False,
) -> t.Any:
"""
Mirror the upper triangle part to the lower triangle part of a matrix.
Parameters
----------
mat : pandas.DataFrame or scipy.sparse.spmatrix
Input matrix. Supported types are pandas DataFrame and any
scipy sparse matrix.
remove_diag : bool, optional
Whether to remove the main diagonal. Defaults to False.
double_diag : bool, optional
Whether to double the diagonal entries. Defaults to False.
This is useful for preserving behavior of certain legacy
implementations. Ignored if remove_diag is True.
Returns
-------
any
Resulting matrix with the upper triangle mirrored to the lower
triangle, in the same format as input.
Raises
------
PreprocError
If the input type is not supported.
"""
raise PreprocError(f"Input data must be a pandas DataFrame or a scipy sparse matrix. Got {type(mat)}")
[docs]
def mirror_upper_to_lower_triangle_coo(
cm_coo: sp.coo_matrix,
remove_diag: bool = False,
double_diag: bool = False,
) -> sp.coo_matrix:
"""
Implementation of mirror_upper_to_lower_triangle for COO matrices.
Parameters
----------
cm_coo : scipy.sparse.coo_matrix
The input sparse matrix.
remove_diag : bool, optional
Whether to remove the main diagonal. Defaults to False.
double_diag : bool, optional
Whether to double the diagonal entries. Defaults to False.
Returns
-------
scipy.sparse.coo_matrix
The resulting symmetric sparse matrix.
"""
if not isinstance(cm_coo, sp.coo_matrix):
raise PreprocError(f"Input must be a scipy.sparse.coo_matrix. Got {type(cm_coo)}")
#? Get the row and column indices of the input matrix
row_ids = cm_coo.row
col_ids = cm_coo.col
data = cm_coo.data
#? Fast Numba array looping and copying
out_rows, out_cols, out_data = _fast_mirror_arrays(
row_ids, col_ids, data, remove_diag, double_diag
)
out_mat = sp.coo_matrix(
(out_data, (out_rows, out_cols)),
shape=cm_coo.shape
)
return out_mat
[docs]
def mirror_upper_to_lower_triangle_df(
cm_df: pd.DataFrame,
remove_diag: bool = False,
double_diag: bool = False,
row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
vals_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> pd.DataFrame:
"""
Implementation of mirror_upper_to_lower_triangle for pandas DataFrames.
Parameters
----------
cm_df : pandas.DataFrame
The input DataFrame representing the matrix.
remove_diag : bool, optional
Whether to remove the main diagonal. Defaults to False.
double_diag : bool, optional
Whether to double the diagonal entries. Defaults to False.
row_ids_colname : str, optional
Column name for row IDs. Defaults to 'bin1_id'.
col_ids_colname : str, optional
Column name for column IDs. Defaults to 'bin2_id'.
vals_colname : str, optional
Column name for contact counts. Defaults to 'count'.
Returns
-------
pandas.DataFrame
The resulting symmetric DataFrame.
"""
upper_triu_mask = cm_df[row_ids_colname] < cm_df[col_ids_colname]
upper_triu_df = cm_df.loc[upper_triu_mask, :]
lower_triu_df = pd.DataFrame()
lower_triu_df[row_ids_colname] = upper_triu_df[col_ids_colname]
lower_triu_df[col_ids_colname] = upper_triu_df[row_ids_colname]
lower_triu_df[vals_colname] = upper_triu_df[vals_colname]
if remove_diag:
return pd.concat([upper_triu_df, lower_triu_df])
else:
diag_mask = cm_df[row_ids_colname] == cm_df[col_ids_colname]
diag_df = cm_df.loc[diag_mask, :]
if double_diag:
return pd.concat([upper_triu_df, lower_triu_df, diag_df, diag_df])
return pd.concat([upper_triu_df, lower_triu_df, diag_df])
# Register implementations
mirror_upper_to_lower_triangle.register(sp.coo_matrix, mirror_upper_to_lower_triangle_coo)
mirror_upper_to_lower_triangle.register(sp.csr_matrix, mirror_upper_to_lower_triangle_coo)
mirror_upper_to_lower_triangle.register(pd.DataFrame, mirror_upper_to_lower_triangle_df)
[docs]
def symmetrize_edges(
rows: np.ndarray,
cols: np.ndarray,
data: np.ndarray,
shape: t.Tuple[int, int],
double_diag: bool = False,
) -> sp.coo_matrix:
"""
Construct a symmetric COO matrix from directed edge arrays.
Parameters
----------
rows : np.ndarray
Row indices.
cols : np.ndarray
Column indices.
data : np.ndarray
Values.
shape : tuple of int
Shape of the resulting matrix (rows, cols).
double_diag : bool, optional
Whether to include diagonal elements twice. Defaults to False.
Returns
-------
scipy.sparse.coo_matrix
A symmetric sparse matrix in COO format.
"""
mask = (rows != cols)
if not double_diag:
rows_off = rows[mask]
cols_off = cols[mask]
data_off = data[mask]
rows_sym = np.concatenate([rows, cols_off])
cols_sym = np.concatenate([cols, rows_off])
data_sym = np.concatenate([data, data_off])
else:
# Just swap all and concatenate
rows_sym = np.concatenate([rows, cols])
cols_sym = np.concatenate([cols, rows])
data_sym = np.concatenate([data, data])
return sp.coo_matrix((data_sym, (rows_sym, cols_sym)), shape=shape)