# -*- coding: utf-8 -*-
"""
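Adjacency-matrix construction for a single graph.

Provides ``comp_single_graph_adj_mat``, a type-dispatched function with
implementations for SciPy COO matrices and pandas DataFrames. Off-diagonal
entries are encoded as 1 and diagonal entries (self-loops) as 2.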
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"
import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts
def _comp_single_graph_adj_mat_helper(
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Build the coordinate triplets (rows, columns, values) of an adjacency matrix.

    Notes
    -----
    The output is assembled from up to three consecutive groups, in this order:

    1. strictly upper-triangular entries (``row < col``), value 1;
    2. diagonal entries (``row == col``), value 2 -- only if `allow_loop` is True;
    3. strictly lower-triangular entries (``row > col``), value 1 -- only if
       `is_triu_sym` is False.

    If `allow_loop` is False, diagonal entries are simply not emitted, i.e. they
    are implicitly 0 in a sparse representation. If `is_triu_sym` is True,
    entries below the diagonal are ignored.

    Parameters
    ----------
    row_ids : np.ndarray
        The row IDs of the input matrix entries (any array-like is accepted).
    col_ids : np.ndarray
        The column IDs of the input matrix entries (any array-like is accepted).
    allow_loop : bool, optional
        Determines if self-loops (diagonal entries) are included in the result.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input is treated as symmetric with only the upper
        triangular part being used. Default is True.

    Returns
    -------
    new_row_ids : np.ndarray
        The row IDs of the adjacency matrix.
    new_col_ids : np.ndarray
        The column IDs of the adjacency matrix.
    new_data : np.ndarray
        The values of the adjacency matrix (1 for edges, 2 for self-loops).

    Raises
    ------
    AssertionError
        If `is_triu_sym` is False but no entry lies below the diagonal.

    Examples
    --------
    >>> rows, cols, vals = _comp_single_graph_adj_mat_helper(
    ...     np.array([0, 0, 1]), np.array([0, 1, 1]))
    >>> rows.tolist(), cols.tolist(), vals.tolist()
    ([0, 0, 1], [1, 0, 1], [1, 2, 2])
    """
    #? Accept any array-like; this is a no-op for inputs that are already ndarrays
    row_ids = np.asarray(row_ids)
    col_ids = np.asarray(col_ids)

    #? Strictly upper triangular entries are the (undirected) edges
    triu_mask = row_ids < col_ids
    row_parts = [row_ids[triu_mask]]
    col_parts = [col_ids[triu_mask]]
    data_parts = [np.ones(np.count_nonzero(triu_mask), dtype=int)]

    if allow_loop:
        #? Diagonal entries are self-loops and are encoded with value 2
        diag_mask = row_ids == col_ids
        row_parts.append(row_ids[diag_mask])
        col_parts.append(col_ids[diag_mask])
        data_parts.append(np.full(np.count_nonzero(diag_mask), 2, dtype=int))

    if not is_triu_sym:
        tril_mask = row_ids > col_ids
        #? Explicit raise instead of `assert`, so the check survives `python -O`;
        #? the exception type is kept for backward compatibility
        if not np.any(tril_mask):
            raise AssertionError("No entry in the lower triangle of matrix!")
        row_parts.append(row_ids[tril_mask])
        col_parts.append(col_ids[tril_mask])
        data_parts.append(np.ones(np.count_nonzero(tril_mask), dtype=int))

    #? Concatenate each output once instead of once per group
    new_row_ids = np.concatenate(row_parts)
    new_col_ids = np.concatenate(col_parts)
    new_data = np.concatenate(data_parts)

    return new_row_ids, new_col_ids, new_data
[docs]
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def comp_single_graph_adj_mat(
    data: t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame],
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    counts_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]:
    """
    Compute the adjacency matrix of a single graph from a matrix-like input.

    Notes
    -----
    This is the generic entry point of a ``functools.singledispatch`` function:
    the implementation that runs is selected by the type of `data`. The body
    below is only the fallback, reached when no implementation is registered
    for that type; it always raises.

    The input matrix is treated as symmetric, with only the upper triangular
    part and the diagonal being used.
    If `allow_loop` is True, the diagonal (self-loops) receives value 2 in the
    adjacency matrix. If `allow_loop` is False, diagonal entries are left out,
    i.e. no self-loop is encoded.

    Parameters
    ----------
    data : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The input data structure.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    row_ids_colname : str, optional
        The column name for row IDs in the input DataFrame.
        Default is ``cm_consts.DataFrameSpecs.ROW_IDS``.
    col_ids_colname : str, optional
        The column name for column IDs in the input DataFrame.
        Default is ``cm_consts.DataFrameSpecs.COL_IDS``.
    counts_colname : str, optional
        The column name for counts in the input DataFrame.
        Default is ``cm_consts.DataFrameSpecs.COUNTS``.

    Returns
    -------
    adj_matrix : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The adjacency matrix, as produced by the registered implementation.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of `data`.
    """
    #? Fallback of the dispatcher: nothing is registered for this input type
    err_msg = f"No implementation for data type: {type(data).__name__}"
    raise PreprocError(err_msg)
@comp_single_graph_adj_mat.register(sp.coo_matrix)
def _(
    cm_coo: sp.coo_matrix,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    **kwargs,
) -> sp.coo_matrix:
    """
    Compute the adjacency matrix from a sparse matrix in COO format.

    Notes
    -----
    Only the coordinates (``row`` / ``col``) of the stored entries are used;
    the stored values of `cm_coo` are not read.

    The input matrix is treated as symmetric, with only the upper triangular
    part and the diagonal being used.
    If `allow_loop` is True, the diagonal (self-loops) receives value 2 in the
    adjacency matrix. If `allow_loop` is False, diagonal entries are left out,
    i.e. no self-loop is encoded.

    Parameters
    ----------
    cm_coo : sp.coo_matrix
        The sparse matrix in COO format.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    **kwargs
        Accepted and ignored by this implementation.

    Returns
    -------
    adj_coo : sp.coo_matrix
        Adjacency matrix with the same shape as `cm_coo`, where off-diagonal
        entries are 1 and diagonal entries (self-loops) are 2.
    """
    #? Derive the adjacency triplets from the coordinates of the stored entries
    adj_rows, adj_cols, adj_vals = _comp_single_graph_adj_mat_helper(
        cm_coo.row,
        cm_coo.col,
        allow_loop=allow_loop,
        is_triu_sym=is_triu_sym,
    )

    #? Assemble the result in COO format, keeping the shape of the input
    coords = (adj_rows, adj_cols)
    return sp.coo_matrix((adj_vals, coords), shape=cm_coo.shape)
@comp_single_graph_adj_mat.register(pd.DataFrame)
def _(
    cm_df: pd.DataFrame,
    allow_loop: bool = True,
    is_triu_sym: bool = True,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    counts_colname: str = cm_consts.DataFrameSpecs.COUNTS,
) -> pd.DataFrame:
    """
    Compute the adjacency matrix from a pandas DataFrame of matrix entries.

    Notes
    -----
    Only the row-ID and column-ID columns of `cm_df` are read; the result is a
    new DataFrame and `counts_colname` is used only to name its value column.

    The input matrix is treated as symmetric, with only the upper triangular
    part and the diagonal being used.
    If `allow_loop` is True, the diagonal (self-loops) receives value 2 in the
    adjacency matrix. If `allow_loop` is False, diagonal entries are left out,
    i.e. no self-loop is encoded.

    Parameters
    ----------
    cm_df : pd.DataFrame
        The input DataFrame.
    allow_loop : bool, optional
        Determines if a self-loop should be included in the resulting matrix.
        Default is True.
    is_triu_sym : bool, optional
        Determines if the input matrix is symmetric and only the upper
        triangular part is used. Default is True.
    row_ids_colname : str, optional
        The column name for row IDs in the input DataFrame.
        Default is ``cm_consts.DataFrameSpecs.ROW_IDS``.
    col_ids_colname : str, optional
        The column name for column IDs in the input DataFrame.
        Default is ``cm_consts.DataFrameSpecs.COL_IDS``.
    counts_colname : str, optional
        The column name for the values in the returned DataFrame.
        Default is ``cm_consts.DataFrameSpecs.COUNTS``.

    Returns
    -------
    adj_df : pd.DataFrame
        Adjacency matrix in long format with the columns `row_ids_colname`,
        `col_ids_colname` and `counts_colname`, where off-diagonal entries are 1
        and diagonal entries (self-loops) are 2.
    """
    #? Derive the adjacency triplets from the coordinate columns
    adj_rows, adj_cols, adj_vals = _comp_single_graph_adj_mat_helper(
        cm_df[row_ids_colname].to_numpy(),
        cm_df[col_ids_colname].to_numpy(),
        allow_loop=allow_loop,
        is_triu_sym=is_triu_sym,
    )

    #? Reuse the caller-supplied column names for the output frame
    columns = {
        row_ids_colname: adj_rows,
        col_ids_colname: adj_cols,
        counts_colname: adj_vals,
    }
    return pd.DataFrame(columns)