Source code for gunz_cm.preprocs.rc_filters_common

# -*- coding: utf-8 -*-
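"""
Align two matrices on a shared, compact set of row/column IDs.

The single-dispatch entry point ``filter_common_empty_rowcols`` combines the
row/column IDs used by two inputs (union or intersection, see ``AVAIL_OP``)
and renumbers them consecutively starting at 0. Implementations are
registered for tuples of ID arrays, ``scipy.sparse.coo_matrix`` and
``pandas.DataFrame``; the dense ``numpy.ndarray`` variant is not implemented
yet.
"""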
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.0"
__license__ = "Clear BSD"
# __version__ = "1.0."
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"

import functools
import typing as t
from gunz_cm.exceptions import PreprocError
import numpy as np
import pandas as pd
from pydantic import validate_call, ConfigDict
from scipy import sparse as sp
from .. import consts as cm_consts
from .infer_shape import (
    infer_mat_shape
)

# Set operations that can be used to combine the ID sets of the two inputs,
# keyed by the value accepted for the ``op`` argument.
AVAIL_OP = dict(
    union=np.union1d,
    intersect=np.intersect1d,
)

@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
@functools.singledispatch
def filter_common_empty_rowcols(
    data1,
    data2,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
):
    """
    Filter out unalignable regions from two inputs (single-dispatch entry point).

    Notes
    -----
    The implementation is selected from the type of ``data1`` via
    ``functools.singledispatch``. This fallback body only runs when no
    implementation is registered for that type.

    Because the arguments are validated against this signature, ``**kwargs``
    is accepted here so that options understood only by a specific
    implementation (for example ``row_ids_colname`` / ``col_ids_colname`` of
    the DataFrame implementation) are forwarded instead of being rejected as
    unexpected keyword arguments.

    Parameters
    ----------
    data1 : object
        The first input. Its type selects the implementation.
    data2 : object
        The second input.
    op : str, optional
        How the ID sets of the two inputs are combined. Defaults to "union".
    is_triu_sym : bool, optional
        If the input is symmetric but only the upper triangle part of the
        matrix is given. Defaults to True.
    axis : int, optional
        The axis to filter on. Defaults to None.
    ret_mapping : bool, optional
        Whether to return the mapping of the original ids to the new ids.
        Defaults to False.
    ret_unique_ids : bool, optional
        Whether to return the array of retained ids. Defaults to False.
    **kwargs
        Implementation-specific options, forwarded unchanged.

    Raises
    ------
    PreprocError
        If no implementation is registered for the type of ``data1``.
    """
    raise PreprocError(f"No implementation for data type: {type(data1).__name__}")
def _build_mapping(kept_ids: np.ndarray) -> t.Dict[t.Any, int]:
    """Map every retained original ID to its position in ``kept_ids``."""
    return {uid: i for i, uid in enumerate(kept_ids)}


def _membership(ids: np.ndarray, kept_ids: t.Optional[np.ndarray]) -> np.ndarray:
    """Return a boolean mask flagging the entries of ``ids`` found in ``kept_ids``.

    ``kept_ids`` being ``None`` means the axis is not filtered, so every entry
    is flagged.
    """
    if kept_ids is None:
        return np.ones(ids.shape[0], dtype=bool)
    return np.isin(ids, kept_ids)


def _compact(
    ids: np.ndarray,
    kept_ids: t.Optional[np.ndarray],
    keep: np.ndarray,
) -> np.ndarray:
    """Select the entries of ``ids`` flagged by ``keep`` and renumber them.

    When ``kept_ids`` is ``None`` the axis is not filtered and the selected
    IDs are returned without renumbering.
    """
    selected = ids[keep]
    if kept_ids is None:
        return selected
    # ``kept_ids`` comes from np.unique/np.union1d/np.intersect1d, i.e. it is
    # sorted and unique, and every selected ID occurs in it. The insertion
    # point of an ID is therefore exactly its new, consecutive index.
    return np.searchsorted(kept_ids, selected)


def _filter_id_pairs(
    data1: t.Tuple[np.ndarray, np.ndarray],
    data2: t.Tuple[np.ndarray, np.ndarray],
    op: str,
    is_triu_sym: bool,
    axis: t.Optional[int],
) -> t.Tuple[
    t.Tuple[np.ndarray, np.ndarray, np.ndarray],
    t.Tuple[np.ndarray, np.ndarray, np.ndarray],
    t.Optional[np.ndarray],
    t.Optional[np.ndarray],
]:
    """Shared core: renumber the (row, col) ID pairs of two inputs.

    Returns ``(rows1, cols1, keep1), (rows2, cols2, keep2), kept_row_ids,
    kept_col_ids``. ``keepN`` is a boolean mask over the entries of input N;
    an entry is kept only if BOTH its row ID and its column ID are retained,
    so rows and columns always stay paired. ``kept_row_ids`` /
    ``kept_col_ids`` are the sorted retained IDs, or ``None`` for an axis
    that is not filtered. In the symmetric case both are the same array.

    Raises ``PreprocError`` if ``op`` is not a key of ``AVAIL_OP``.
    """
    if op not in AVAIL_OP:
        raise PreprocError(f"Invalid operation: {op}")
    combine = AVAIL_OP[op]

    row_ids1, col_ids1 = (np.asarray(ids) for ids in data1)
    row_ids2, col_ids2 = (np.asarray(ids) for ids in data2)

    if is_triu_sym is True and axis is None:
        # Symmetric input: rows and columns share a single ID space.
        kept_row_ids = combine(
            np.union1d(row_ids1, col_ids1),
            np.union1d(row_ids2, col_ids2),
        )
        kept_col_ids = kept_row_ids
    else:
        #? If non-symmetric or handle only one axis
        kept_row_ids = None
        kept_col_ids = None
        if axis is None or axis == 1:
            kept_row_ids = combine(np.unique(row_ids1), np.unique(row_ids2))
        if axis is None or axis == 0:
            kept_col_ids = combine(np.unique(col_ids1), np.unique(col_ids2))

    filtered = []
    for row_ids, col_ids in ((row_ids1, col_ids1), (row_ids2, col_ids2)):
        keep = _membership(row_ids, kept_row_ids) & _membership(col_ids, kept_col_ids)
        filtered.append(
            (
                _compact(row_ids, kept_row_ids, keep),
                _compact(col_ids, kept_col_ids, keep),
                keep,
            )
        )
    return filtered[0], filtered[1], kept_row_ids, kept_col_ids


def _collect_extras(
    kept_row_ids: t.Optional[np.ndarray],
    kept_col_ids: t.Optional[np.ndarray],
    is_triu_sym: bool,
    axis: t.Optional[int],
    ret_mapping: bool,
    ret_unique_ids: bool,
) -> t.List[t.Any]:
    """Build the optional trailing return values (mappings / retained IDs)."""
    extras: t.List[t.Any] = []
    if is_triu_sym is True and axis is None:
        if ret_mapping:
            extras.append(_build_mapping(kept_row_ids))
        if ret_unique_ids:
            extras.append(kept_row_ids)
    elif ret_mapping:
        # NOTE: kept as-is for backward compatibility. In this branch
        # ``ret_mapping`` also returns the retained IDs of each filtered axis
        # and ``ret_unique_ids`` has no effect.
        for kept_ids in (kept_row_ids, kept_col_ids):
            if kept_ids is not None:
                extras.extend([_build_mapping(kept_ids), kept_ids])
    return extras


def _assign_ids(
    frame: pd.DataFrame,
    keep: np.ndarray,
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    row_ids_colname: str,
    col_ids_colname: str,
) -> pd.DataFrame:
    """Write the renumbered IDs into ``frame``, dropping unkept rows if any.

    The frame is modified in place when every row is kept; otherwise a
    filtered copy is modified and returned.
    """
    if not keep.all():
        frame = frame.loc[keep].copy()
    frame[row_ids_colname] = row_ids
    frame[col_ids_colname] = col_ids
    return frame


@filter_common_empty_rowcols.register(np.ndarray)
def _(
    cm_mat1: np.ndarray,
    cm_mat2: np.ndarray,
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    **kwargs,
) -> np.ndarray:
    """
    Filter out unalignable regions from two dense matrices (not implemented).

    Parameters
    ----------
    cm_mat1 : ndarray
        The first input matrix (dense matrix).
    cm_mat2 : ndarray
        The second input matrix (dense matrix).
    is_triu_sym : bool, optional
        If the input is symmetric but only the upper triangle part of the
        matrix is given. Defaults to True.
    axis : int, optional
        The axis to filter on. Defaults to None.
    ret_mapping : bool, optional
        Whether to return the mapping of the original ids to the new ids.
        Defaults to False.
    **kwargs
        Ignored.

    Raises
    ------
    NotImplementedError
        Always; the dense variant has not been written yet.
    """
    raise NotImplementedError("filter_common_empty_rowcols_mat is not yet implemented!")


@filter_common_empty_rowcols.register(tuple)
def _(
    data1: t.Tuple[np.ndarray, np.ndarray],
    data2: t.Tuple[np.ndarray, np.ndarray],
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
) -> t.List[t.Any]:
    """
    Filter common empty row and column IDs from two sets of row and column IDs.

    Notes
    -----
    The IDs of both inputs are combined with ``op`` and the retained IDs are
    renumbered consecutively from 0 in sorted order. An entry (row ID, column
    ID) is kept only if both of its IDs are retained, so the returned row and
    column arrays of one input always have equal length and stay paired.
    With ``op="union"`` every entry is kept.

    If ``is_triu_sym`` is True and ``axis`` is None, rows and columns share
    one ID space. Otherwise rows are filtered when ``axis`` is None or 1 and
    columns when ``axis`` is None or 0; IDs on an unfiltered axis are
    returned without renumbering.

    Parameters
    ----------
    data1 : Tuple[np.ndarray, np.ndarray]
        Tuple of row and column IDs from the first set.
    data2 : Tuple[np.ndarray, np.ndarray]
        Tuple of row and column IDs from the second set.
    op : str, optional
        Operation to perform on the IDs. One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether the input matrices are triangular and symmetric.
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes. Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping (dict of original ID to new ID).
        Default is False. In the non-symmetric / single-axis case this also
        appends the retained IDs after each mapping.
    ret_unique_ids : bool, optional
        Whether to return the retained IDs array. Only honoured in the
        symmetric case. Default is False.
    **kwargs
        Ignored.

    Returns
    -------
    list
        ``[new_row_ids1, new_col_ids1, new_row_ids2, new_col_ids2, *extras]``
        where ``extras`` holds the mappings / retained IDs requested above.

    Raises
    ------
    PreprocError
        If ``op`` is not a key of ``AVAIL_OP``.
    """
    (
        (new_row_ids1, new_col_ids1, _keep1),
        (new_row_ids2, new_col_ids2, _keep2),
        kept_row_ids,
        kept_col_ids,
    ) = _filter_id_pairs(data1, data2, op, is_triu_sym, axis)

    ret = [new_row_ids1, new_col_ids1, new_row_ids2, new_col_ids2]
    ret.extend(
        _collect_extras(
            kept_row_ids, kept_col_ids, is_triu_sym, axis, ret_mapping, ret_unique_ids
        )
    )
    return ret


@filter_common_empty_rowcols.register(sp.coo_matrix)
def _(
    cm_coo1: sp.coo_matrix,
    cm_coo2: sp.coo_matrix,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    **kwargs,
) -> t.Tuple[t.Any, ...]:
    """
    Filter common empty row and column IDs from two COO matrices.

    Notes
    -----
    Both outputs share one shape. Along a filtered axis the extent is the
    number of retained IDs; along an unfiltered axis it is the larger extent
    of the two inputs. Stored values whose row or column ID is not retained
    (possible with ``op="intersect"``) are dropped together with their IDs.

    Parameters
    ----------
    cm_coo1 : sparse.coo_matrix
        First COO matrix.
    cm_coo2 : sparse.coo_matrix
        Second COO matrix.
    op : str, optional
        Operation to perform on the IDs. One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether the input matrices are upper triangular and symmetric.
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes. Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping(s). Default is False.
    ret_unique_ids : bool, optional
        Whether to return the retained IDs array. Only honoured in the
        symmetric case. Default is False.
    **kwargs
        Ignored.

    Returns
    -------
    tuple
        ``(new_coo1, new_coo2, *extras)`` where ``extras`` holds the
        mappings / retained IDs requested above.

    Raises
    ------
    PreprocError
        If either input is not a COO matrix or ``op`` is invalid.
    """
    if not sp.issparse(cm_coo1):
        raise PreprocError("Matrix 1 must be sparse.")
    if not isinstance(cm_coo1, sp.coo_matrix):
        raise PreprocError("Matrix 1 must be a COO matrix.")
    if not isinstance(cm_coo2, sp.coo_matrix):
        raise PreprocError("Matrix 2 must be a COO matrix.")

    (
        (new_row_ids1, new_col_ids1, keep1),
        (new_row_ids2, new_col_ids2, keep2),
        kept_row_ids,
        kept_col_ids,
    ) = _filter_id_pairs(
        (cm_coo1.row, cm_coo1.col),
        (cm_coo2.row, cm_coo2.col),
        op,
        is_triu_sym,
        axis,
    )

    # New IDs on a filtered axis lie in [0, number of retained IDs), so that
    # count is the tight extent for BOTH matrices. An unfiltered axis keeps
    # its original IDs, so it needs the larger of the two input extents.
    if kept_row_ids is None:
        new_nrows = max(cm_coo1.shape[0], cm_coo2.shape[0])
    else:
        new_nrows = len(kept_row_ids)
    if kept_col_ids is None:
        new_ncols = max(cm_coo1.shape[1], cm_coo2.shape[1])
    else:
        new_ncols = len(kept_col_ids)
    new_shape = (int(new_nrows), int(new_ncols))

    # The keep masks drop the values that belong to dropped ID pairs.
    new_coo1 = sp.coo_matrix(
        (cm_coo1.data[keep1], (new_row_ids1, new_col_ids1)),
        shape=new_shape,
    )
    new_coo2 = sp.coo_matrix(
        (cm_coo2.data[keep2], (new_row_ids2, new_col_ids2)),
        shape=new_shape,
    )

    extras = _collect_extras(
        kept_row_ids, kept_col_ids, is_triu_sym, axis, ret_mapping, ret_unique_ids
    )
    return (new_coo1, new_coo2, *extras)


@filter_common_empty_rowcols.register(pd.DataFrame)
def _(
    cm_df1: pd.DataFrame,
    cm_df2: pd.DataFrame,
    op: str = "union",
    is_triu_sym: bool = True,
    axis: t.Optional[int] = None,
    ret_mapping: bool = False,
    ret_unique_ids: bool = False,
    row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS,
    col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS,
    **kwargs,
) -> t.Tuple[t.Any, ...]:
    """
    Filter out unalignable regions from the input DataFrames.

    Notes
    -----
    The ID columns of both frames are renumbered onto a shared, consecutive
    ID space. When every row of a frame is kept (always the case with
    ``op="union"``) that frame is modified in place and returned. When rows
    must be dropped (possible with ``op="intersect"``) the input frame is
    left untouched and a filtered copy is returned instead.

    Parameters
    ----------
    cm_df1 : pd.DataFrame
        The first input DataFrame.
    cm_df2 : pd.DataFrame
        The second input DataFrame.
    op : str, optional
        Operation to perform on the IDs. One of "union" or "intersect".
        Default is "union".
    is_triu_sym : bool, optional
        Whether the input matrices are upper triangular and symmetric.
        Default is True.
    axis : int, optional
        Axis to operate on. If None, operate on both axes. Default is None.
    ret_mapping : bool, optional
        Whether to return the mapping(s). Default is False.
    ret_unique_ids : bool, optional
        Whether to return the retained IDs array. Only honoured in the
        symmetric case. Default is False.
    row_ids_colname : str, optional
        Column name for row IDs.
        Default is cm_consts.DataFrameSpecs.ROW_IDS.
    col_ids_colname : str, optional
        Column name for column IDs.
        Default is cm_consts.DataFrameSpecs.COL_IDS.
    **kwargs
        Ignored.

    Returns
    -------
    tuple
        ``(filtered_df1, filtered_df2, *extras)`` where ``extras`` holds the
        mappings / retained IDs requested above.

    Raises
    ------
    PreprocError
        If ``op`` is invalid.
    """
    (
        (new_row_ids1, new_col_ids1, keep1),
        (new_row_ids2, new_col_ids2, keep2),
        kept_row_ids,
        kept_col_ids,
    ) = _filter_id_pairs(
        (cm_df1[row_ids_colname].to_numpy(), cm_df1[col_ids_colname].to_numpy()),
        (cm_df2[row_ids_colname].to_numpy(), cm_df2[col_ids_colname].to_numpy()),
        op,
        is_triu_sym,
        axis,
    )

    #TODO: Differentiate between in-place and copy
    cm_df1 = _assign_ids(
        cm_df1, keep1, new_row_ids1, new_col_ids1, row_ids_colname, col_ids_colname
    )
    cm_df2 = _assign_ids(
        cm_df2, keep2, new_row_ids2, new_col_ids2, row_ids_colname, col_ids_colname
    )

    extras = _collect_extras(
        kept_row_ids, kept_col_ids, is_triu_sym, axis, ret_mapping, ret_unique_ids
    )
    return (cm_df1, cm_df2, *extras)