# -*- coding: utf-8 -*-
"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"
import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts
from .commons import _create_diag_mask_helper
# NOTE: a stray "[docs]" Sphinx source-link marker (HTML extraction artifact) was removed here.
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def create_triu_matrix(
    data: t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame],
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs: t.Any,
) -> t.Union[np.ndarray, tuple, sp.coo_matrix, pd.DataFrame]:
    """
    Keep only the upper-triangular band of a matrix.

    Notes
    -----
    This is the generic entry point of a ``functools.singledispatch``
    function: the call is routed to the implementation registered for the
    type of ``data``. Arguments are validated by pydantic before dispatch.

    ``**kwargs`` is part of the signature on purpose. ``validate_call``
    rejects keyword arguments that the signature does not declare, so
    without it the type-specific options accepted by the registered
    implementations (for example ``row_ids_colname`` / ``col_ids_colname``
    of the DataFrame implementation) could never be passed through this
    entry point.

    Parameters
    ----------
    data : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The input matrix. Must be passed positionally, because
        single dispatch selects the implementation from the first
        positional argument.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal (default is None).
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
    **kwargs : t.Any
        Extra keyword arguments forwarded unchanged to the selected
        implementation.

    Returns
    -------
    t.Union[np.ndarray, tuple, sp.coo_matrix, pd.DataFrame]
        The triangular matrix, as produced by the selected implementation.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``data``.
    """
    # Fallback body: only reached when no registered implementation matches.
    raise PreprocError(f"No implementation for data type: {type(data).__name__}")
@create_triu_matrix.register(np.ndarray)
def _(
    cm_mat: np.ndarray,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Dense (``numpy.ndarray``) implementation of ``create_triu_matrix``.

    Notes
    -----
    Entries outside the diagonal band ``min_k <= col - row <= max_k`` are
    set to zero. The result is a new array; the input is left untouched.

    Parameters
    ----------
    cm_mat : np.ndarray
        Two-dimensional input array.
    min_k : t.Optional[int], optional
        Lowest diagonal offset to keep; ``None`` is treated as 0
        (default is None).
    max_k : t.Optional[int], optional
        Highest diagonal offset to keep; ``None`` means no upper limit
        (default is None).
    remove_main_diag : bool, optional
        Whether to zero the main diagonal as well (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    np.ndarray
        Array of the same shape as ``cm_mat`` with everything outside the
        requested band zeroed.

    Raises
    ------
    PreprocError
        If ``cm_mat`` is not 2D, if ``min_k`` or ``max_k`` is negative, or
        if ``min_k`` exceeds ``max_k``.
    """
    if cm_mat.ndim != 2:
        raise PreprocError("Input must be a 2D array.")

    has_lower = min_k is not None
    has_upper = max_k is not None

    # Argument checks, in order: lower bound, upper bound, then their ordering.
    if has_lower and min_k < 0:
        raise PreprocError("min_k must be greater than or equal to 0.")
    if has_upper and max_k < 0:
        raise PreprocError("max_k must be greater than or equal to 0.")
    if has_lower and has_upper and min_k > max_k:
        raise PreprocError("min_k must be less than or equal to max_k.")

    lower_offset = min_k if has_lower else 0

    # np.triu returns a copy, so the in-place diagonal fill below never
    # touches the caller's array.
    banded = np.triu(cm_mat, k=lower_offset)

    if has_upper:
        # Zero everything above the max_k-th diagonal.
        banded = np.tril(banded, k=max_k)

    if remove_main_diag:
        np.fill_diagonal(banded, 0)

    return banded
@create_triu_matrix.register(sp.coo_matrix)
def _(
    cm_coo: sp.coo_matrix,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs,
) -> sp.coo_matrix:
    """
    Sparse (``scipy.sparse.coo_matrix``) implementation of ``create_triu_matrix``.

    Notes
    -----
    A mask over the stored entries is computed from their row and column
    indices by ``_create_diag_mask_helper`` and a new COO matrix is built
    from the selected entries. The input matrix is not modified.

    Parameters
    ----------
    cm_coo : sp.coo_matrix
        The input sparse matrix.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal; ``None`` is treated
        as 0 (default is None). Raised to at least 1 when
        ``remove_main_diag`` is True.
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    sp.coo_matrix
        A new matrix with the same shape as ``cm_coo`` holding only the
        selected entries.

    Raises
    ------
    PreprocError
        If ``min_k`` is not a non-negative integer or ``max_k`` is not an
        integer.
    """
    # Validate with explicit raises rather than `assert`: assertions are
    # stripped under `python -O`, and the ndarray implementation reports
    # invalid bounds with PreprocError as well.
    if min_k is None:
        min_k = 0
    elif not isinstance(min_k, int) or min_k < 0:
        raise PreprocError("min_k must be an integer greater than or equal to 0.")

    if max_k is not None and not isinstance(max_k, int):
        raise PreprocError("max_k must be an integer.")

    if remove_main_diag:
        # Dropping the main diagonal is the same as starting at offset 1.
        # Built-in max keeps min_k a plain Python int.
        min_k = max(1, min_k)

    # NOTE(review): the helper is defined in .commons and not visible here;
    # it presumably returns a boolean mask aligned with row/col that selects
    # entries whose (col - row) offset lies in [min_k, max_k] -- confirm.
    mask = _create_diag_mask_helper(
        cm_coo.row,
        cm_coo.col,
        abs_k=False,
        min_k=min_k,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
    )

    # Build the result from the masked index/data arrays instead of
    # deep-copying the whole matrix and filtering afterwards.
    return sp.coo_matrix(
        (cm_coo.data[mask], (cm_coo.row[mask], cm_coo.col[mask])),
        shape=cm_coo.shape,
    )
@create_triu_matrix.register(pd.DataFrame)
def _(
    cm_df: pd.DataFrame,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    **kwargs,
) -> pd.DataFrame:
    """
    Tabular (``pandas.DataFrame``) implementation of ``create_triu_matrix``.

    Notes
    -----
    The row-ID and column-ID columns of ``cm_df`` are passed to
    ``_create_diag_mask_helper`` and the rows selected by the resulting
    mask are returned. The input DataFrame is not modified.

    Parameters
    ----------
    cm_df : pd.DataFrame
        The input DataFrame; must contain the columns named by
        ``row_ids_colname`` and ``col_ids_colname``.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal; ``None`` is treated
        as 0 (default is None). Raised to at least 1 when
        ``remove_main_diag`` is True.
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
    row_ids_colname : str, optional
        The column name for row IDs
        (default is cm_consts.DataFrameSpecs.ROW_IDS).
    col_ids_colname : str, optional
        The column name for column IDs
        (default is cm_consts.DataFrameSpecs.COL_IDS).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    pd.DataFrame
        The rows of ``cm_df`` selected by the mask.

    Raises
    ------
    PreprocError
        If ``min_k`` is not a non-negative integer or ``max_k`` is not an
        integer.
    """
    # Validate with explicit raises rather than `assert`: assertions are
    # stripped under `python -O`, and the ndarray implementation reports
    # invalid bounds with PreprocError as well.
    if min_k is None:
        min_k = 0
    elif not isinstance(min_k, int) or min_k < 0:
        raise PreprocError("min_k must be an integer greater than or equal to 0.")

    if max_k is not None and not isinstance(max_k, int):
        raise PreprocError("max_k must be an integer.")

    if remove_main_diag:
        # Dropping the main diagonal is the same as starting at offset 1.
        # Built-in max keeps min_k a plain Python int.
        min_k = max(1, min_k)

    # NOTE(review): the helper is defined in .commons and not visible here;
    # it presumably returns a boolean mask aligned with the DataFrame rows
    # that selects entries whose (col - row) offset lies in [min_k, max_k]
    # -- confirm.
    mask = _create_diag_mask_helper(
        cm_df[row_ids_colname],
        cm_df[col_ids_colname],
        abs_k=False,
        min_k=min_k,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
    )

    # The original author notes that this selection is a copy operation
    # on pandas 2.0 or newer.
    return cm_df[mask]