"""
Module.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import re
import operator as op
import typing as t
from itertools import combinations
import numpy as np
import scipy.sparse as sp
from ... import consts as cm_consts
from .downweighting import Downweighting
#? Regular expression to parse chromosome positions
chr_pos_re = re.compile(
r'(?:chr)?(\w+):(\d+)(?:-(\d+))?',
re.IGNORECASE
)
[docs]
def create_sprite_cm(
clusters_fpath: str,
chrom: str,
build: str = "hg19",
resolution: int = 1_000_000,
min_cluster_size: int = 2,
max_cluster_size: int = 1_000,
min_nway_interactions: t.Optional[int] = None,
max_nway_interactions: t.Optional[int] = None,
downweighting: t.Optional[str] = None,
cache_file: bool = False,
) -> sp.coo_matrix:
"""
Creates a SPRITE contact matrix from a file containing genomic clusters.
Notes
-----
- Currently only supports intra-chromosomal contact.
- The function reads a file containing genomic clusters and constructs a sparse contact matrix.
- The contact matrix is built based on the specified resolution and downweighting method.
- Downweighting methods include:
- `NONE`: No downweighting. Each contact has a value of 1.
- `N_MINUS_ONE`: A contact from a cluster of n reads has a value of 1 / (n - 1).
- `TWO_OVER_N`: A contact from a cluster of n reads has a value of 2 / n.
- `UNKNOWN`: A default downweighting scheme for error checking.
- Supported genome builds are:
- `hg19`: Human genome build 19 or GrCh37.
- `hg38`: Human genome build 38 or GrCh38.
- `mm9`: Mouse genome build 9.
- `mm10`: Mouse genome build 10.
Parameters
----------
clusters_fpath : str
The file path to the clusters file.
chrom : str
The chromosome to process.
build : str, optional
The genome build (default is "hg19").
resolution : int, optional
The resolution of the contact matrix in base pairs (default is 1,000,000).
min_cluster_size : int, optional
The minimum size of a cluster to be considered (default is 2).
max_cluster_size : int, optional
The maximum size of a cluster to be considered (default is 1,000).
min_nway_interactions : Optional[int], optional
The minimum number of n-way interactions to consider (default is None).
max_nway_interactions : Optional[int], optional
The maximum number of n-way interactions to consider (default is None).
downweighting : Optional[str], optional
The downweighting method to use (default is None).
cache_file : bool, optional
Whether to cache the cluster file content in memory (default is False).
Returns
-------
sp.coo_matrix
A sparse contact matrix in COO format.
Examples
--------
Authors
-------
- Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de)
- Qwen2.5 72B - 4.25bpw
Examples
--------
"""
#? Initialize the assembly object based on the genome build and resolution
downweighting = Downweighting.from_str(downweighting)
#? Get the length of the chromosome and calculate the number of bins
chrom_lengths = cm_consts.AVAIL_GENOME_BUILDS[build]
chrom_length = chrom_lengths[f"chr{chrom}"]
num_bins = np.ceil(chrom_length / resolution).astype(int)
cm = sp.coo_matrix((num_bins, num_bins), dtype=np.float32)
#? Function to extract groups from regex matches
groupsf = op.methodcaller("groups")
#? Open the clusters file and process each line
with open(clusters_fpath, 'r') as fd:
if cache_file:
lines = fd.read().split("\n")
else:
lines = fd
for line in lines:
#? Parse chromosome positions from the line
#? Elements are: ['chrX', <start_pos>, <end_pos>]
chr_pos_np = np.array(
list(
map(groupsf, chr_pos_re.finditer(line))
)
)
num_cluster = chr_pos_np.shape[0]
#? Filter by cluster size
if not min_cluster_size <= num_cluster <= max_cluster_size:
continue
#? FIlter by chromosome
mask = chr_pos_np[:, 0] == chrom
if not mask.any():
continue
#? Convert parsed positions to bin indices
poss = chr_pos_np[mask, 1].astype(int)
bin_ids = np.unique(poss//resolution)
#? Skip clusters with fewer than 2 unique bin interactions
nway_interactions = len(bin_ids)
if nway_interactions <= 1:
continue
#? Filter by nway_interactions
if min_nway_interactions is not None and min_nway_interactions > nway_interactions:
continue
if max_nway_interactions is not None and nway_interactions > max_nway_interactions :
continue
#? Estimate the number of contacts based on the downweighting method
num_contacts = downweighting.estimate_num_contacts(
bin_ids
)
#? Generate all pairwise combinations of bin indices
rc_ids = np.array(list(combinations(bin_ids, 2)))
num_rc_ids = rc_ids.shape[0]
#? Create a temporary COO matrix for the current cluster
tmp_vals = np.full(num_rc_ids, num_contacts)
tmp_row_ids = rc_ids[:, 0]
tmp_col_ids = rc_ids[:, 1]
tmp_coo = sp.coo_matrix(
(tmp_vals, (tmp_row_ids, tmp_col_ids)),
shape=cm.shape
)
#? Add the temporary COO matrix to the main contact matrix
#? This makes cm becomes CSR
cm += tmp_coo
#? Convert the final contact matrix back to COO format
cm_coo = cm.tocoo()
return cm_coo