"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import numpy as np
import pandas as pd
import cooler
import h5py
import scipy.sparse as sp
from typing import Union
[docs]
def readMcool(fmcool: str, binSize: int):
"""Read from a mcool or cool file and return the Cooler object
Examples
--------
"""
mcool = h5py.File(fmcool, 'r')
if binSize > 0:
return cooler.Cooler(mcool['resolutions'][str(binSize)]), binSize
else:
cool = cooler.Cooler(mcool)
return cool, cool.binsize
[docs]
def cool2pixels(cool: cooler.api.Cooler):
"""Return the contact matrix in 'pixels' format
Examples
--------
"""
return cool.matrix(as_pixels=True, balance=False, sparse=True)
[docs]
def pixels2Coo(df: pd.DataFrame, bins: pd.DataFrame):
"""Convert Cooler's contact matrix in 'pixels' DataFrame to scipy coo_matrix
Examples
--------
"""
binOffset = bins.index[0]
nBins = bins.shape[0]
df['bin1_id'] -= binOffset
df['bin2_id'] -= binOffset
return sp.coo_matrix((df['count'].to_numpy(),
(df['bin1_id'].to_numpy(), df['bin2_id'].to_numpy())),
shape=(nBins, nBins))
[docs]
def getSubCoo(pixels: cooler.core.RangeSelector2D, bins: cooler.core.RangeSelector1D,
regionStr: str):
"""Fetch a region from Cooler contact matrix and return it as a coo_matrix
Examples
--------
"""
mSub = pixels.fetch(regionStr)
assert (mSub['bin1_id'] <= mSub['bin2_id']).all(),\
f"Contact matrix of region {regionStr} has lower-triangle entries"
binsSub = bins.fetch(regionStr)
return pixels2Coo(mSub, binsSub)
[docs]
def trimDiags(a: sp.coo_matrix, iDiagMax: int, bKeepMain: bool):
"""Remove diagonal elements whose diagonal index is >= iDiagMax or is == 0
Examples
--------
"""
gDist = np.abs(a.row - a.col)
idx = np.where((gDist < iDiagMax) & (bKeepMain | (gDist != 0)))
return sp.coo_matrix((a.data[idx], (a.row[idx], a.col[idx])),
shape=a.shape, dtype=a.dtype)
[docs]
def upperDiagCsr(m: sp.coo_matrix, nDiags: int):
"""Convert input sp.coo_matrix into a sp.csr_matrix with diagonals as rows
Examples
--------
"""
row = m.col - m.row
idx = np.where((row > 0) & (row < nDiags))
idxRowp1 = row[idx]
idxRow = idxRowp1 - 1
idxCol = m.col[idx] - idxRowp1
ans = sp.csr_matrix((m.data[idx], (idxRow, idxCol)),
shape=(nDiags - 1, m.shape[1]), dtype=m.dtype)
ans.eliminate_zeros()
return ans
[docs]
def meanFilterSparse(a: sp.coo_matrix, h: int):
"""Apply a mean filter to an input sparse matrix
Examples
--------
"""
assert h > 0, "meanFilterSparse half-size must be greater than 0"
assert sp.issparse(a) and a.getformat() == 'coo',\
"meanFilterSparse input matrix is not scipy.sparse.coo_matrix"
assert a.shape[0] == a.shape[1],\
"meanFilterSparse cannot handle non-square matrix"
fSize = 2 * h + 1
shapeOut = np.array(a.shape) + fSize - 1
mToeplitz = sp.diags(np.ones(fSize),
np.arange(-fSize+1, 1),
shape=(shapeOut[1], a.shape[1]),
format='csr')
ans = sp.coo_matrix((mToeplitz @ a) @ mToeplitz.T)
ansNoEdge = ans.tocsr()[h:(h+a.shape[0]), h:(h+a.shape[1])].tocoo()
rowDist2Edge = np.minimum(ansNoEdge.row, ansNoEdge.shape[0] - 1 - ansNoEdge.row)
nDim1 = h + 1 + np.minimum(rowDist2Edge, h)
colDist2Edge = np.minimum(ansNoEdge.col, ansNoEdge.shape[1] - 1 - ansNoEdge.col)
nDim2 = h + 1 + np.minimum(colDist2Edge, h)
nNeighbors = nDim1 * nDim2
ansNoEdge.data /= nNeighbors
return ansNoEdge
[docs]
def varVstran(n: Union[int, np.ndarray]):
"""Calculate the variance of variance-stabilizing transformed data
Examples
--------
"""
with np.errstate(divide='ignore', invalid='ignore'):
result = np.where(n < 2, np.nan, (1 + 1.0 / n) / 12.0)
return result
[docs]
def resample(m: sp.coo_matrix, size: int):
"""Resample with replacement the input matrix to sum to the given size
Examples
--------
"""
if m.data.size == 0:
return m
bins = np.arange(m.data.size)
p = m.data / m.data.sum()
samples = np.random.choice(bins, size=size, p=p)
sampledData = np.bincount(samples, minlength=bins.size)
ans = sp.coo_matrix((sampledData, (m.row, m.col)), shape=m.shape)
ans.eliminate_zeros()
return ans
[docs]
def coolerInfo(cool: cooler.api.Cooler, k: str):
"""Retrieve metadata from Cooler file
Examples
--------
"""
if k in cool.info:
return cool.info[k]
elif k == 'sum':
return cool.pixels()['count'][:].sum()
elif k == 'nbins':
return cool.bins().shape[0]
elif k == 'nnz':
return cool.pixels().shape[0]
elif k == 'nchroms':
return cool.chroms().shape[0]
else:
raise KeyError(f'Unable to retrieve metadata field \'{k}\'')