# -*- coding: utf-8 -*-
"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"
import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from numba import njit
from scipy import sparse as sp
from .. import consts as cm_consts
@njit(cache=True)
def _fast_mirror_arrays(
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    data: np.ndarray,
    remove_diag: bool,
    double_diag: bool,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build the coordinate arrays of a matrix mirrored across its diagonal.

    Only entries of the strict upper triangle (``row < col``) and of the main
    diagonal (``row == col``) are read; entries with ``row > col`` are
    dropped. Each upper-triangle entry is emitted twice in a row: first as
    ``(row, col)``, then as ``(col, row)``.

    Parameters
    ----------
    row_ids : np.ndarray
        Row index of every stored entry.
    col_ids : np.ndarray
        Column index of every stored entry.
    data : np.ndarray
        Value of every stored entry.
    remove_diag : bool
        If True, diagonal entries are left out of the output.
    double_diag : bool
        If True (and ``remove_diag`` is False), every diagonal entry is
        emitted twice in a row instead of once.

    Returns
    -------
    out_rows, out_cols, out_data : tuple of np.ndarray
        Row indices, column indices and values of the mirrored entries, in
        input order. Each array keeps the dtype of its input counterpart.
    """
    n_entries = len(row_ids)

    # Number of copies written for each diagonal entry: 0, 1 or 2.
    if remove_diag:
        diag_copies = 0
    elif double_diag:
        diag_copies = 2
    else:
        diag_copies = 1

    # Pass 1: count the entries so the outputs can be allocated exactly once.
    n_upper = 0
    n_diag = 0
    for k in range(n_entries):
        if row_ids[k] < col_ids[k]:
            n_upper += 1
        elif row_ids[k] == col_ids[k]:
            n_diag += 1

    total = 2 * n_upper + diag_copies * n_diag
    out_rows = np.empty(total, dtype=row_ids.dtype)
    out_cols = np.empty(total, dtype=col_ids.dtype)
    out_data = np.empty(total, dtype=data.dtype)

    # Pass 2: write the entries at a running position.
    pos = 0
    for k in range(n_entries):
        i = row_ids[k]
        j = col_ids[k]
        value = data[k]
        if i < j:
            # Upper-triangle entry followed directly by its mirror image.
            out_rows[pos] = i
            out_cols[pos] = j
            out_data[pos] = value
            out_rows[pos + 1] = j
            out_cols[pos + 1] = i
            out_data[pos + 1] = value
            pos += 2
        elif i == j:
            # Diagonal entry, repeated according to the flags.
            for _ in range(diag_copies):
                out_rows[pos] = i
                out_cols[pos] = j
                out_data[pos] = value
                pos += 1

    return out_rows, out_cols, out_data
[docs]
@functools.singledispatch
def mirror_upper_to_lower_triangle(
    mat: t.Union[pd.DataFrame, sp.coo_matrix],
    remove_diag: bool = False,
    double_diag: bool = False,
) -> t.Union[pd.DataFrame, sp.coo_matrix]:
    """
    Mirror the upper triangle of a matrix onto its lower triangle.

    This is the single-dispatch entry point: the actual work is done by the
    implementation registered for the type of ``mat``. The body below is the
    fallback that runs only when no implementation is registered for that
    type.

    Parameters
    ----------
    mat : t.Union[pd.DataFrame, sp.coo_matrix]
        Input matrix.
    remove_diag : bool, optional
        Whether to remove the main diagonal (default is False).
    double_diag : bool, optional
        Whether to duplicate the diagonal entries (default is False). This is
        useful for preserving the behavior of certain legacy implementations
        that sum (i, j) and (j, i) blindly even when i == j.
        Ignored if ``remove_diag`` is True.

    Returns
    -------
    output_data : t.Union[pd.DataFrame, sp.coo_matrix]
        Resulting matrix with the upper triangle mirrored to the lower
        triangle.

    Raises
    ------
    PreprocError
        If no implementation is registered for ``type(mat)``.

    Notes
    -----
    The input matrix is assumed to be symmetric.
    """
    message = f"Input data must be a pandas DataFrame or a scipy sparse matrix. Got {type(mat)}"
    raise PreprocError(message)
[docs]
def mirror_upper_to_lower_triangle_coo(
    cm_coo: sp.coo_matrix,
    remove_diag: bool = False,
    double_diag: bool = False,
) -> sp.coo_matrix:
    """
    Mirror the upper triangle of a sparse COO matrix onto its lower triangle.

    Parameters
    ----------
    cm_coo : scipy.sparse.coo_matrix
        The input sparse matrix.
    remove_diag : bool, optional
        Whether to remove the main diagonal. Defaults to False.
    double_diag : bool, optional
        Whether to duplicate the diagonal entries. Defaults to False.

    Returns
    -------
    out_mat : scipy.sparse.coo_matrix
        New matrix with the same shape as ``cm_coo`` and the upper triangle
        mirrored to the lower triangle.

    Raises
    ------
    PreprocError
        If ``cm_coo`` is not a ``scipy.sparse.coo_matrix``.

    Notes
    -----
    The input matrix is assumed to be symmetric.

    Examples
    --------
    >>> import numpy as np
    >>> from scipy import sparse as sp
    >>> upper = sp.coo_matrix(np.array([[1, 2], [0, 3]]))
    >>> mirror_upper_to_lower_triangle_coo(upper).toarray().tolist()
    [[1, 2], [2, 3]]
    >>> mirror_upper_to_lower_triangle_coo(upper, remove_diag=True).toarray().tolist()
    [[0, 2], [2, 0]]
    """
    # Strict type check: other sparse formats are rejected rather than
    # silently converted.
    is_coo = sp.issparse(cm_coo) and isinstance(cm_coo, sp.coo_matrix)
    if not is_coo:
        raise PreprocError(f"Input must be a scipy.sparse.coo_matrix. Got {type(cm_coo)}")

    # The Numba helper does the counting and copying on the raw COO arrays.
    mirrored_rows, mirrored_cols, mirrored_data = _fast_mirror_arrays(
        cm_coo.row, cm_coo.col, cm_coo.data, remove_diag, double_diag
    )

    return sp.coo_matrix(
        (mirrored_data, (mirrored_rows, mirrored_cols)),
        shape=cm_coo.shape,
    )
[docs]
def mirror_upper_to_lower_triangle_df(
    cm_df: pd.DataFrame,
    remove_diag: bool = False,
    double_diag: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    vals_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> pd.DataFrame:
    """
    Mirror the upper triangle of a DataFrame-encoded matrix onto its lower triangle.

    Parameters
    ----------
    cm_df : pandas.DataFrame
        The input DataFrame representing the matrix, one entry per row.
    remove_diag : bool, optional
        Whether to remove the main diagonal (default is False).
    double_diag : bool, optional
        Whether to include the diagonal rows twice (default is False).
        Has no effect when ``remove_diag`` is True.
    row_ids_colname : str, optional
        Name of the column holding row indices.
    col_ids_colname : str, optional
        Name of the column holding column indices.
    vals_colname : str, optional
        Name of the column holding the entry values.

    Returns
    -------
    output_df : pandas.DataFrame
        Concatenation of the upper-triangle rows, their mirrored counterparts
        and, unless removed, the diagonal rows.

    Notes
    -----
    The input matrix is assumed to be symmetric. Rows whose row index is
    greater than their column index are not part of the output.

    The mirrored rows are built from the three named columns only, and they
    keep the index labels of the upper-triangle rows they come from; the
    result is concatenated without resetting the index.
    """
    row_values = cm_df[row_ids_colname]
    col_values = cm_df[col_ids_colname]

    upper_part = cm_df.loc[row_values < col_values, :]

    # Mirrored rows: row and column ids swapped, value carried over unchanged.
    mirrored_part = pd.DataFrame()
    for target_col, source_col in (
        (row_ids_colname, col_ids_colname),
        (col_ids_colname, row_ids_colname),
        (vals_colname, vals_colname),
    ):
        mirrored_part[target_col] = upper_part[source_col]

    pieces = [upper_part, mirrored_part]
    if not remove_diag:
        diag_part = cm_df.loc[row_values == col_values, :]
        # The diagonal is appended once, or twice when doubling is requested.
        pieces.extend([diag_part] * (2 if double_diag else 1))

    return pd.concat(pieces)
[docs]
def symmetrize_edges(
    rows: np.ndarray,
    cols: np.ndarray,
    data: np.ndarray,
    shape: t.Tuple[int, int],
    double_diag: bool = False,
) -> sp.coo_matrix:
    """
    Construct a symmetric COO matrix from directed edge arrays.

    Every edge ``(i, j, v)`` is kept and a mirrored edge ``(j, i, v)`` is
    appended after the originals. Unlike `mirror_upper_to_lower_triangle`,
    this works directly on raw edge arrays, so no intermediate sparse matrix
    has to be built and filtered first.

    Parameters
    ----------
    rows : np.ndarray
        Row indices.
    cols : np.ndarray
        Column indices.
    data : np.ndarray
        Values.
    shape : tuple[int, int]
        Shape of the resulting matrix.
    double_diag : bool, optional
        Whether diagonal edges (i, i) are mirrored as well, so that they
        appear twice in the result. This preserves the legacy behavior of
        blindly summing (i, j) and (j, i). Defaults to False, in which case
        only off-diagonal edges are mirrored.

    Returns
    -------
    sp.coo_matrix
        The symmetric sparse matrix.

    Raises
    ------
    PreprocError
        If any of ``rows``, ``cols``, ``data`` is not a numpy array, or if
        their lengths differ.

    Examples
    --------
    >>> import numpy as np
    >>> rows, cols = np.array([0, 0]), np.array([1, 0])
    >>> vals = np.array([5, 2])
    >>> symmetrize_edges(rows, cols, vals, (2, 2)).toarray().tolist()
    [[2, 5], [5, 0]]
    >>> symmetrize_edges(rows, cols, vals, (2, 2), double_diag=True).toarray().tolist()
    [[4, 5], [5, 0]]
    """
    if not all(isinstance(arr, np.ndarray) for arr in (rows, cols, data)):
        raise PreprocError("rows, cols, and data must be numpy arrays.")
    if len(rows) != len(cols) or len(cols) != len(data):
        raise PreprocError("rows, cols, and data must have the same length.")

    # Choose which edges get a mirrored copy.
    if double_diag:
        # Mirror everything, so diagonal edges end up duplicated.
        mirror_rows, mirror_cols, mirror_data = cols, rows, data
    else:
        # Mirror only the off-diagonal edges.
        off_diag = rows != cols
        mirror_rows = cols[off_diag]
        mirror_cols = rows[off_diag]
        mirror_data = data[off_diag]

    rows_sym = np.concatenate([rows, mirror_rows])
    cols_sym = np.concatenate([cols, mirror_cols])
    data_sym = np.concatenate([data, mirror_data])

    return sp.coo_matrix((data_sym, (rows_sym, cols_sym)), shape=shape)
# Bind the concrete implementations to the single-dispatch entry point; the
# implementation is selected by the type of the first argument.
mirror_upper_to_lower_triangle.register(sp.coo_matrix)(mirror_upper_to_lower_triangle_coo)
mirror_upper_to_lower_triangle.register(pd.DataFrame)(mirror_upper_to_lower_triangle_df)