Source code for gunz_cm.metrics.ren.third_parties.hicrep.utils

"""
Module.

Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import numpy as np
import pandas as pd
import cooler
import h5py
import scipy.sparse as sp
from typing import Union

[docs] def readMcool(fmcool: str, binSize: int): """Read from a mcool or cool file and return the Cooler object Examples -------- """ mcool = h5py.File(fmcool, 'r') if binSize > 0: return cooler.Cooler(mcool['resolutions'][str(binSize)]), binSize else: cool = cooler.Cooler(mcool) return cool, cool.binsize
[docs] def cool2pixels(cool: cooler.api.Cooler): """Return the contact matrix in 'pixels' format Examples -------- """ return cool.matrix(as_pixels=True, balance=False, sparse=True)
[docs] def pixels2Coo(df: pd.DataFrame, bins: pd.DataFrame): """Convert Cooler's contact matrix in 'pixels' DataFrame to scipy coo_matrix Examples -------- """ binOffset = bins.index[0] nBins = bins.shape[0] df['bin1_id'] -= binOffset df['bin2_id'] -= binOffset return sp.coo_matrix((df['count'].to_numpy(), (df['bin1_id'].to_numpy(), df['bin2_id'].to_numpy())), shape=(nBins, nBins))
[docs] def getSubCoo(pixels: cooler.core.RangeSelector2D, bins: cooler.core.RangeSelector1D, regionStr: str): """Fetch a region from Cooler contact matrix and return it as a coo_matrix Examples -------- """ mSub = pixels.fetch(regionStr) assert (mSub['bin1_id'] <= mSub['bin2_id']).all(),\ f"Contact matrix of region {regionStr} has lower-triangle entries" binsSub = bins.fetch(regionStr) return pixels2Coo(mSub, binsSub)
[docs] def trimDiags(a: sp.coo_matrix, iDiagMax: int, bKeepMain: bool): """Remove diagonal elements whose diagonal index is >= iDiagMax or is == 0 Examples -------- """ gDist = np.abs(a.row - a.col) idx = np.where((gDist < iDiagMax) & (bKeepMain | (gDist != 0))) return sp.coo_matrix((a.data[idx], (a.row[idx], a.col[idx])), shape=a.shape, dtype=a.dtype)
[docs] def upperDiagCsr(m: sp.coo_matrix, nDiags: int): """Convert input sp.coo_matrix into a sp.csr_matrix with diagonals as rows Examples -------- """ row = m.col - m.row idx = np.where((row > 0) & (row < nDiags)) idxRowp1 = row[idx] idxRow = idxRowp1 - 1 idxCol = m.col[idx] - idxRowp1 ans = sp.csr_matrix((m.data[idx], (idxRow, idxCol)), shape=(nDiags - 1, m.shape[1]), dtype=m.dtype) ans.eliminate_zeros() return ans
[docs] def meanFilterSparse(a: sp.coo_matrix, h: int): """Apply a mean filter to an input sparse matrix Examples -------- """ assert h > 0, "meanFilterSparse half-size must be greater than 0" assert sp.issparse(a) and a.getformat() == 'coo',\ "meanFilterSparse input matrix is not scipy.sparse.coo_matrix" assert a.shape[0] == a.shape[1],\ "meanFilterSparse cannot handle non-square matrix" fSize = 2 * h + 1 shapeOut = np.array(a.shape) + fSize - 1 mToeplitz = sp.diags(np.ones(fSize), np.arange(-fSize+1, 1), shape=(shapeOut[1], a.shape[1]), format='csr') ans = sp.coo_matrix((mToeplitz @ a) @ mToeplitz.T) ansNoEdge = ans.tocsr()[h:(h+a.shape[0]), h:(h+a.shape[1])].tocoo() rowDist2Edge = np.minimum(ansNoEdge.row, ansNoEdge.shape[0] - 1 - ansNoEdge.row) nDim1 = h + 1 + np.minimum(rowDist2Edge, h) colDist2Edge = np.minimum(ansNoEdge.col, ansNoEdge.shape[1] - 1 - ansNoEdge.col) nDim2 = h + 1 + np.minimum(colDist2Edge, h) nNeighbors = nDim1 * nDim2 ansNoEdge.data /= nNeighbors return ansNoEdge
[docs] def varVstran(n: Union[int, np.ndarray]): """Calculate the variance of variance-stabilizing transformed data Examples -------- """ with np.errstate(divide='ignore', invalid='ignore'): result = np.where(n < 2, np.nan, (1 + 1.0 / n) / 12.0) return result
[docs] def resample(m: sp.coo_matrix, size: int): """Resample with replacement the input matrix to sum to the given size Examples -------- """ if m.data.size == 0: return m bins = np.arange(m.data.size) p = m.data / m.data.sum() samples = np.random.choice(bins, size=size, p=p) sampledData = np.bincount(samples, minlength=bins.size) ans = sp.coo_matrix((sampledData, (m.row, m.col)), shape=m.shape) ans.eliminate_zeros() return ans
[docs] def coolerInfo(cool: cooler.api.Cooler, k: str): """Retrieve metadata from Cooler file Examples -------- """ if k in cool.info: return cool.info[k] elif k == 'sum': return cool.pixels()['count'][:].sum() elif k == 'nbins': return cool.bins().shape[0] elif k == 'nnz': return cool.pixels().shape[0] elif k == 'nchroms': return cool.chroms().shape[0] else: raise KeyError(f'Unable to retrieve metadata field \'{k}\'')