# -*- coding: utf-8 -*-
"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__version__ = "1.0.0"
__license__ = "MIT License"
__source__ = "https://github.com/kundajelab/3DChromatin_ReplicateQC"
from gunz_cm.utils.logger import logger
import sys
import numpy
from scipy.sparse import lil_matrix
from scipy.sparse.linalg import eigsh
[docs]
def Parse_matrix(file1,file2):
"""
Function Parse_matrix.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
max_index=0
max_index_temp=0
with open(file1) as input_file:
for line in input_file:
x,y,z=map(int, line.split())
max_index_temp=max(x,y)
if max_index_temp>max_index:
max_index=max_index_temp
with open(file2) as input_file:
for line in input_file:
x,y,z=map(int, line.split())
max_index_temp=max(x,y)
if max_index_temp>max_index:
max_index=max_index_temp
M1=lil_matrix((max_index,max_index))
M2=lil_matrix((max_index,max_index))
with open(file1) as input_file:
for line in input_file:
x,y,z=map(int, line.split())
M1[x-1,y-1]=z
M1[y-1,x-1]=z
with open(file2) as input_file:
for line in input_file:
x,y,z=map(int, line.split())
M2[x-1,y-1]=z
M2[y-1,x-1]=z
return M1, M2
[docs]
def Parse_matrix_lieberman(file1,file2, resolution):
"""
Function Parse_matrix_lieberman.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
#? Makes the import local to this function
import gzip
max_index=0
max_index_temp=0
with gzip.open(file1) as input_file:
for line in input_file:
if line[0]!="#":
x,y,z=map(float, line.split())
x=int(x)
y=int(y)
z=int(z)
max_index_temp=max(x,y)
if max_index_temp>max_index:
max_index=max_index_temp
with gzip.open(file2) as input_file:
for line in input_file:
if line[0]!="#":
x,y,z=map(float, line.split())
x=int(x)
y=int(y)
z=int(z)
max_index_temp=max(x,y)
if max_index_temp>max_index:
max_index=max_index_temp
max_index=max_index/resolution+1
M1=lil_matrix((max_index,max_index))
M2=lil_matrix((max_index,max_index))
with gzip.open(file1) as input_file:
for line in input_file:
if line[0]!="#":
x,y,z=map(float, line.split())
x=int(x)
y=int(y)
z=int(z)
M1[x/resolution,y/resolution]=z
M1[y/resolution,x/resolution]=z
with gzip.open(file2) as input_file:
for line in input_file:
if line[0]!="#":
x,y,z=map(float, line.split())
x=int(x)
y=int(y)
z=int(z)
M2[x/resolution,y/resolution]=z
M2[y/resolution,x/resolution]=z
return M1, M2
[docs]
def Parse_matrix_hic(file1, file2, chrn, resolution):
"""
Function Parse_matrix_hic.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
import straw #? Move import straw to here to mitigate possible problem
Table1=straw.straw("NONE",file1, chrn, chrn,"BP",resolution)
Table2=straw.straw("NONE",file2, chrn, chrn,"BP",resolution)
max_index=max(max(Table1[0]),max(Table1[1]),max(Table2[0]),max(Table2[1]))
max_index=max_index/resolution
M1=lil_matrix((max_index+1,max_index+1))
M2=lil_matrix((max_index+1,max_index+1))
for i in range(len(Table1[0])):
M1[Table1[0][i]/resolution,Table1[1][i]/resolution]=Table1[2][i]
M1[Table1[1][i]/resolution,Table1[0][i]/resolution]=Table1[2][i]
for i in range(len(Table2[0])):
M2[Table2[0][i]/resolution,Table2[1][i]/resolution]=Table2[2][i]
M2[Table2[1][i]/resolution,Table2[0][i]/resolution]=Table2[2][i]
return M1, M2
[docs]
def get_Laplacian(M):
"""
Function get_Laplacian.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
S=M.sum(1)
i_nz=numpy.where(S>0)[0]
S=S[i_nz]
M=(M[i_nz].T)[i_nz].T
S=1/numpy.sqrt(S)
M=S*M
M=(S*M.T).T
n=numpy.size(S)
M=numpy.identity(n)-M
M=(M+M.T)/2
return M
[docs]
def evec_distance(v1,v2):
"""
Function evec_distance.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
d1=numpy.dot(v1-v2,v1-v2)
d2=numpy.dot(v1+v2,v1+v2)
if d1<d2:
d=d1
else:
d=d2
return numpy.sqrt(d)
[docs]
def get_ipr(evec):
"""
Function get_ipr.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
ipr=1.0/(evec*evec*evec*evec).sum()
return ipr
[docs]
def get_reproducibility(
M1,
M2,
num_evec=20,
ipr_cut=5,
):
"""
Function get_reproducibility.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
k1=numpy.sign(M1.A).sum(1)
d1=numpy.diag(M1.A)
kd1=~((k1==1)*(d1>0))
k2=numpy.sign(M2.A).sum(1)
d2=numpy.diag(M2.A)
kd2=~((k2==1)*(d2>0))
iz=numpy.nonzero((k1+k2>0)*(kd1>0)*(kd2>0))[0]
M1b=(M1[iz].A.T)[iz].T
M2b=(M2[iz].A.T)[iz].T
i_nz1=numpy.where(M1b.sum(1)>0)[0]
i_nz2=numpy.where(M2b.sum(1)>0)[0]
i_z1=numpy.where(M1b.sum(1)==0)[0]
i_z2=numpy.where(M2b.sum(1)==0)[0]
M1b_L=get_Laplacian(M1b)
M2b_L=get_Laplacian(M2b)
# logger.debug(f"M1: {M1}")
# logger.debug(f"M2: {M2}")
logger.debug(f"M1b: {M1b}")
logger.debug(f"M2b: {M2b}")
logger.debug(f"M1b_L: {M1b_L}")
logger.debug(f"M2b_L: {M2b_L}")
a1, b1=eigsh(M1b_L,k=num_evec,which="SM")
a2, b2=eigsh(M2b_L,k=num_evec,which="SM")
b1_extend=numpy.zeros((numpy.size(M1b,0),num_evec))
b2_extend=numpy.zeros((numpy.size(M2b,0),num_evec))
for i in range(num_evec):
b1_extend[i_nz1,i]=b1[:,i]
b2_extend[i_nz2,i]=b2[:,i]
ipr1=numpy.zeros(num_evec)
ipr2=numpy.zeros(num_evec)
for i in range(num_evec):
ipr1[i]=get_ipr(b1_extend[:,i])
ipr2[i]=get_ipr(b2_extend[:,i])
b1_extend_eff=b1_extend[:,ipr1>ipr_cut]
b2_extend_eff=b2_extend[:,ipr2>ipr_cut]
num_evec_eff=min(numpy.size(b1_extend_eff,1),numpy.size(b2_extend_eff,1))
evd=numpy.zeros(num_evec_eff)
for i in range(num_evec_eff):
evd[i]=evec_distance(b1_extend_eff[:,i],b2_extend_eff[:,i])
Sd=evd.sum()
l=numpy.sqrt(2)
evs=abs(l-Sd/num_evec_eff)/l
logger.debug(f"num_evec_eff: {num_evec_eff}")
logger.debug(f"b1_extend_eff: {b1_extend_eff[:,:num_evec_eff]}")
logger.debug(f"b2_extend_eff: {b2_extend_eff[:,:num_evec_eff]}")
logger.debug(f"evd: {evd}")
logger.debug(f"Sd: {Sd}")
logger.debug(f"score: {evs}")
N=float(M1.shape[1])
if (numpy.sum(ipr1>N/100)<=1)|(numpy.sum(ipr2>N/100)<=1):
logger.info("at least one of the maps does not look like typical Hi-C maps")
else:
logger.info("size of maps: %d" %(numpy.size(M1,0)))
logger.info("reproducibility score: %6.3f " %(evs))
logger.info("num_evec_eff: %d" %(num_evec_eff))
return evs
[docs]
def main():
"""
Function main.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
num_evec=20
if len(sys.argv)==4 and sys.argv[1]=="-F":
M1, M2=Parse_matrix(sys.argv[2],sys.argv[3])
get_reproducibility(M1,M2,num_evec)
elif len(sys.argv)==6 and sys.argv[1]=="-f":
M1, M2=Parse_matrix_hic(sys.argv[2],sys.argv[3],sys.argv[4],int(sys.argv[5]))
get_reproducibility(M1,M2,num_evec)
elif len(sys.argv)==5 and sys.argv[1]=="t":
M1, M2=Parse_matrix_lieberman(sys.argv[2],sys.argv[3],int(sys.argv[4]))
get_reproducibility(M1,M2,num_evec)
else:
logger.info('3, 4 or 5 arguments required')
logger.info('To use matrix table files as the input:')
logger.info('python run_reproducibility.py -F matrix_file1 matrix_file2')
logger.info('To use .hic files as the input:')
logger.info('python run_reproducibility.py -f hic_file1 hic_file2 chrid resolution[int]')
logger.info('To use lieberman matrix files as the input:')
logger.info('python run_reproducibility.py t matrix_file1 matrix_file2 resolution[int]')
if __name__ == '__main__':
main()