"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import functools
import typing as t
from pydantic import validate_call, ConfigDict
from gunz_cm.exceptions import PreprocError
import numpy as np
from gunz_cm.utils.logger import logger
from scipy.sparse import coo_matrix, csr_matrix, issparse
from scipy.stats import norm
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from scipy.interpolate import interp1d
from ..utils.matrix import _non_diagonal_mask
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def _validate_inputs(
    matrix: t.Union[np.ndarray, coo_matrix, csr_matrix],
    inplace: bool,
    exclude_diag: bool = False
) -> None:
    """
    Check that a matrix can be scaled or transformed as requested.

    Notes
    -----
    The checks run in a fixed order and the first failing one raises:

    1. The matrix must not be empty (``nnz == 0`` for sparse input,
       ``size == 0`` for dense input).
    2. With ``exclude_diag`` the matrix must be square.
    3. With ``inplace`` a sparse matrix cannot be combined with
       ``exclude_diag``, the underlying buffer (``matrix.data`` for sparse
       input, the array itself for dense input) must be writeable, and the
       dtype must be floating-point.

    Parameters
    ----------
    matrix : Union[np.ndarray, coo_matrix, csr_matrix]
        The matrix to validate.
    inplace : bool
        Whether the caller intends to modify ``matrix`` in place.
    exclude_diag : bool, optional
        Whether the caller intends to leave the diagonal untouched
        (default is False).

    Returns
    -------
    None

    Raises
    ------
    PreprocError
        If any of the checks listed above fails.
    """
    sparse_input = issparse(matrix)

    # "Empty" means no stored entries for sparse input, no elements for dense.
    entry_count = matrix.nnz if sparse_input else matrix.size
    if entry_count == 0:
        raise PreprocError("Cannot process empty matrix")

    # A diagonal is only well defined here for square matrices.
    if exclude_diag:
        n_rows, n_cols = matrix.shape[0], matrix.shape[1]
        if n_rows != n_cols:
            raise PreprocError(f"Matrix must be square for diagonal exclusion. Got shape {matrix.shape}")

    # Everything below only concerns in-place operation.
    if not inplace:
        return

    if sparse_input:
        if exclude_diag:
            raise PreprocError("Cannot modify sparse matrix in-place when excluding diagonal")
        data_is_writeable = matrix.data.flags.writeable
        if not data_is_writeable:
            raise PreprocError("Sparse matrix data buffer is not writeable")
    elif not matrix.flags.writeable:
        raise PreprocError("Dense matrix buffer is not writeable")

    # Scaled/transformed values are floats; they cannot be stored in place
    # in a non-floating buffer.
    has_float_dtype = np.issubdtype(matrix.dtype, np.floating)
    if not has_float_dtype:
        raise PreprocError("In-place operations require floating-point dtype")
# [docs]
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def scale_matrix(
    matrix: t.Union[np.ndarray, coo_matrix, csr_matrix],
    scaling_method: str = 'minmax',
    min_val: float = 0,
    max_val: float = 1,
    exclude_diagonal: bool = False,
    inplace: bool = False
) -> t.Union[np.ndarray, coo_matrix, csr_matrix]:
    """
    Scale a matrix using the specified method.

    Notes
    -----
    This is the generic single-dispatch entry point: the implementation
    that runs is selected by the type of ``matrix``. In this module,
    implementations are registered for ``np.ndarray``, ``coo_matrix`` and
    ``csr_matrix``. The body below is the fallback for any other type and
    only raises.

    Parameters
    ----------
    matrix : Union[np.ndarray, coo_matrix, csr_matrix]
        The matrix to scale.
    scaling_method : str, optional
        The scaling method to use ('minmax' or 'normal', default is 'minmax').
    min_val : float, optional
        The minimum value for min-max scaling (default is 0).
    max_val : float, optional
        The maximum value for min-max scaling (default is 1).
    exclude_diagonal : bool, optional
        Whether to exclude diagonal elements from scaling (default is False).
    inplace : bool, optional
        Whether to perform the scaling in-place (default is False).

    Returns
    -------
    Union[np.ndarray, coo_matrix, csr_matrix]
        The scaled matrix.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``matrix``.
    """
    # Only reached when no registered implementation matches type(matrix).
    raise PreprocError(f"No implementation for data type: {type(matrix).__name__}")
@scale_matrix.register(np.ndarray)
def _(
    matrix: np.ndarray,
    scaling_method: str,
    min_val: float,
    max_val: float,
    exclude_diagonal: bool = False,
    inplace: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Scale a dense numpy array using the specified method.

    Notes
    -----
    ``'minmax'`` uses ``MinMaxScaler(feature_range=(min_val, max_val))``;
    any other ``scaling_method`` uses ``StandardScaler``.

    With ``exclude_diagonal`` on a square matrix, all off-diagonal values
    are scaled together as one population and the diagonal is left as is.
    On a non-square matrix a warning is logged and the flag is ignored.

    NOTE(review): without diagonal exclusion the 2-D array is handed to the
    scaler unchanged, so scikit-learn scales each column independently,
    whereas the diagonal-excluding path (and the sparse implementation)
    scale all values as one population. Confirm this difference is intended.

    Parameters
    ----------
    matrix : np.ndarray
        The dense numpy array to scale.
    scaling_method : str
        The scaling method to use ('minmax' or 'normal').
    min_val : float
        Lower bound of the target range; only used for 'minmax'.
    max_val : float
        Upper bound of the target range; only used for 'minmax'.
    exclude_diagonal : bool, optional
        Whether to exclude diagonal elements from scaling (default is False).
    inplace : bool, optional
        Whether to write the result into ``matrix`` (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    np.ndarray
        ``matrix`` itself when ``inplace`` is True, otherwise a new array.
        When a non-floating matrix is scaled with ``exclude_diagonal`` the
        returned copy is promoted to a floating dtype so the scaled values
        are not truncated.

    Raises
    ------
    PreprocError
        If the matrix is empty, if ``inplace`` is requested for a
        non-floating dtype, or if no off-diagonal element remains after
        excluding the diagonal.
    """
    if matrix.size == 0:
        raise PreprocError("Matrix is empty. Cannot scale.")

    # Writing scaled floats into e.g. an integer buffer would silently
    # truncate them, so refuse instead of returning corrupted values.
    if inplace and not np.issubdtype(matrix.dtype, np.floating):
        raise PreprocError("In-place operations require floating-point dtype")

    # Same selection rule in every branch: 'minmax', otherwise standardise.
    if scaling_method == 'minmax':
        scaler = MinMaxScaler(feature_range=(min_val, max_val))
    else:
        scaler = StandardScaler()

    if exclude_diagonal and matrix.shape[0] != matrix.shape[1]:
        logger.warning("Matrix is not square; exclude_diagonal may have no effect.")
        exclude_diagonal = False

    if exclude_diagonal:
        off_diagonal = np.ones(matrix.shape, dtype=bool)
        np.fill_diagonal(off_diagonal, False)
        data = matrix[off_diagonal]
        if data.size == 0:
            raise PreprocError("All elements are diagonal after exclusion. Cannot scale.")

        # The scaler expects a 2-D (n_samples, n_features) input.
        scaled_data = scaler.fit_transform(data.reshape(-1, 1)).ravel()

        if inplace:
            scaled_matrix = matrix
        else:
            # Always a fresh copy; promoted when needed so that float
            # results fit (a no-op promotion for floating input).
            target_dtype = np.result_type(matrix.dtype, scaled_data.dtype)
            scaled_matrix = matrix.astype(target_dtype, order='C')
        scaled_matrix[off_diagonal] = scaled_data
        return scaled_matrix

    matrix_scaled = scaler.fit_transform(matrix)
    if inplace:
        matrix[...] = matrix_scaled
        return matrix
    return matrix_scaled
@scale_matrix.register(coo_matrix)
@scale_matrix.register(csr_matrix)
def _(
    matrix: t.Union[coo_matrix, csr_matrix],
    scaling_method: str,
    min_val: float,
    max_val: float,
    exclude_diagonal: bool = False,
    inplace: bool = False,
    **kwargs,
) -> t.Union[coo_matrix, csr_matrix]:
    """
    Scale the stored entries of a sparse matrix using the specified method.

    Notes
    -----
    Only the values in ``matrix.data`` are scaled, all together as one
    population; the sparsity structure is not changed. ``'minmax'`` uses
    ``MinMaxScaler(feature_range=(min_val, max_val))``; any other
    ``scaling_method`` uses ``StandardScaler``.

    Inputs are checked by ``_validate_inputs`` first. That helper rejects
    ``inplace`` combined with ``exclude_diagonal`` for sparse input, so an
    in-place call always scales every stored entry.

    When a new matrix is returned, the index arrays of the input are passed
    to the constructor with ``copy=False`` and may therefore be shared with
    the input matrix.

    Parameters
    ----------
    matrix : Union[coo_matrix, csr_matrix]
        The sparse matrix to scale.
    scaling_method : str
        The scaling method to use ('minmax' or 'normal').
    min_val : float
        Lower bound of the target range; only used for 'minmax'.
    max_val : float
        Upper bound of the target range; only used for 'minmax'.
    exclude_diagonal : bool, optional
        Whether to exclude diagonal elements from scaling (default is False).
    inplace : bool, optional
        Whether to store the result in ``matrix`` (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    Union[coo_matrix, csr_matrix]
        ``matrix`` itself when ``inplace`` is True, otherwise a new matrix
        of the same sparse format and shape.

    Raises
    ------
    PreprocError
        If ``_validate_inputs`` rejects the arguments, or if no stored
        off-diagonal entry remains after excluding the diagonal.
    """
    _validate_inputs(matrix, inplace, exclude_diagonal)

    if scaling_method == 'minmax':
        scaler = MinMaxScaler(feature_range=(min_val, max_val))
    else:
        scaler = StandardScaler()

    if exclude_diagonal:
        # Presumably a boolean mask over matrix.data selecting the stored
        # off-diagonal entries - see _non_diagonal_mask.
        mask = _non_diagonal_mask(matrix)
        non_diag_count = mask.sum()
        if non_diag_count == 0:
            raise PreprocError("All elements are diagonal after exclusion. Cannot scale.")

        scaled_non_diag = scaler.fit_transform(matrix.data[mask].reshape(-1, 1)).ravel()

        # Fresh copy of the data, promoted when needed: writing float
        # results into e.g. integer count data would truncate them.
        target_dtype = np.result_type(matrix.data.dtype, scaled_non_diag.dtype)
        new_data = matrix.data.astype(target_dtype)
        new_data[mask] = scaled_non_diag
    else:
        # fit_transform returns a new array, so the input buffer is not
        # modified and no defensive copy is needed.
        new_data = scaler.fit_transform(matrix.data.reshape(-1, 1)).ravel()

    if inplace:
        matrix.data = new_data
        return matrix

    if isinstance(matrix, coo_matrix):
        return coo_matrix((new_data, (matrix.row, matrix.col)), shape=matrix.shape, copy=False)
    return csr_matrix((new_data, matrix.indices, matrix.indptr), shape=matrix.shape, copy=False)
@transform_to_gaussian.register(np.ndarray)
def _(
    matrix: np.ndarray,
    mu: float,
    sigma: float,
    inplace: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Map the values of a dense array onto a Gaussian distribution.

    Notes
    -----
    Each element is replaced by the normal quantile of its empirical CDF
    value: ``norm.ppf(F(x), loc=mu, scale=sigma)``, where ``F(x)`` is the
    fraction of elements that are less than or equal to ``x``. Equal input
    values therefore map to equal output values. ``F`` is clipped to
    ``[1e-10, 1 - 1e-10]`` so that the largest value (``F == 1``) does not
    map to infinity.

    The empirical CDF is looked up directly per element, which also covers
    the case of a matrix with a single distinct value.

    NOTE(review): NaN entries are not treated specially - confirm inputs
    are finite.

    Parameters
    ----------
    matrix : np.ndarray
        The dense numpy array to transform.
    mu : float
        The mean of the target Gaussian distribution.
    sigma : float
        The standard deviation of the target Gaussian distribution.
    inplace : bool, optional
        Whether to write the result into ``matrix`` (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    np.ndarray
        ``matrix`` itself when ``inplace`` is True, otherwise a new array
        with the same shape as ``matrix``.

    Raises
    ------
    PreprocError
        If the matrix is empty, or if ``inplace`` is requested for a
        non-floating dtype (the result would be truncated).
    """
    if matrix.size == 0:
        raise PreprocError("Cannot process empty matrix")
    if inplace and not np.issubdtype(matrix.dtype, np.floating):
        raise PreprocError("In-place operations require floating-point dtype")

    values = matrix.ravel()

    # counts[k] is the multiplicity of the k-th smallest distinct value and
    # inverse[i] is the index of values[i] among the distinct values.
    _, inverse, counts = np.unique(values, return_inverse=True, return_counts=True)
    cdf = np.cumsum(counts) / values.size
    prob = cdf[inverse.ravel()].reshape(matrix.shape)

    # Keep probabilities strictly inside (0, 1) to avoid inf values.
    prob = np.clip(prob, 1e-10, 1 - 1e-10)
    transformed = norm.ppf(prob, loc=mu, scale=sigma)

    if inplace:
        matrix[...] = transformed
        return matrix
    return transformed
@transform_to_gaussian.register(coo_matrix)
@transform_to_gaussian.register(csr_matrix)
def _(
    matrix: t.Union[coo_matrix, csr_matrix],
    mu: float,
    sigma: float,
    inplace: bool = False,
    **kwargs,
) -> t.Union[coo_matrix, csr_matrix]:
    """
    Map the stored entries of a sparse matrix onto a Gaussian distribution.

    Notes
    -----
    Only the values in ``matrix.data`` take part: each one is replaced by
    ``norm.ppf(F(x), loc=mu, scale=sigma)``, where ``F(x)`` is the fraction
    of stored entries that are less than or equal to ``x``. ``F`` is clipped
    to ``[1e-10, 1 - 1e-10]`` so that the largest value (``F == 1``) does
    not map to infinity. The sparsity structure is not changed.

    The empirical CDF is looked up directly per entry, which also covers
    the case of a matrix whose stored entries all share one value.

    When a new matrix is returned, the index arrays of the input are passed
    to the constructor with ``copy=False`` and may therefore be shared with
    the input matrix.

    NOTE(review): NaN entries are not treated specially - confirm inputs
    are finite.

    Parameters
    ----------
    matrix : Union[coo_matrix, csr_matrix]
        The sparse matrix to transform.
    mu : float
        The mean of the target Gaussian distribution.
    sigma : float
        The standard deviation of the target Gaussian distribution.
    inplace : bool, optional
        Whether to store the result in ``matrix`` (default is False).
    **kwargs
        Accepted and ignored.

    Returns
    -------
    Union[coo_matrix, csr_matrix]
        ``matrix`` itself when ``inplace`` is True, otherwise a new matrix
        of the same sparse format and shape.

    Raises
    ------
    PreprocError
        If the matrix has no stored entries.
    """
    data = matrix.data
    if data.size == 0:
        raise PreprocError("Cannot process empty matrix")

    # counts[k] is the multiplicity of the k-th smallest distinct value and
    # inverse[i] is the index of data[i] among the distinct values.
    _, inverse, counts = np.unique(data, return_inverse=True, return_counts=True)
    cdf = np.cumsum(counts) / data.size
    prob = cdf[inverse.ravel()]

    # Keep probabilities strictly inside (0, 1) to avoid inf values.
    prob = np.clip(prob, 1e-10, 1 - 1e-10)
    data_transformed = norm.ppf(prob, loc=mu, scale=sigma)

    if inplace:
        # The data buffer is replaced rather than overwritten, so the
        # stored dtype becomes that of the transformed values.
        matrix.data = data_transformed
        return matrix

    if isinstance(matrix, coo_matrix):
        return coo_matrix((data_transformed, (matrix.row, matrix.col)), shape=matrix.shape, copy=False)
    return csr_matrix((data_transformed, matrix.indices, matrix.indptr), shape=matrix.shape, copy=False)