# -*- coding: utf-8 -*-
"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"
import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts
from .infer_shape import (
infer_mat_shape
)
# Set operations that can be used to combine the id sets of the two inputs,
# keyed by the value accepted for the ``op`` argument.
AVAIL_OP = dict(
    union=np.union1d,
    intersect=np.intersect1d,
)
# NOTE(review): the registered implementations call ``.register`` on the
# object returned by ``validate_call``; confirm that the installed pydantic
# version forwards the single-dispatch attributes.
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def filter_common_empty_rowcols(
    data1,
    data2,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
):
    """
    Filter out unalignable regions from two inputs of the same kind.

    This is the generic entry point. The actual work is done by the
    implementation registered for ``type(data1)``; this fallback body only
    runs when no implementation matches.

    Parameters
    ----------
    data1 : object
        The first input. Its type selects the implementation.
    data2 : object
        The second input.
    op : str, optional
        How the id sets of both inputs are combined; one of the keys of
        ``AVAIL_OP`` ("union" or "intersect").
        Defaults to "union".
    is_triu_sym : bool, optional
        If the input is symmetric but only the upper triangle part of the
        matrix is given.
        Defaults to True.
    axis : int, optional
        The axis to filter on; interpreted by the selected implementation.
        Defaults to None.
    ret_mapping : bool, optional
        Whether to return the mapping of the original ids to the new ids.
        Defaults to False.
    ret_unique_ids : bool, optional
        Whether to return the ids that were kept.
        Defaults to False.
    **kwargs
        Implementation-specific options (for example ``row_ids_colname`` and
        ``col_ids_colname`` of the DataFrame implementation). They have to be
        accepted here because calls are validated against this signature
        before they are dispatched.

    Returns
    -------
    Never returns; the fallback always raises.

    Raises
    ------
    PreprocError
        If no implementation is registered for ``type(data1)``.
    """
    raise PreprocError(f"No implementation for data type: {type(data1).__name__}")
@filter_common_empty_rowcols.register(np.ndarray)
def _(
    cm_mat1: np.ndarray,
    cm_mat2: np.ndarray,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Placeholder for filtering two dense matrices; not implemented yet.

    The parameter order mirrors the generic ``filter_common_empty_rowcols``
    signature so that positional arguments bind to the same names in every
    registered implementation.

    Parameters
    ----------
    cm_mat1 : ndarray
        The first input matrix (dense matrix).
    cm_mat2 : ndarray
        The second input matrix (dense matrix).
    op : str, optional
        Operation used to combine the ids of both inputs. Unused.
    is_triu_sym : bool, optional
        If the input is symmetric but only the upper triangle part of the
        matrix is given. Unused.
    axis : int, optional
        The axis to filter on. Unused.
    ret_mapping : bool, optional
        Whether to return the mapping of the original ids to the new ids.
        Unused.
    ret_unique_ids : bool, optional
        Whether to return the kept ids. Unused.
    **kwargs
        Ignored.

    Raises
    ------
    NotImplementedError
        Always.
    """
    raise NotImplementedError("filter_common_empty_rowcols_mat is not yet implemented!")
def _remap_entries(
    row_ids,
    col_ids,
    kept_row_ids: t.Optional[np.ndarray],
    kept_col_ids: t.Optional[np.ndarray],
) -> t.Tuple[np.ndarray, np.ndarray]:
    """
    Drop entries whose ids were filtered out and renumber the remaining ones.

    Parameters
    ----------
    row_ids, col_ids : array_like
        Row and column ids of the entries; both must have the same shape.
    kept_row_ids, kept_col_ids : np.ndarray or None
        Sorted, unique ids that are kept on the respective axis. ``None``
        leaves that axis untouched (no filtering and no renumbering).

    Returns
    -------
    new_row_ids, new_col_ids : np.ndarray
        Ids of the surviving entries. Both arrays have the same length, so
        position ``k`` still describes one entry.

    Raises
    ------
    PreprocError
        If ``row_ids`` and ``col_ids`` differ in shape.
    """
    row_ids = np.asarray(row_ids)
    col_ids = np.asarray(col_ids)
    if row_ids.shape != col_ids.shape:
        raise PreprocError("Row and column ids must have the same length.")

    # An entry survives only if *both* of its ids survive; filtering the two
    # arrays independently would leave them with different lengths.
    keep = np.ones(row_ids.shape, dtype=bool)
    if kept_row_ids is not None:
        keep &= np.isin(row_ids, kept_row_ids)
    if kept_col_ids is not None:
        keep &= np.isin(col_ids, kept_col_ids)
    row_ids = row_ids[keep]
    col_ids = col_ids[keep]

    # The kept ids are sorted and unique and every remaining id is contained
    # in them, so the insertion position equals the new (compacted) id.
    if kept_row_ids is not None:
        row_ids = np.searchsorted(kept_row_ids, row_ids)
    if kept_col_ids is not None:
        col_ids = np.searchsorted(kept_col_ids, col_ids)
    return row_ids, col_ids


@filter_common_empty_rowcols.register(tuple)
def _(
    data1: t.Tuple[np.ndarray, np.ndarray],
    data2: t.Tuple[np.ndarray, np.ndarray],
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
) -> t.List[t.Any]:
    """
    Filter and renumber the row and column ids of two sets of entries.

    Notes
    -----
    The ids occurring in both inputs are combined with ``op``. Entries that
    reference an id outside the combined set are dropped as a whole, and the
    remaining ids are replaced by their rank in the sorted combined set.

    If ``is_triu_sym`` is True and ``axis`` is None, row and column ids share
    one id set. Otherwise row ids (``axis`` None or 1) and column ids
    (``axis`` None or 0) are handled separately, and an axis that is not
    selected keeps its original ids.

    Parameters
    ----------
    data1 : Tuple[np.ndarray, np.ndarray]
        Tuple of row and column IDs from the first set.
    data2 : Tuple[np.ndarray, np.ndarray]
        Tuple of row and column IDs from the second set.
    op : str, optional
        Operation to perform on the IDs.
        One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether row and column ids share one id set (see Notes).
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes.
        Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping(s) from original to new ids.
        Default is False.
    ret_unique_ids : bool, optional
        Whether to return the kept ids. Only evaluated in the shared-id-set
        case; otherwise the kept ids are returned together with the
        mappings whenever ``ret_mapping`` is True.
        Default is False.
    **kwargs
        Ignored.

    Returns
    -------
    list
        ``[new_row_ids1, new_col_ids1, new_row_ids2, new_col_ids2, *extras]``.

        Shared id set: ``extras`` holds the mapping dict (if ``ret_mapping``)
        followed by the kept ids (if ``ret_unique_ids``).

        Separate id sets: if ``ret_mapping`` is True, ``extras`` holds
        ``row_mapping, unique_row_ids`` (when rows are processed) followed by
        ``col_mapping, unique_col_ids`` (when columns are processed).

    Raises
    ------
    PreprocError
        If ``op`` is unknown or row and column ids differ in length.
    """
    if op not in AVAIL_OP:
        raise PreprocError(f"Invalid operation: {op}")
    combine = AVAIL_OP[op]

    row_ids1, col_ids1 = data1
    row_ids2, col_ids2 = data2

    if is_triu_sym is True and axis is None:
        unique_ids = combine(
            np.union1d(row_ids1, col_ids1),
            np.union1d(row_ids2, col_ids2),
        )
        ret = [
            *_remap_entries(row_ids1, col_ids1, unique_ids, unique_ids),
            *_remap_entries(row_ids2, col_ids2, unique_ids, unique_ids),
        ]
        if ret_mapping:
            ret.append({uid: i for i, uid in enumerate(unique_ids)})
        if ret_unique_ids:
            ret.append(unique_ids)
        return ret

    #? If non-symmetric or handle only one axis
    filter_rows = axis is None or axis == 1
    filter_cols = axis is None or axis == 0

    unique_row_ids = None
    if filter_rows:
        unique_row_ids = combine(np.unique(row_ids1), np.unique(row_ids2))
    unique_col_ids = None
    if filter_cols:
        unique_col_ids = combine(np.unique(col_ids1), np.unique(col_ids2))

    ret = [
        *_remap_entries(row_ids1, col_ids1, unique_row_ids, unique_col_ids),
        *_remap_entries(row_ids2, col_ids2, unique_row_ids, unique_col_ids),
    ]
    # Kept as-is for compatibility: in this branch the kept ids accompany the
    # mappings and ``ret_unique_ids`` is not consulted.
    if ret_mapping:
        if filter_rows:
            ret.append({uid: i for i, uid in enumerate(unique_row_ids)})
            ret.append(unique_row_ids)
        if filter_cols:
            ret.append({uid: i for i, uid in enumerate(unique_col_ids)})
            ret.append(unique_col_ids)
    return ret
def _rebuild_coo(
    cm_coo: sp.coo_matrix,
    kept_row_ids: t.Optional[np.ndarray],
    kept_col_ids: t.Optional[np.ndarray],
    shape: t.Tuple[int, int],
) -> sp.coo_matrix:
    """
    Rebuild ``cm_coo`` on the compacted index space.

    Entries whose row or column id is not kept are removed together with
    their value; the remaining ids are replaced by their position in the
    sorted kept ids. ``None`` leaves the respective axis untouched.
    """
    row_ids, col_ids, values = cm_coo.row, cm_coo.col, cm_coo.data

    keep = np.ones(row_ids.shape, dtype=bool)
    if kept_row_ids is not None:
        keep &= np.isin(row_ids, kept_row_ids)
    if kept_col_ids is not None:
        keep &= np.isin(col_ids, kept_col_ids)
    # Filter ids and values with the same mask so they stay aligned.
    row_ids, col_ids, values = row_ids[keep], col_ids[keep], values[keep]

    if kept_row_ids is not None:
        row_ids = np.searchsorted(kept_row_ids, row_ids)
    if kept_col_ids is not None:
        col_ids = np.searchsorted(kept_col_ids, col_ids)
    return sp.coo_matrix((values, (row_ids, col_ids)), shape=shape)


@filter_common_empty_rowcols.register(sp.coo_matrix)
def _(
    cm_coo1: sp.coo_matrix,
    cm_coo2: sp.coo_matrix,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
) -> t.Tuple[t.Any, ...]:
    """
    Filter and renumber the rows and columns of two COO matrices.

    Notes
    -----
    The kept ids (and the optional extras) are obtained from the
    ``(row_ids, col_ids)`` tuple implementation of
    ``filter_common_empty_rowcols``. Both output matrices share one shape:
    a filtered axis has as many positions as there are kept ids, an axis
    that is not filtered keeps the larger extent of the two inputs.

    Parameters
    ----------
    cm_coo1 : sparse.coo_matrix
        First COO matrix.
    cm_coo2 : sparse.coo_matrix
        Second COO matrix.
    op : str, optional
        Operation to perform on the IDs. One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether the input matrices are upper triangular and symmetric, i.e.
        rows and columns share one id set.
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes.
        Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping(s) from original to new ids.
        Default is False.
    ret_unique_ids : bool, optional
        Whether to return the kept ids. Only evaluated when
        ``is_triu_sym`` is True and ``axis`` is None; otherwise the kept ids
        accompany the mappings whenever ``ret_mapping`` is True.
        Default is False.
    **kwargs
        Ignored.

    Returns
    -------
    new_coo1 : sparse.coo_matrix
        Filtered COO matrix from the first matrix.
    new_coo2 : sparse.coo_matrix
        Filtered COO matrix from the second matrix.
    *extras
        The extras of the tuple implementation for the requested flags.

    Raises
    ------
    PreprocError
        If one of the inputs is not a COO matrix.
    """
    if not sp.issparse(cm_coo1):
        raise PreprocError("Matrix 1 must be sparse.")
    if not isinstance(cm_coo1, sp.coo_matrix):
        raise PreprocError("Matrix 1 must be a COO matrix.")
    if not isinstance(cm_coo2, sp.coo_matrix):
        raise PreprocError("Matrix 2 must be a COO matrix.")

    shared_ids = is_triu_sym is True and axis is None
    filter_rows = axis is None or axis == 1
    filter_cols = axis is None or axis == 0

    # Always request mappings and kept ids: the kept ids are needed for the
    # output shape and for masking the values, independent of what the
    # caller wants returned.
    out = filter_common_empty_rowcols(
        (cm_coo1.row, cm_coo1.col),
        (cm_coo2.row, cm_coo2.col),
        op=op,
        is_triu_sym=is_triu_sym,
        axis=axis,
        ret_mapping=True,
        ret_unique_ids=True,
    )
    all_extras = list(out[4:])

    kept_row_ids = None
    kept_col_ids = None
    if shared_ids:
        mapping, unique_ids = all_extras
        kept_row_ids = kept_col_ids = unique_ids
        extras = []
        if ret_mapping:
            extras.append(mapping)
        if ret_unique_ids:
            extras.append(unique_ids)
    else:
        # Layout: [row_mapping, unique_row_ids][col_mapping, unique_col_ids],
        # each pair present only if that axis was processed.
        pending = all_extras
        if filter_rows:
            kept_row_ids = pending[1]
            pending = pending[2:]
        if filter_cols:
            kept_col_ids = pending[1]
        extras = all_extras if ret_mapping else []

    if kept_row_ids is not None:
        new_nrows = len(kept_row_ids)
    else:
        new_nrows = max(cm_coo1.shape[0], cm_coo2.shape[0])
    if kept_col_ids is not None:
        new_ncols = len(kept_col_ids)
    else:
        new_ncols = max(cm_coo1.shape[1], cm_coo2.shape[1])
    new_shape = (int(new_nrows), int(new_ncols))

    new_coo1 = _rebuild_coo(cm_coo1, kept_row_ids, kept_col_ids, new_shape)
    new_coo2 = _rebuild_coo(cm_coo2, kept_row_ids, kept_col_ids, new_shape)
    return (new_coo1, new_coo2, *extras)
def _renumber_frame(
    cm_df: pd.DataFrame,
    kept_row_ids: t.Optional[np.ndarray],
    kept_col_ids: t.Optional[np.ndarray],
    row_ids_colname: str,
    col_ids_colname: str,
) -> pd.DataFrame:
    """
    Drop the rows of ``cm_df`` that reference a filtered id and renumber ids.

    If no row has to be dropped, ``cm_df`` itself is updated and returned.
    Otherwise a filtered copy is returned and ``cm_df`` is left unchanged.
    ``None`` for the kept ids leaves the respective id column untouched.
    """
    row_ids = cm_df[row_ids_colname].to_numpy()
    col_ids = cm_df[col_ids_colname].to_numpy()

    keep = np.ones(len(cm_df), dtype=bool)
    if kept_row_ids is not None:
        keep &= np.isin(row_ids, kept_row_ids)
    if kept_col_ids is not None:
        keep &= np.isin(col_ids, kept_col_ids)

    if not keep.all():
        # Rows have to go; select positionally via the boolean mask so that
        # duplicated index labels cannot remove unrelated rows.
        cm_df = cm_df.loc[keep].copy()
        row_ids = row_ids[keep]
        col_ids = col_ids[keep]

    if kept_row_ids is not None:
        cm_df[row_ids_colname] = np.searchsorted(kept_row_ids, row_ids)
    if kept_col_ids is not None:
        cm_df[col_ids_colname] = np.searchsorted(kept_col_ids, col_ids)
    return cm_df


@filter_common_empty_rowcols.register(pd.DataFrame)
def _(
    cm_df1: pd.DataFrame,
    cm_df2: pd.DataFrame,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    **kwargs,
) -> t.Union[t.Tuple[pd.DataFrame, ...], pd.DataFrame]:
    """
    Filter out unalignable regions from the input DataFrames.

    Notes
    -----
    The kept ids (and the optional extras) are obtained from the
    ``(row_ids, col_ids)`` tuple implementation of
    ``filter_common_empty_rowcols``, using the ids stored in the
    ``row_ids_colname`` and ``col_ids_colname`` columns.

    If every row of a frame survives, that frame is modified in place and
    returned. If rows have to be dropped (possible with ``op="intersect"``),
    a filtered copy is returned instead and the input frame is not modified.

    Parameters
    ----------
    cm_df1 : pd.DataFrame
        The first input DataFrame.
    cm_df2 : pd.DataFrame
        The second input DataFrame.
    op : str, optional
        Operation to perform on the IDs.
        One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether the input matrices are upper triangular and symmetric, i.e.
        rows and columns share one id set.
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes.
        Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping(s) from original to new ids.
        Default is False.
    ret_unique_ids : bool, optional
        Whether to return the kept ids. Only evaluated when
        ``is_triu_sym`` is True and ``axis`` is None; otherwise the kept ids
        accompany the mappings whenever ``ret_mapping`` is True.
        Default is False.
    row_ids_colname : str, optional
        Column name for row IDs.
        Default is cm_consts.DataFrameSpecs.ROW_IDS.
    col_ids_colname : str, optional
        Column name for column IDs.
        Default is cm_consts.DataFrameSpecs.COL_IDS.
    **kwargs
        Ignored.

    Returns
    -------
    filtered_df1 : pd.DataFrame
        The filtered first DataFrame.
    filtered_df2 : pd.DataFrame
        The filtered second DataFrame.
    *extras
        The extras of the tuple implementation for the requested flags.
    """
    shared_ids = is_triu_sym is True and axis is None
    filter_rows = axis is None or axis == 1
    filter_cols = axis is None or axis == 0

    # Always request mappings and kept ids: the kept ids are needed to decide
    # which rows survive, independent of what the caller wants returned.
    out = filter_common_empty_rowcols(
        (cm_df1[row_ids_colname].to_numpy(), cm_df1[col_ids_colname].to_numpy()),
        (cm_df2[row_ids_colname].to_numpy(), cm_df2[col_ids_colname].to_numpy()),
        op=op,
        is_triu_sym=is_triu_sym,
        axis=axis,
        ret_mapping=True,
        ret_unique_ids=True,
    )
    all_extras = list(out[4:])

    kept_row_ids = None
    kept_col_ids = None
    if shared_ids:
        mapping, unique_ids = all_extras
        kept_row_ids = kept_col_ids = unique_ids
        extras = []
        if ret_mapping:
            extras.append(mapping)
        if ret_unique_ids:
            extras.append(unique_ids)
    else:
        # Layout: [row_mapping, unique_row_ids][col_mapping, unique_col_ids],
        # each pair present only if that axis was processed.
        pending = all_extras
        if filter_rows:
            kept_row_ids = pending[1]
            pending = pending[2:]
        if filter_cols:
            kept_col_ids = pending[1]
        extras = all_extras if ret_mapping else []

    #TODO: Differentiate between in-place and copy
    cm_df1 = _renumber_frame(
        cm_df1, kept_row_ids, kept_col_ids, row_ids_colname, col_ids_colname
    )
    cm_df2 = _renumber_frame(
        cm_df2, kept_row_ids, kept_col_ids, row_ids_colname, col_ids_colname
    )
    return (cm_df1, cm_df2, *extras)