Source code for gunz_cm.metrics.ren.third_parties.hicrep.hicrep

# -*- coding: utf-8 -*-
"""
Module.

Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__version__ = "1.0.0"
__license__ = "GNU General Public License v3.0"
__source__ = "https://github.com/dejunlin/hicrep"

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright 2019 dejunlin <dejun.lin@gmail.com>
# Usage: hicrep.py
# Description: Compute HiCRep reproducibility stratum-corrected correlation score (SCCS).
# Reference: Genome Res. 2017 Nov;27(11):1939-1949. doi: 10.1101/gr.220640.117
# The algorithm first normalizes the input contact matrices by the total
# number of contacts and then for each chromosome: 1) mean-filter the input
# matrices with an input window size; 2) exclude common zero entries in
# the input matrices; 3) compute the SCC score. It doesn't have the
# procedure to bootstrap the window-size parameter
#
# Distributed under terms of the GNU General Public License v3.0.
# from deprecated import deprecated
import numpy as np
import scipy.sparse as sp
import math
import warnings
import cooler
from .utils import (
    cool2pixels, getSubCoo,
    trimDiags, meanFilterSparse, varVstran,
    resample, upperDiagCsr, coolerInfo
    )

# @deprecated("Use sccByDiag instead")
[docs] def sccOfDiag(diag1: np.ndarray, diag2: np.ndarray): """Get the correlation coefficient and weight of two input diagonal arrays Args: diag1: `np.ndarray` input array 1 diag2: `np.ndarray` input array 2 Returns: tuple of 2 floats, the Pearson's correlation rho and weight Examples -------- """ # remove common zeros idxNZ = np.where((diag1 != 0.0) | (diag2 != 0.0))[0] iN = idxNZ.size if iN <= 2: return (np.nan, np.nan) iDiagNZ1 = diag1[idxNZ] iDiagNZ2 = diag2[idxNZ] rho = np.corrcoef(iDiagNZ1, iDiagNZ2)[0, 1] iDiagVarVstran1 = varVstran(iDiagNZ1.size) iDiagVarVstran2 = varVstran(iDiagNZ2.size) ws = iN * np.sqrt(iDiagVarVstran1 * iDiagVarVstran2) if math.isnan(rho) or math.isnan(ws): return (np.nan, np.nan) return (rho, ws)
[docs] def sccByDiag(m1: sp.coo_matrix, m2: sp.coo_matrix, nDiags: int): """Compute diagonal-wise hicrep SCC score for the two input matrices up to nDiags diagonals Args: m1 (sp.coo_matrix): input contact matrix 1 m2 (sp.coo_matrix): input contact matrix 2 nDiags (int): compute SCC scores for diagonals whose index is in the range of [1, nDiags) Returns: `float` hicrep SCC scores Examples -------- """ # convert each diagonal to one row of a csr_matrix in order to compute # diagonal-wise correlation between m1 and m2 m1D = upperDiagCsr(m1, nDiags) m2D = upperDiagCsr(m2, nDiags) nSamplesD = (m1D + m2D).getnnz(axis=1) rowSumM1D = m1D.sum(axis=1).A1 rowSumM2D = m2D.sum(axis=1).A1 # ignore zero-division warnings because the corresponding elements in the # output don't contribute to the SCC scores with np.errstate(divide='ignore', invalid='ignore'): cov = m1D.multiply(m2D).sum(axis=1).A1 - rowSumM1D * rowSumM2D / nSamplesD rhoD = cov / np.sqrt( (m1D.power(2).sum(axis=1).A1 - np.square(rowSumM1D) / nSamplesD ) * (m2D.power(2).sum(axis=1).A1 - np.square(rowSumM2D) / nSamplesD )) wsD = nSamplesD * varVstran(nSamplesD) # Convert NaN and Inf resulting from div by 0 to zeros. # posinf and neginf added to fix behavior seen in 4DN datasets # 4DNFIOQLTI9G and DNFIH7MQHOR at 5kb where inf would be reported # as an SCC score wsNan2Zero = np.nan_to_num(wsD, copy=True, posinf=0.0, neginf=0.0) rhoNan2Zero = np.nan_to_num(rhoD, copy=True, posinf=0.0, neginf=0.0) return rhoNan2Zero @ wsNan2Zero / wsNan2Zero.sum()
[docs] def hicrepSCC(cool1: cooler.api.Cooler, cool2: cooler.api.Cooler, h: int, dBPMax: int, bDownSample: bool, chrNames: list = None, excludeChr: set = None): """Compute hicrep score between two input Cooler contact matrices Args: cool1: `cooler.api.Cooler` Input Cooler contact matrix 1 cool2: `cooler.api.Cooler` Input Cooler contact matrix 2 h: `int` Half-size of the mean filter used to smooth the input matrics dBPMax `int` Only include contacts that are at most this genomic distance (bp) away bDownSample: `bool` Down sample the input with more contacts to the same number of contacts as in the other input chrNames: `list` List of chromosome names whose SCC to compute. Default to None, which means all chromosomes in the genome are used to compute SCC excludeChr: `set` Set of chromosome names to exclude from SCC computation. Default to None. Returns: `float` scc scores for each chromosome Examples -------- """ binSize1 = cool1.binsize binSize2 = cool2.binsize assert binSize1 == binSize2,\ "Input cool files have different bin sizes" assert coolerInfo(cool1, 'nbins') == coolerInfo(cool2, 'nbins'),\ "Input cool files have different number of bins" assert coolerInfo(cool1, 'nchroms') == coolerInfo(cool2, 'nchroms'),\ "Input cool files have different number of chromosomes" assert (cool1.chroms()[:] == cool2.chroms()[:]).all()[0],\ "Input file have different chromosome names" binSize = binSize1 bins1 = cool1.bins() bins2 = cool2.bins() if binSize is None: # sometimes bin size can be None, e.g., input cool file has # non-uniform size bins. assert np.all(bins1[:] == bins2[:]),\ "Input cooler files don't have a unique bin size most likely "\ "because non-uniform bin size was used and the bins are defined "\ "differently for the two input cooler files" # In that case, use the median bin size binSize = int(np.median((bins1[:]["end"] - bins1[:]["start"]).values)) warnings.warn("Input cooler files don't have a unique bin size most "\ "likely because non-uniform bin size was used. HicRep "\ "will use median bin size from the first cooler file "\ "to determine maximal diagonal index to include", RuntimeWarning) if dBPMax == -1: # this is the exclusive upper bound dMax = coolerInfo(cool1, 'nbins') else: dMax = dBPMax // binSize + 1 assert dMax > 1, "Input dBPmax is smaller than binSize" p1 = cool2pixels(cool1) p2 = cool2pixels(cool2) # get the total number of contacts as normalizing constant n1 = coolerInfo(cool1, 'sum') n2 = coolerInfo(cool2, 'sum') # Use dict here so that the chrNames don't duplicate if chrNames is None: chrNamesDict = dict.fromkeys(cool1.chroms()[:]['name'].tolist()) else: chrNamesDict = dict.fromkeys(chrNames) # It's important to preserve the order of the input chrNames so that the # user knows the order of the output SCC scores so we bail when encounter # duplicate names rather than implicit prunning the names. assert chrNames is None or len(chrNamesDict) == len(chrNames), f""" Found Duplicates in {chrNames}. Please remove them. """ # filter out excluded chromosomes if excludeChr is None: excludeChr = set() chrNames = [ chrName for chrName in chrNamesDict if chrName not in excludeChr ] scc = np.full(len(chrNames), -2.0) for iChr, chrName in enumerate(chrNames): # normalize by total number of contacts mS1 = getSubCoo(p1, bins1, chrName) assert mS1.size > 0, "Contact matrix 1 of chromosome %s is empty" % (chrName) assert mS1.shape[0] == mS1.shape[1],\ "Contact matrix 1 of chromosome %s is not square" % (chrName) mS2 = getSubCoo(p2, bins2, chrName) assert mS2.size > 0, "Contact matrix 2 of chromosome %s is empty" % (chrName) assert mS2.shape[0] == mS2.shape[1],\ "Contact matrix 2 of chromosome %s is not square" % (chrName) assert mS1.shape == mS2.shape,\ "Contact matrices of chromosome %s have different input shape" % (chrName) nDiags = mS1.shape[0] if dMax < 0 else min(dMax, mS1.shape[0]) rho = np.full(nDiags, np.nan) ws = np.full(nDiags, np.nan) # remove major diagonal and all the diagonals >= nDiags # to save computation time m1 = trimDiags(mS1, nDiags, False) m2 = trimDiags(mS2, nDiags, False) del mS1 del mS2 if bDownSample: # do downsampling size1 = m1.sum() size2 = m2.sum() if size1 > size2: m1 = resample(m1, size2).astype(float) elif size2 > size1: m2 = resample(m2, size1).astype(float) else: # just normalize by total contacts m1 = m1.astype(float) / n1 m2 = m2.astype(float) / n2 if h > 0: # apply smoothing m1 = meanFilterSparse(m1, h) m2 = meanFilterSparse(m2, h) scc[iChr] = sccByDiag(m1, m2, nDiags) return scc