# Source code for gunz_cm.preprocs.graphs

# -*- coding: utf-8 -*-
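"""
Graph-related preprocessing utilities.

Provides ``comp_single_graph_adj_mat``, which derives the entries of a graph
adjacency matrix from the coordinates of a matrix given as a SciPy COO matrix
or as a pandas DataFrame of (row id, column id, count) records.
"""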
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"

import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts

def _comp_single_graph_adj_mat_helper(
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build the coordinates and values of a graph adjacency matrix.

    Every input coordinate pair ``(row_ids[i], col_ids[i])`` is classified by
    its position relative to the main diagonal and, if selected, emitted with
    a fixed integer value.

    Notes
    -----
    * Entries strictly above the diagonal (``row < col``) are always kept
      and receive the value 1.
    * Entries on the diagonal (``row == col``, i.e. self-loops) are kept
      with the value 2 only if `allow_loop` is True; otherwise they are
      omitted from the output.
    * Entries strictly below the diagonal (``row > col``) are kept with the
      value 1 only if `is_triu_sym` is False; otherwise they are omitted.

    The output is ordered group-wise: upper-triangular entries first, then
    diagonal entries, then lower-triangular entries. Within a group the input
    order is preserved.

    Parameters
    ----------
    row_ids : np.ndarray
        The row IDs of the input matrix entries.
    col_ids : np.ndarray
        The column IDs of the input matrix entries.
    allow_loop : bool, optional
        Determines if self-loops (diagonal entries) are included in the
        result. Default is True.
    is_triu_sym : bool, optional
        If True, only the upper triangle (and optionally the diagonal) of the
        input is used. If False, lower-triangular entries are included as
        well and at least one such entry must exist. Default is True.

    Returns
    -------
    new_row_ids : np.ndarray
        The row IDs of the adjacency matrix entries.
    new_col_ids : np.ndarray
        The column IDs of the adjacency matrix entries.
    new_data : np.ndarray
        The integer values (1 or 2) of the adjacency matrix entries.

    Raises
    ------
    AssertionError
        If `is_triu_sym` is False and the input has no entry below the
        diagonal.

    Examples
    --------
    >>> rows = np.array([0, 0, 1, 1, 2])
    >>> cols = np.array([0, 1, 1, 0, 2])
    >>> r, c, d = _comp_single_graph_adj_mat_helper(rows, cols)
    >>> r.tolist(), c.tolist(), d.tolist()
    ([0, 0, 1, 2], [1, 0, 1, 2], [1, 2, 2, 2])
    >>> r, c, d = _comp_single_graph_adj_mat_helper(rows, cols, allow_loop=False)
    >>> r.tolist(), c.tolist(), d.tolist()
    ([0], [1], [1])
    """

    #? Each selected group of entries is paired with the value it receives.
    #? The append order (upper, diagonal, lower) defines the output order.
    selections = [(row_ids < col_ids, 1)]

    if allow_loop:
        #? Diagonal elements (self-loops) are encoded with the value 2
        selections.append((row_ids == col_ids, 2))

    if not is_triu_sym:
        tril_mask = (row_ids > col_ids)
        #? Raised explicitly (not via `assert`) so that the check is not
        #? stripped when Python runs with -O. The exception type and the
        #? message are kept for backward compatibility.
        if not np.any(tril_mask):
            raise AssertionError("No entry in the lower triangle of matrix!")
        selections.append((tril_mask, 1))

    new_row_ids = np.concatenate([row_ids[mask] for mask, _ in selections])
    new_col_ids = np.concatenate([col_ids[mask] for mask, _ in selections])
    new_data = np.concatenate(
        [np.full(mask.sum(), value, dtype=int) for mask, value in selections]
    )

    return new_row_ids, new_col_ids, new_data

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def comp_single_graph_adj_mat(
    data: t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame],
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    counts_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]:
    """
    Compute the adjacency matrix from a given data structure.

    This is the generic entry point of a ``functools.singledispatch``
    function: the implementation is chosen by the type of `data`. The body
    below is only the fallback, used when no implementation is registered
    for that type.

    Notes
    -----
    Entries strictly above the diagonal receive the value 1. Diagonal
    entries (self-loops) receive the value 2 if `allow_loop` is True and are
    left out otherwise. Entries strictly below the diagonal are only used if
    `is_triu_sym` is False, in which case they receive the value 1.

    Parameters
    ----------
    data : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The input data structure.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    row_ids_colname : str, optional
        The column name for row IDs in the input DataFrame.
        Default is cm_consts.DataFrameSpecs.ROW_IDS.
    col_ids_colname : str, optional
        The column name for column IDs in the input DataFrame.
        Default is cm_consts.DataFrameSpecs.COL_IDS.
    counts_colname : str, optional
        The column name for counts in the input DataFrame.
        Default is cm_consts.DataFrameSpecs.COUNTS.

    Returns
    -------
    adj_matrix : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The adjacency matrix.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of `data`.
    """
    #? Fallback of the dispatcher: reached only for unregistered data types
    raise PreprocError(f"No implementation for data type: {type(data).__name__}")
@comp_single_graph_adj_mat.register(sp.coo_matrix)
def _(
    cm_coo: sp.coo_matrix,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    **kwargs,
) -> sp.coo_matrix:
    """
    Compute the adjacency matrix from a COO matrix.

    Notes
    -----
    Only the coordinates (``cm_coo.row`` and ``cm_coo.col``) of the stored
    entries are read; the stored values of `cm_coo` are not used.
    Entries strictly above the diagonal receive the value 1. Diagonal
    entries (self-loops) receive the value 2 if `allow_loop` is True and are
    left out otherwise. Entries strictly below the diagonal are only used if
    `is_triu_sym` is False, in which case they receive the value 1.

    Parameters
    ----------
    cm_coo : sp.coo_matrix
        The sparse matrix in COO format.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    **kwargs
        Accepted but unused by this implementation (presumably so that the
        column-name arguments of the generic signature can be passed along).

    Returns
    -------
    adj_coo : sp.coo_matrix
        Adjacency matrix in COO format with the same shape as `cm_coo`.
    """
    adj_rows, adj_cols, adj_vals = _comp_single_graph_adj_mat_helper(
        cm_coo.row,
        cm_coo.col,
        allow_loop=allow_loop,
        is_triu_sym=is_triu_sym,
    )

    #? Assemble the result with the shape of the input matrix
    return sp.coo_matrix(
        (adj_vals, (adj_rows, adj_cols)),
        shape=cm_coo.shape
    )


@comp_single_graph_adj_mat.register(pd.DataFrame)
def _(
    cm_df: pd.DataFrame,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    counts_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> pd.DataFrame:
    """
    Compute the adjacency matrix from a Pandas DataFrame.

    Notes
    -----
    Only the row-ID and column-ID columns of `cm_df` are read; the counts
    column of the input is not used.
    Entries strictly above the diagonal receive the value 1. Diagonal
    entries (self-loops) receive the value 2 if `allow_loop` is True and are
    left out otherwise. Entries strictly below the diagonal are only used if
    `is_triu_sym` is False, in which case they receive the value 1.

    Parameters
    ----------
    cm_df : pd.DataFrame
        The input DataFrame.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    row_ids_colname : str, optional
        The column name for row IDs, used for both input and output.
        Default is cm_consts.DataFrameSpecs.ROW_IDS.
    col_ids_colname : str, optional
        The column name for column IDs, used for both input and output.
        Default is cm_consts.DataFrameSpecs.COL_IDS.
    counts_colname : str, optional
        The column name under which the adjacency values are stored in the
        output. Default is cm_consts.DataFrameSpecs.COUNTS.

    Returns
    -------
    adj_df : pd.DataFrame
        New DataFrame with one record per adjacency entry, holding the row
        ID, the column ID and the adjacency value.
    """
    adj_rows, adj_cols, adj_vals = _comp_single_graph_adj_mat_helper(
        cm_df[row_ids_colname].to_numpy(),
        cm_df[col_ids_colname].to_numpy(),
        allow_loop=allow_loop,
        is_triu_sym=is_triu_sym,
    )

    #? Column order of the output: row IDs, column IDs, adjacency values
    adj_columns = {
        row_ids_colname: adj_rows,
        col_ids_colname: adj_cols,
        counts_colname: adj_vals,
    }
    return pd.DataFrame(adj_columns)