Source code for gunz_cm.preprocs.triu_matrix

# -*- coding: utf-8 -*-
"""
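Upper-triangular (banded) extraction for matrices.

Provides :func:`create_triu_matrix`, a single-dispatch function that keeps
only the entries lying between the ``min_k``-th and ``max_k``-th diagonals
above the main diagonal. Implementations are registered for dense NumPy
arrays, SciPy COO matrices and pandas DataFrames.

Examples
--------
>>> import numpy as np
>>> create_triu_matrix(np.ones((3, 3)), min_k=1)
array([[0., 1., 1.],
       [0., 0., 1.],
       [0., 0., 0.]])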
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"

import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts
from .commons import _create_diag_mask_helper

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def create_triu_matrix(
    data: t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame],
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs: t.Any,
) -> t.Union[np.ndarray, tuple, sp.coo_matrix, pd.DataFrame]:
    """
    Create an upper-triangular (banded) version of the input.

    Notes
    -----
    This is the generic entry point of a ``functools.singledispatch``
    function: the implementation that runs is selected from the type of
    ``data``, so ``data`` has to be passed positionally. Arguments are
    validated by pydantic before the dispatch happens.

    ``min_k`` and ``max_k`` bound the distance from the main diagonal that
    is kept. If ``remove_main_diag`` is True, the main diagonal is removed.

    Parameters
    ----------
    data : t.Union[np.ndarray, sp.coo_matrix, pd.DataFrame]
        The input data to be converted to a triangular matrix.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal (default is None).
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
    **kwargs
        Extra keyword arguments forwarded unchanged to the type-specific
        implementation, e.g. ``row_ids_colname`` and ``col_ids_colname``
        for the DataFrame implementation. Without this catch-all the
        validated signature would reject those keywords before dispatch.

    Returns
    -------
    t.Union[np.ndarray, tuple, sp.coo_matrix, pd.DataFrame]
        The triangular matrix, in the same container type as ``data``.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``data``.
        NOTE(review): pydantic validation of ``data`` presumably rejects
        unsupported types earlier with its own error - confirm.
    """
    # Fallback body: only reached when no registered implementation matches.
    raise PreprocError(f"No implementation for data type: {type(data).__name__}")
def _resolve_diag_bounds(
    min_k: t.Optional[int],
    max_k: t.Optional[int],
    remove_main_diag: bool,
) -> int:
    """
    Validate the diagonal bounds and return the effective ``min_k``.

    Shared by the index-based (COO / DataFrame) implementations.

    Parameters
    ----------
    min_k : t.Optional[int]
        Requested minimum diagonal offset; None means "no lower bound".
    max_k : t.Optional[int]
        Requested maximum diagonal offset; only its type is checked here.
    remove_main_diag : bool
        When True the effective lower bound is at least 1.

    Returns
    -------
    int
        ``0`` (or ``1`` when ``remove_main_diag``) if ``min_k`` is None,
        otherwise ``min_k`` (raised to at least 1 when ``remove_main_diag``).

    Raises
    ------
    PreprocError
        If ``min_k`` is not a non-negative int or ``max_k`` is not an int.
    """
    # Explicit raises instead of ``assert`` so the checks survive ``python -O``
    # and match the error type used by the ndarray implementation.
    if min_k is None:
        effective_min_k = 0
    elif isinstance(min_k, int) and min_k >= 0:
        effective_min_k = min_k
    else:
        raise PreprocError("min_k must be an integer greater than or equal to 0.")

    if max_k is not None and not isinstance(max_k, int):
        raise PreprocError("max_k must be an integer.")

    if remove_main_diag:
        # Offset 0 is the main diagonal, so start at the first off-diagonal.
        effective_min_k = max(1, effective_min_k)
    return effective_min_k


@create_triu_matrix.register(np.ndarray)
def _(
    cm_mat: np.ndarray,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Create a triangular matrix from a 2D numpy array.

    Notes
    -----
    Entries below the ``min_k``-th diagonal are zeroed; if ``max_k`` is
    given, entries above the ``max_k``-th diagonal are zeroed as well.
    The input array is not modified; a new array is returned.

    Parameters
    ----------
    cm_mat : np.ndarray
        The 2D input array.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal (default is None,
        treated as 0).
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None,
        meaning no upper limit).
    remove_main_diag : bool, optional
        Whether to zero the main diagonal elements (default is False).
    **kwargs
        Ignored; accepted so all implementations share a call signature.

    Returns
    -------
    np.ndarray
        The triangular matrix, same shape as ``cm_mat``.

    Raises
    ------
    PreprocError
        If ``cm_mat`` is not 2D, if ``min_k`` or ``max_k`` is negative, or
        if ``min_k`` is greater than ``max_k``.

    Examples
    --------
    >>> create_triu_matrix(np.arange(9).reshape(3, 3), min_k=1)
    array([[0, 1, 2],
           [0, 0, 5],
           [0, 0, 0]])
    """
    if cm_mat.ndim != 2:
        raise PreprocError("Input must be a 2D array.")

    if min_k is not None and min_k < 0:
        raise PreprocError("min_k must be greater than or equal to 0.")

    if max_k is not None:
        if max_k < 0:
            raise PreprocError("max_k must be greater than or equal to 0.")
        if min_k is not None and min_k > max_k:
            raise PreprocError("min_k must be less than or equal to max_k.")

    lower_k = 0 if min_k is None else min_k

    # triu/tril operate on the matrix directly, so no NxN boolean mask is
    # allocated. np.triu returns a new array, leaving the input untouched.
    triu_cm_mat = np.triu(cm_mat, k=lower_k)

    if max_k is not None:
        # Zero everything above the max_k-th diagonal.
        triu_cm_mat = np.tril(triu_cm_mat, k=max_k)

    if remove_main_diag:
        # Explicitly clear the main diagonal of the freshly created array.
        np.fill_diagonal(triu_cm_mat, 0)

    return triu_cm_mat


@create_triu_matrix.register(sp.coo_matrix)
def _(
    cm_coo: sp.coo_matrix,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    **kwargs,
) -> sp.coo_matrix:
    """
    Create a triangular matrix from a scipy COO sparse matrix.

    Notes
    -----
    The stored entries are filtered with the mask returned by
    ``_create_diag_mask_helper`` and a new COO matrix of the same shape is
    built from the surviving entries; the input matrix is not copied as a
    whole.

    Parameters
    ----------
    cm_coo : sp.coo_matrix
        The input sparse matrix.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal (default is None).
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
        When True the effective ``min_k`` is at least 1.
    **kwargs
        Ignored; accepted so all implementations share a call signature.

    Returns
    -------
    sp.coo_matrix
        A new matrix with the same shape holding only the kept entries.

    Raises
    ------
    PreprocError
        If ``min_k`` is not a non-negative int or ``max_k`` is not an int.
    """
    min_k = _resolve_diag_bounds(min_k, max_k, remove_main_diag)

    # NOTE(review): the helper presumably returns a boolean mask over the
    # stored entries based on their row/col indices - confirm in commons.
    mask = _create_diag_mask_helper(
        cm_coo.row,
        cm_coo.col,
        abs_k=False,
        min_k=min_k,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
    )

    # Build the result from the masked index/data arrays instead of deep
    # copying the whole matrix structure first.
    new_coo = sp.coo_matrix(
        (cm_coo.data[mask], (cm_coo.row[mask], cm_coo.col[mask])),
        shape=cm_coo.shape,
    )
    return new_coo


@create_triu_matrix.register(pd.DataFrame)
def _(
    cm_df: pd.DataFrame,
    min_k: t.Optional[int] = None,
    max_k: t.Optional[int] = None,
    remove_main_diag: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    **kwargs,
) -> pd.DataFrame:
    """
    Create a triangular matrix from a pandas DataFrame.

    Notes
    -----
    The rows of ``cm_df`` are filtered with the mask that
    ``_create_diag_mask_helper`` computes from the row-id and column-id
    columns.

    Parameters
    ----------
    cm_df : pd.DataFrame
        The input DataFrame; must contain the two id columns named below.
    min_k : t.Optional[int], optional
        The minimum distance from the main diagonal (default is None).
    max_k : t.Optional[int], optional
        The maximum distance from the main diagonal (default is None).
    remove_main_diag : bool, optional
        Whether to remove the main diagonal elements (default is False).
        When True the effective ``min_k`` is at least 1.
    row_ids_colname : str, optional
        The column name for row IDs
        (default is ``cm_consts.DataFrameSpecs.ROW_IDS``).
    col_ids_colname : str, optional
        The column name for column IDs
        (default is ``cm_consts.DataFrameSpecs.COL_IDS``).
    **kwargs
        Ignored; accepted so all implementations share a call signature.

    Returns
    -------
    pd.DataFrame
        The rows of ``cm_df`` selected by the mask.

    Raises
    ------
    PreprocError
        If ``min_k`` is not a non-negative int or ``max_k`` is not an int.
    """
    min_k = _resolve_diag_bounds(min_k, max_k, remove_main_diag)

    mask = _create_diag_mask_helper(
        cm_df[row_ids_colname],
        cm_df[col_ids_colname],
        abs_k=False,
        min_k=min_k,
        max_k=max_k,
        remove_main_diag=remove_main_diag,
    )

    # NOTE(review): original author's open question - this selection is
    # believed to be a copy operation for pandas 2.0 or newer; confirm.
    out_cm_df = cm_df[mask]
    return out_cm_df