Source code for gunz_cm.preprocs.linear_scaler

"""
Module.

Examples
--------
"""
# Module metadata: author, contact address, license and version.
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import functools
import typing as t
from pydantic import validate_call, ConfigDict
from gunz_cm.exceptions import PreprocError
import numpy as np
from gunz_cm.utils.logger import logger
from scipy.sparse import coo_matrix, csr_matrix, issparse
from scipy.stats import norm
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy.interpolate import interp1d
from ..utils.matrix import _non_diagonal_mask


@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def _validate_inputs(
    matrix: t.Union[np.ndarray, coo_matrix, csr_matrix],
    inplace: bool,
    exclude_diag: bool = False
) -> None:
    """
    Check that a matrix can be scaled or transformed as requested.

    Notes
    -----
    The checks run in this order:

    1. The matrix holds data: a sparse matrix needs at least one stored
       entry (``nnz``), a dense array at least one element (``size``).
    2. The matrix is square whenever diagonal exclusion is requested.
    3. For in-place operation the target can be overwritten. Sparse
       matrices cannot be combined with diagonal exclusion and need a
       writeable ``data`` buffer; dense arrays need a writeable buffer and
       a floating-point dtype.

    Parameters
    ----------
    matrix : Union[np.ndarray, coo_matrix, csr_matrix]
        The matrix to validate.
    inplace : bool
        Whether the operation is to be performed in-place.
    exclude_diag : bool, optional
        Whether the diagonal is excluded from the operation (default is False).

    Returns
    -------
    None

    Raises
    ------
    PreprocError
        If any of the checks listed in the notes fails.
    """
    sparse_input = issparse(matrix)

    # "Empty" means no stored entries for sparse input, no elements for dense.
    n_values = matrix.nnz if sparse_input else matrix.size
    if n_values == 0:
        raise PreprocError("Cannot process empty matrix")

    # A diagonal is only well defined on a square matrix.
    if exclude_diag:
        if matrix.shape[0] != matrix.shape[1]:
            raise PreprocError(f"Matrix must be square for diagonal exclusion. Got shape {matrix.shape}")

    # The remaining checks only concern in-place modification.
    if not inplace:
        return

    if sparse_input:
        if exclude_diag:
            raise PreprocError("Cannot modify sparse matrix in-place when excluding diagonal")
        if not matrix.data.flags.writeable:
            raise PreprocError("Sparse matrix data buffer is not writeable")
        return

    if not matrix.flags.writeable:
        raise PreprocError("Dense matrix buffer is not writeable")
    if not np.issubdtype(matrix.dtype, np.floating):
        raise PreprocError("In-place operations require floating-point dtype")

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def scale_matrix(
    matrix: t.Union[np.ndarray, coo_matrix, csr_matrix],
    scaling_method: str = 'minmax',
    min_val: float = 0,
    max_val: float = 1,
    exclude_diagonal: bool = False,
    inplace: bool = False
) -> t.Union[np.ndarray, coo_matrix, csr_matrix]:
    """
    Scale a matrix with the requested method.

    Notes
    -----
    This is the single-dispatch entry point: the concrete work is done by the
    implementations registered for ``np.ndarray``, ``coo_matrix`` and
    ``csr_matrix``. The body below only runs when no registered
    implementation matches the type of ``matrix``.

    Parameters
    ----------
    matrix : Union[np.ndarray, coo_matrix, csr_matrix]
        The matrix to scale.
    scaling_method : str, optional
        The scaling method to use ('minmax' or 'normal', default is 'minmax').
    min_val : float, optional
        Lower bound of the target range for min-max scaling (default is 0).
    max_val : float, optional
        Upper bound of the target range for min-max scaling (default is 1).
    exclude_diagonal : bool, optional
        Whether to leave diagonal elements out of the scaling (default is False).
    inplace : bool, optional
        Whether to write the result back into ``matrix`` (default is False).

    Returns
    -------
    Union[np.ndarray, coo_matrix, csr_matrix]
        The scaled matrix.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``matrix``.
    """
    # Fallback of the dispatcher: reached only for unregistered types.
    raise PreprocError(f"No implementation for data type: {type(matrix).__name__}")

def _make_scaler(
    scaling_method: str,
    min_val: float,
    max_val: float,
) -> t.Union[MinMaxScaler, StandardScaler]:
    """Return the scikit-learn scaler selected by ``scaling_method``.

    ``'minmax'`` selects ``MinMaxScaler`` with range ``(min_val, max_val)``.
    Every other value selects ``StandardScaler``, so all scaling paths share
    one fallback rule.
    """
    # NOTE(review): unknown method names silently fall back to standard
    # scaling; consider rejecting them once callers have been checked.
    if scaling_method == 'minmax':
        return MinMaxScaler(feature_range=(min_val, max_val))
    return StandardScaler()


def _scale_pooled(
    values: np.ndarray,
    scaling_method: str,
    min_val: float,
    max_val: float,
) -> np.ndarray:
    """Scale a 1-D array of values as one pooled feature; return a 1-D array."""
    scaler = _make_scaler(scaling_method, min_val, max_val)
    # Reshaping to a single column makes the scaler fit over all values at once.
    return scaler.fit_transform(values.reshape(-1, 1)).flatten()


def _sparse_with_data(
    matrix: t.Union[coo_matrix, csr_matrix],
    data: np.ndarray,
) -> t.Union[coo_matrix, csr_matrix]:
    """Build a matrix with the sparsity structure of ``matrix`` and new ``data``.

    The index arrays are passed with ``copy=False``.
    """
    if isinstance(matrix, coo_matrix):
        return coo_matrix((data, (matrix.row, matrix.col)), shape=matrix.shape, copy=False)
    return csr_matrix((data, matrix.indices, matrix.indptr), shape=matrix.shape, copy=False)


@scale_matrix.register(np.ndarray)
def _(
    matrix: np.ndarray,
    scaling_method: str,
    min_val: float,
    max_val: float,
    exclude_diagonal: bool = False,
    inplace: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Scale a dense numpy array with the requested method.

    Notes
    -----
    With ``exclude_diagonal`` on a square matrix, all off-diagonal elements
    are pooled and scaled together while the diagonal keeps its values. On a
    non-square matrix the flag is ignored with a warning.

    Without diagonal exclusion the whole matrix is handed to the scaler.
    NOTE(review): scikit-learn scalers fit each column separately, so this
    path scales column-wise whereas the diagonal-excluding path pools all
    values. Confirm that this difference is intended.

    Parameters
    ----------
    matrix : np.ndarray
        The dense numpy array to scale.
    scaling_method : str
        'minmax' for min-max scaling; any other value selects standard
        scaling ('normal').
    min_val : float
        Lower bound of the target range for min-max scaling.
    max_val : float
        Upper bound of the target range for min-max scaling.
    exclude_diagonal : bool, optional
        Whether to leave diagonal elements out of the scaling (default is False).
    inplace : bool, optional
        Whether to write the result back into ``matrix`` (default is False).

    Returns
    -------
    np.ndarray
        The scaled array; ``matrix`` itself when ``inplace`` is True.

    Raises
    ------
    PreprocError
        If the matrix is empty, if no off-diagonal element exists when the
        diagonal is excluded, or if ``inplace`` is requested on a
        non-floating-point array.
    """
    if matrix.size == 0:
        raise PreprocError("Matrix is empty. Cannot scale.")

    # Writing float results into an integer/bool buffer would silently
    # truncate them; same rule and message as `_validate_inputs`.
    if inplace and not np.issubdtype(matrix.dtype, np.floating):
        raise PreprocError("In-place operations require floating-point dtype")

    if exclude_diagonal and matrix.shape[0] != matrix.shape[1]:
        logger.warning("Matrix is not square; exclude_diagonal may have no effect.")
        exclude_diagonal = False

    if exclude_diagonal:
        off_diagonal = np.ones(matrix.shape, dtype=bool)
        np.fill_diagonal(off_diagonal, False)
        values = matrix[off_diagonal]
        if values.size == 0:
            raise PreprocError("All elements are diagonal after exclusion. Cannot scale.")

        scaled_values = _scale_pooled(values, scaling_method, min_val, max_val)

        if inplace:
            result = matrix
        else:
            # Promote the copy so float results are not truncated when the
            # input has an integer or boolean dtype.
            result = matrix.astype(np.result_type(matrix.dtype, scaled_values.dtype))
        result[off_diagonal] = scaled_values
        return result

    matrix_scaled = _make_scaler(scaling_method, min_val, max_val).fit_transform(matrix)
    if inplace:
        matrix[...] = matrix_scaled
        return matrix
    return matrix_scaled


@scale_matrix.register(coo_matrix)
@scale_matrix.register(csr_matrix)
def _(
    matrix: t.Union[coo_matrix, csr_matrix],
    scaling_method: str,
    min_val: float,
    max_val: float,
    exclude_diagonal: bool = False,
    inplace: bool = False,
    **kwargs,
) -> t.Union[coo_matrix, csr_matrix]:
    """
    Scale a COO or CSR sparse matrix with the requested method.

    Notes
    -----
    Only the explicitly stored entries (``matrix.data``) are scaled, pooled
    as one set of values; the sparsity structure is left unchanged. With
    ``exclude_diagonal`` the entries selected by ``_non_diagonal_mask``
    (presumably the stored off-diagonal entries) are scaled and the remaining
    stored entries keep their values.

    Parameters
    ----------
    matrix : Union[coo_matrix, csr_matrix]
        The sparse matrix to scale.
    scaling_method : str
        'minmax' for min-max scaling; any other value selects standard
        scaling ('normal').
    min_val : float
        Lower bound of the target range for min-max scaling.
    max_val : float
        Upper bound of the target range for min-max scaling.
    exclude_diagonal : bool, optional
        Whether to leave diagonal entries out of the scaling (default is False).
    inplace : bool, optional
        Whether to write the result back into ``matrix`` (default is False).

    Returns
    -------
    Union[coo_matrix, csr_matrix]
        The scaled sparse matrix; ``matrix`` itself when ``inplace`` is True.

    Raises
    ------
    PreprocError
        If ``_validate_inputs`` rejects the arguments, or if no stored entry
        remains after excluding the diagonal.
    """
    _validate_inputs(matrix, inplace, exclude_diagonal)

    if not exclude_diagonal:
        data_scaled = _scale_pooled(matrix.data, scaling_method, min_val, max_val)
        if inplace:
            matrix.data = data_scaled
            return matrix
        return _sparse_with_data(matrix, data_scaled)

    mask = _non_diagonal_mask(matrix)
    non_diag_count = mask.sum()
    if non_diag_count == 0:
        raise PreprocError("All elements are diagonal after exclusion. Cannot scale.")

    scaled_non_diag = _scale_pooled(matrix.data[mask], scaling_method, min_val, max_val)

    if inplace:
        # `_validate_inputs` currently rejects in-place scaling with diagonal
        # exclusion for sparse input; kept in case that rule is relaxed.
        matrix.data[mask] = scaled_non_diag
        return matrix

    # Promote the copy so float results are not truncated for integer data.
    new_data = matrix.data.astype(np.result_type(matrix.data.dtype, scaled_non_diag.dtype))
    new_data[mask] = scaled_non_diag
    return _sparse_with_data(matrix, new_data)

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def transform_to_gaussian(
    matrix: t.Union[np.ndarray, coo_matrix, csr_matrix],
    mu: float = 0,
    sigma: float = 1,
    inplace: bool = False
) -> t.Union[np.ndarray, coo_matrix, csr_matrix]:
    """
    Transform the values of a matrix to follow a Gaussian distribution.

    Notes
    -----
    This is the single-dispatch entry point: the concrete work is done by the
    implementations registered for ``np.ndarray``, ``coo_matrix`` and
    ``csr_matrix``. The body below only runs when no registered
    implementation matches the type of ``matrix``.

    Parameters
    ----------
    matrix : Union[np.ndarray, coo_matrix, csr_matrix]
        The matrix to transform.
    mu : float, optional
        The mean of the target Gaussian distribution (default is 0).
    sigma : float, optional
        The standard deviation of the target Gaussian distribution
        (default is 1).
    inplace : bool, optional
        Whether to write the result back into ``matrix`` (default is False).

    Returns
    -------
    Union[np.ndarray, coo_matrix, csr_matrix]
        The transformed matrix.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``matrix``.
    """
    # Fallback of the dispatcher: reached only for unregistered types.
    raise PreprocError(f"No implementation for data type: {type(matrix).__name__}")

def _gaussianize_values(values: np.ndarray, mu: float, sigma: float) -> np.ndarray:
    """Map values onto Gaussian quantiles through their empirical CDF.

    Each value is replaced by ``norm.ppf(F(value), loc=mu, scale=sigma)``,
    where ``F(value)`` is the fraction of input values less than or equal to
    it. Equal inputs therefore map to equal outputs. The result has the same
    shape as ``values``.

    Raises
    ------
    PreprocError
        If ``values`` contains no elements.
    """
    flat = np.asarray(values).ravel()
    if flat.size == 0:
        raise PreprocError("Cannot process empty matrix")

    # `inverse` maps every element to its unique value, so the CDF can be
    # looked up directly. No interpolant is needed because the CDF is only
    # evaluated at the data points, and a single distinct value works too.
    _, inverse, counts = np.unique(flat, return_inverse=True, return_counts=True)
    cdf = np.cumsum(counts) / flat.size
    prob = cdf[inverse]

    # Keep probabilities strictly inside (0, 1); ppf(1) would be infinite and
    # the largest value always has an empirical CDF of exactly 1.
    prob = np.clip(prob, 1e-10, 1 - 1e-10)
    return norm.ppf(prob, loc=mu, scale=sigma).reshape(np.shape(values))


@transform_to_gaussian.register(np.ndarray)
def _(
    matrix: np.ndarray,
    mu: float,
    sigma: float,
    inplace: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Transform a dense numpy array to follow a Gaussian distribution.

    Notes
    -----
    All elements of the array are pooled to build one empirical CDF, which is
    then mapped through the inverse CDF of a normal distribution with mean
    ``mu`` and standard deviation ``sigma``.

    Parameters
    ----------
    matrix : np.ndarray
        The dense numpy array to transform.
    mu : float
        The mean of the target Gaussian distribution.
    sigma : float
        The standard deviation of the target Gaussian distribution.
    inplace : bool, optional
        Whether to write the result back into ``matrix`` (default is False).

    Returns
    -------
    np.ndarray
        The transformed array; ``matrix`` itself when ``inplace`` is True.

    Raises
    ------
    PreprocError
        If the array is empty, or if ``inplace`` is requested on a
        non-floating-point array.
    """
    # Writing float results into an integer/bool buffer would silently
    # truncate them; same rule and message as `_validate_inputs`.
    if inplace and not np.issubdtype(matrix.dtype, np.floating):
        raise PreprocError("In-place operations require floating-point dtype")

    transformed = _gaussianize_values(matrix, mu, sigma)
    if inplace:
        matrix[...] = transformed
        return matrix
    return transformed


@transform_to_gaussian.register(coo_matrix)
@transform_to_gaussian.register(csr_matrix)
def _(
    matrix: t.Union[coo_matrix, csr_matrix],
    mu: float,
    sigma: float,
    inplace: bool = False,
    **kwargs,
) -> t.Union[coo_matrix, csr_matrix]:
    """
    Transform a COO or CSR sparse matrix to follow a Gaussian distribution.

    Notes
    -----
    Only the explicitly stored entries (``matrix.data``) take part: they are
    pooled into one empirical CDF and mapped through the inverse CDF of a
    normal distribution with mean ``mu`` and standard deviation ``sigma``.
    The sparsity structure is left unchanged.

    Parameters
    ----------
    matrix : Union[coo_matrix, csr_matrix]
        The sparse matrix to transform.
    mu : float
        The mean of the target Gaussian distribution.
    sigma : float
        The standard deviation of the target Gaussian distribution.
    inplace : bool, optional
        Whether to replace ``matrix.data`` with the result (default is False).

    Returns
    -------
    Union[coo_matrix, csr_matrix]
        The transformed sparse matrix; ``matrix`` itself when ``inplace`` is
        True.

    Raises
    ------
    PreprocError
        If the matrix has no stored entries.
    """
    data_transformed = _gaussianize_values(matrix.data, mu, sigma)

    if inplace:
        # The data buffer is replaced, not written into, so the original
        # dtype does not constrain the result.
        matrix.data = data_transformed
        return matrix

    # Rebuild with the original index arrays (copy=False).
    if isinstance(matrix, coo_matrix):
        return coo_matrix((data_transformed, (matrix.row, matrix.col)), shape=matrix.shape, copy=False)
    return csr_matrix((data_transformed, matrix.indices, matrix.indptr), shape=matrix.shape, copy=False)