Source code for gunz_cm.preprocs.mirrors

# -*- coding: utf-8 -*-
"""
Optimized mirror operations for genomic contact matrices.
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__version__ = "1.0.1"
__license__ = "Clear BSD"
__email__ = "adhisant@tnt.uni-hannover.de"

import functools
import typing as t
import numpy as np
import pandas as pd
from scipy import sparse as sp
from numba import njit

from gunz_cm.exceptions import PreprocError
from .. import consts as cm_consts

@njit(cache=True)
def _fast_mirror_arrays(
    row_ids: np.ndarray,
    col_ids: np.ndarray,
    data: np.ndarray,
    remove_diag: bool,
    double_diag: bool,
) -> t.Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Numba-accelerated helper to compute mirrored arrays for a symmetric matrix.
    """
    n = len(row_ids)

    # First pass: count elements to allocate memory accurately
    upper_count = 0
    diag_count = 0
    for i in range(n):
        r = row_ids[i]
        c = col_ids[i]
        if r < c:
            upper_count += 1
        elif r == c:
            diag_count += 1

    # Calculate output size
    out_size = 2 * upper_count
    if not remove_diag:
        if double_diag:
            out_size += 2 * diag_count
        else:
            out_size += diag_count

    # Allocate output arrays
    out_rows = np.empty(out_size, dtype=row_ids.dtype)
    out_cols = np.empty(out_size, dtype=col_ids.dtype)
    out_data = np.empty(out_size, dtype=data.dtype)

    # Second pass: fill arrays
    idx = 0
    for i in range(n):
        r = row_ids[i]
        c = col_ids[i]
        d = data[i]

        if r < c:
            # Add original upper triangle element
            out_rows[idx] = r
            out_cols[idx] = c
            out_data[idx] = d
            idx += 1

            # Add mirrored lower triangle element
            out_rows[idx] = c
            out_cols[idx] = r
            out_data[idx] = d
            idx += 1
        elif r == c and not remove_diag:
            # Add diagonal element
            out_rows[idx] = r
            out_cols[idx] = c
            out_data[idx] = d
            idx += 1

            if double_diag:
                # Add duplicate diagonal element
                out_rows[idx] = r
                out_cols[idx] = c
                out_data[idx] = d
                idx += 1

    return out_rows, out_cols, out_data

[docs] @functools.singledispatch def mirror_upper_to_lower_triangle( mat: t.Any, remove_diag: bool = False, double_diag: bool = False, ) -> t.Any: """ Mirror the upper triangle part to the lower triangle part of a matrix. Parameters ---------- mat : pandas.DataFrame or scipy.sparse.spmatrix Input matrix. Supported types are pandas DataFrame and any scipy sparse matrix. remove_diag : bool, optional Whether to remove the main diagonal. Defaults to False. double_diag : bool, optional Whether to double the diagonal entries. Defaults to False. This is useful for preserving behavior of certain legacy implementations. Ignored if remove_diag is True. Returns ------- any Resulting matrix with the upper triangle mirrored to the lower triangle, in the same format as input. Raises ------ PreprocError If the input type is not supported. """ raise PreprocError(f"Input data must be a pandas DataFrame or a scipy sparse matrix. Got {type(mat)}")
[docs] def mirror_upper_to_lower_triangle_coo( cm_coo: sp.coo_matrix, remove_diag: bool = False, double_diag: bool = False, ) -> sp.coo_matrix: """ Implementation of mirror_upper_to_lower_triangle for COO matrices. Parameters ---------- cm_coo : scipy.sparse.coo_matrix The input sparse matrix. remove_diag : bool, optional Whether to remove the main diagonal. Defaults to False. double_diag : bool, optional Whether to double the diagonal entries. Defaults to False. Returns ------- scipy.sparse.coo_matrix The resulting symmetric sparse matrix. """ if not isinstance(cm_coo, sp.coo_matrix): raise PreprocError(f"Input must be a scipy.sparse.coo_matrix. Got {type(cm_coo)}") #? Get the row and column indices of the input matrix row_ids = cm_coo.row col_ids = cm_coo.col data = cm_coo.data #? Fast Numba array looping and copying out_rows, out_cols, out_data = _fast_mirror_arrays( row_ids, col_ids, data, remove_diag, double_diag ) out_mat = sp.coo_matrix( (out_data, (out_rows, out_cols)), shape=cm_coo.shape ) return out_mat
[docs] def mirror_upper_to_lower_triangle_df( cm_df: pd.DataFrame, remove_diag: bool = False, double_diag: bool = False, row_ids_colname: str = cm_consts.DataFrameSpecs.ROW_IDS, col_ids_colname: str = cm_consts.DataFrameSpecs.COL_IDS, vals_colname: str = cm_consts.DataFrameSpecs.COUNTS, ) -> pd.DataFrame: """ Implementation of mirror_upper_to_lower_triangle for pandas DataFrames. Parameters ---------- cm_df : pandas.DataFrame The input DataFrame representing the matrix. remove_diag : bool, optional Whether to remove the main diagonal. Defaults to False. double_diag : bool, optional Whether to double the diagonal entries. Defaults to False. row_ids_colname : str, optional Column name for row IDs. Defaults to 'bin1_id'. col_ids_colname : str, optional Column name for column IDs. Defaults to 'bin2_id'. vals_colname : str, optional Column name for contact counts. Defaults to 'count'. Returns ------- pandas.DataFrame The resulting symmetric DataFrame. """ upper_triu_mask = cm_df[row_ids_colname] < cm_df[col_ids_colname] upper_triu_df = cm_df.loc[upper_triu_mask, :] lower_triu_df = pd.DataFrame() lower_triu_df[row_ids_colname] = upper_triu_df[col_ids_colname] lower_triu_df[col_ids_colname] = upper_triu_df[row_ids_colname] lower_triu_df[vals_colname] = upper_triu_df[vals_colname] if remove_diag: return pd.concat([upper_triu_df, lower_triu_df]) else: diag_mask = cm_df[row_ids_colname] == cm_df[col_ids_colname] diag_df = cm_df.loc[diag_mask, :] if double_diag: return pd.concat([upper_triu_df, lower_triu_df, diag_df, diag_df]) return pd.concat([upper_triu_df, lower_triu_df, diag_df])
# Register implementations mirror_upper_to_lower_triangle.register(sp.coo_matrix, mirror_upper_to_lower_triangle_coo) mirror_upper_to_lower_triangle.register(sp.csr_matrix, mirror_upper_to_lower_triangle_coo) mirror_upper_to_lower_triangle.register(pd.DataFrame, mirror_upper_to_lower_triangle_df)
[docs] def symmetrize_edges( rows: np.ndarray, cols: np.ndarray, data: np.ndarray, shape: t.Tuple[int, int], double_diag: bool = False, ) -> sp.coo_matrix: """ Construct a symmetric COO matrix from directed edge arrays. Parameters ---------- rows : np.ndarray Row indices. cols : np.ndarray Column indices. data : np.ndarray Values. shape : tuple of int Shape of the resulting matrix (rows, cols). double_diag : bool, optional Whether to include diagonal elements twice. Defaults to False. Returns ------- scipy.sparse.coo_matrix A symmetric sparse matrix in COO format. """ mask = (rows != cols) if not double_diag: rows_off = rows[mask] cols_off = cols[mask] data_off = data[mask] rows_sym = np.concatenate([rows, cols_off]) cols_sym = np.concatenate([cols, rows_off]) data_sym = np.concatenate([data, data_off]) else: # Just swap all and concatenate rows_sym = np.concatenate([rows, cols]) cols_sym = np.concatenate([cols, rows]) data_sym = np.concatenate([data, data]) return sp.coo_matrix((data_sym, (rows_sym, cols_sym)), shape=shape)