Source code for gunz_cm.preprocs.mirrors

# -*- coding: utf-8 -*-
"""
Module.

Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"

import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from numba import njit
from scipy import sparse as sp
from .. import consts as cm_consts

@njit(cache=True)
def _fast_mirror_arrays(
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    data: np.ndarray,
    remove_diag: bool,
    double_diag: bool,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Numba-compiled kernel that mirrors upper-triangle entries of a matrix
    given in coordinate form.

    Entries with ``row < col`` are emitted together with their transposed
    copy ``(col, row)``. Entries with ``row == col`` are emitted zero, one or
    two times depending on the flags. Entries with ``row > col`` are never
    emitted.

    Parameters
    ----------
    row_ids : np.ndarray
        Row index of every stored entry.
    col_ids : np.ndarray
        Column index of every stored entry.
    data : np.ndarray
        Value of every stored entry.
    remove_diag : bool
        If True, diagonal entries are left out of the output.
    double_diag : bool
        If True (and ``remove_diag`` is False), every diagonal entry is
        written twice.

    Returns
    -------
    tuple of np.ndarray
        ``(rows, cols, values)`` of the mirrored entries. Each output array
        keeps the dtype of the corresponding input array.
    """
    total = len(row_ids)

    # Number of times each diagonal entry is written to the output.
    if remove_diag:
        diag_copies = 0
    elif double_diag:
        diag_copies = 2
    else:
        diag_copies = 1

    # Counting pass, so the output can be allocated once at its exact size.
    n_upper = 0
    n_diag = 0
    for k in range(total):
        if row_ids[k] < col_ids[k]:
            n_upper += 1
        elif row_ids[k] == col_ids[k]:
            n_diag += 1

    out_len = 2 * n_upper + diag_copies * n_diag

    mirrored_rows = np.empty(out_len, dtype=row_ids.dtype)
    mirrored_cols = np.empty(out_len, dtype=col_ids.dtype)
    mirrored_vals = np.empty(out_len, dtype=data.dtype)

    # Filling pass, in input order.
    pos = 0
    for k in range(total):
        i = row_ids[k]
        j = col_ids[k]
        v = data[k]

        if i < j:
            # Upper-triangle entry immediately followed by its transpose.
            mirrored_rows[pos] = i
            mirrored_cols[pos] = j
            mirrored_vals[pos] = v

            mirrored_rows[pos + 1] = j
            mirrored_cols[pos + 1] = i
            mirrored_vals[pos + 1] = v
            pos += 2
        elif i == j:
            # Diagonal entry, repeated ``diag_copies`` times (possibly zero).
            for rep in range(diag_copies):
                mirrored_rows[pos] = i
                mirrored_cols[pos] = j
                mirrored_vals[pos] = v
                pos += 1

    return mirrored_rows, mirrored_cols, mirrored_vals

@functools.singledispatch
def mirror_upper_to_lower_triangle(
    mat: t.Union[pd.DataFrame, sp.coo_matrix],
    remove_diag: bool = False,
    double_diag: bool = False,
) -> t.Union[pd.DataFrame, sp.coo_matrix]:
    """
    Mirror the upper triangle part to the lower triangle part of a matrix.

    This is the single-dispatch entry point: the implementation is selected
    from the type of ``mat``. This fallback body only runs when no
    implementation is registered for that type.

    Parameters
    ----------
    mat : t.Union[pd.DataFrame, sp.coo_matrix]
        Input matrix.
    remove_diag : bool, optional
        Whether to remove the main diagonal (default is False).
    double_diag : bool, optional
        Whether to double the diagonal entries (default is False).
        This is useful for preserving behavior of certain legacy
        implementations that sum (i, j) and (j, i) blindly even when i=j.
        Ignored if remove_diag is True.

    Returns
    -------
    output_data : t.Union[pd.DataFrame, sp.coo_matrix]
        Resulting matrix with the upper triangle mirrored to the lower
        triangle.

    Raises
    ------
    PreprocError
        If ``mat`` is of a type for which no implementation is registered.

    Notes
    -----
    This function assumes the input matrix is a symmetric matrix.
    """
    raise PreprocError(
        f"Input data must be a pandas DataFrame or a scipy sparse matrix. Got {type(mat)}"
    )
def mirror_upper_to_lower_triangle_coo(
    cm_coo: sp.coo_matrix,
    remove_diag: bool = False,
    double_diag: bool = False,
) -> sp.coo_matrix:
    """
    Mirror the upper triangle part to the lower triangle part of a sparse matrix.

    Parameters
    ----------
    cm_coo : scipy.sparse.coo_matrix
        The input sparse matrix.
    remove_diag : bool, optional
        Whether to remove the main diagonal. Defaults to False.
    double_diag : bool, optional
        Whether to store every diagonal entry twice. Defaults to False.
        Has no effect when ``remove_diag`` is True.

    Returns
    -------
    out_mat : scipy.sparse.coo_matrix
        A new matrix with the same shape as ``cm_coo`` holding the upper
        triangle entries, their mirrored copies and (unless removed) the
        diagonal entries.

    Raises
    ------
    PreprocError
        If ``cm_coo`` is not a ``scipy.sparse.coo_matrix``.

    Notes
    -----
    This function assumes the input matrix is a symmetric matrix: entries
    stored below the main diagonal of ``cm_coo`` are not carried over to the
    output. With ``double_diag=True`` the diagonal entries appear as
    duplicate COO entries, which scipy sums when the matrix is converted to
    CSR/CSC or to a dense array.
    """
    #? Mandate strict type checking to prevent incorrect data processing.
    #? A coo_matrix instance is always sparse, so one isinstance check suffices.
    if not isinstance(cm_coo, sp.coo_matrix):
        raise PreprocError(f"Input must be a scipy.sparse.coo_matrix. Got {type(cm_coo)}")

    #? Fast Numba array looping and copying over the coordinate arrays
    out_rows, out_cols, out_data = _fast_mirror_arrays(
        cm_coo.row,
        cm_coo.col,
        cm_coo.data,
        remove_diag,
        double_diag,
    )

    return sp.coo_matrix(
        (out_data, (out_rows, out_cols)),
        shape=cm_coo.shape,
    )
def mirror_upper_to_lower_triangle_df(
    cm_df: pd.DataFrame,
    remove_diag: bool = False,
    double_diag: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    vals_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> pd.DataFrame:
    """
    Mirror the upper triangle part to the lower triangle part of a matrix.

    Parameters
    ----------
    cm_df : pandas.DataFrame
        The input DataFrame representing the matrix, one entry per row.
    remove_diag : bool, optional
        Whether to remove the main diagonal (default is False).
    double_diag : bool, optional
        Whether to include the diagonal rows twice (default is False).
        Has no effect when ``remove_diag`` is True.
    row_ids_colname : str, optional
        Name of the column holding the row indices. Defaults to the value
        defined in the consts module.
    col_ids_colname : str, optional
        Name of the column holding the column indices. Defaults to the value
        defined in the consts module.
    vals_colname : str, optional
        Name of the column holding the entry values. Defaults to the value
        defined in the consts module.

    Returns
    -------
    output_df : pandas.DataFrame
        Concatenation of the upper triangle rows, their mirrored copies and
        (unless removed) the diagonal rows, in that order.

    Notes
    -----
    This function assumes the input matrix is a symmetric matrix: rows of
    ``cm_df`` whose row index is greater than the column index are not
    carried over to the output.

    The mirrored rows are built from the three named columns only, so any
    other column of ``cm_df`` is left missing (NaN) for them. The original
    index labels are kept, so labels repeat in the result.
    """
    row_ids = cm_df[row_ids_colname]
    col_ids = cm_df[col_ids_colname]

    upper_triu_df = cm_df.loc[row_ids < col_ids, :]

    # Same entries with row and column indices swapped.
    lower_triu_df = pd.DataFrame(
        {
            row_ids_colname: upper_triu_df[col_ids_colname],
            col_ids_colname: upper_triu_df[row_ids_colname],
            vals_colname: upper_triu_df[vals_colname],
        }
    )

    parts = [upper_triu_df, lower_triu_df]
    if not remove_diag:
        diag_df = cm_df.loc[row_ids == col_ids, :]
        parts.append(diag_df)
        if double_diag:
            # Second copy of the diagonal, for legacy "sum (i, j) and (j, i)" behavior.
            parts.append(diag_df)

    return pd.concat(parts)
def symmetrize_edges(
    rows: np.ndarray,
    cols: np.ndarray,
    data: np.ndarray,
    shape: t.Tuple[int, int],
    double_diag: bool = False,
) -> sp.coo_matrix:
    """
    Construct a symmetric COO matrix from directed edge arrays.

    Every input entry ``(i, j, v)`` is kept and a mirrored entry
    ``(j, i, v)`` is appended after all original entries. This is more
    efficient than `mirror_upper_to_lower_triangle` for constructing
    matrices from raw edge lists because it skips the intermediate sparse
    matrix creation and filtering steps.

    Parameters
    ----------
    rows : np.ndarray
        Row indices.
    cols : np.ndarray
        Column indices.
    data : np.ndarray
        Values.
    shape : tuple[int, int]
        Shape of the resulting matrix.
    double_diag : bool, optional
        Whether to include diagonal elements (i, i) twice (once as (i, i)
        and once as mirrored (i, i)). This preserves legacy behavior of
        blindly summing (i, j) and (j, i). Defaults to False.

    Returns
    -------
    sp.coo_matrix
        The symmetric sparse matrix. Mirrored entries are stored as separate
        COO entries.

    Raises
    ------
    PreprocError
        If any of ``rows``, ``cols`` or ``data`` is not a numpy array, or if
        their lengths differ.

    Notes
    -----
    Every off-diagonal entry is mirrored regardless of which triangle it
    lies in, so an input that already contains both (i, j) and (j, i) ends
    up with both pairs duplicated.
    """
    if not all(isinstance(arr, np.ndarray) for arr in (rows, cols, data)):
        raise PreprocError("rows, cols, and data must be numpy arrays.")

    if not (len(rows) == len(cols) == len(data)):
        raise PreprocError("rows, cols, and data must have the same length.")

    if double_diag:
        # Mirroring everything repeats each diagonal entry as well.
        mirror_rows, mirror_cols, mirror_data = cols, rows, data
    else:
        # Only off-diagonal entries get a mirrored counterpart.
        off_diag = rows != cols
        mirror_rows = cols[off_diag]
        mirror_cols = rows[off_diag]
        mirror_data = data[off_diag]

    rows_sym = np.concatenate([rows, mirror_rows])
    cols_sym = np.concatenate([cols, mirror_cols])
    data_sym = np.concatenate([data, mirror_data])

    return sp.coo_matrix((data_sym, (rows_sym, cols_sym)), shape=shape)
# Register the type-specific implementations with the single-dispatch entry point
mirror_upper_to_lower_triangle.register(sp.coo_matrix, mirror_upper_to_lower_triangle_coo)
mirror_upper_to_lower_triangle.register(pd.DataFrame, mirror_upper_to_lower_triangle_df)