Source code for gunz_cm.pipeline.sprite.sprite_cm

"""
Module.

Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import re
import operator as op
import typing as t
from itertools import combinations
import numpy as np
import scipy.sparse as sp

from ... import consts as cm_consts
from .downweighting import Downweighting

#? Regular expression to parse chromosome positions
chr_pos_re = re.compile(
    r'(?:chr)?(\w+):(\d+)(?:-(\d+))?', 
    re.IGNORECASE
)

[docs] def create_sprite_cm( clusters_fpath: str, chrom: str, build: str = "hg19", resolution: int = 1_000_000, min_cluster_size: int = 2, max_cluster_size: int = 1_000, min_nway_interactions: t.Optional[int] = None, max_nway_interactions: t.Optional[int] = None, downweighting: t.Optional[str] = None, cache_file: bool = False, ) -> sp.coo_matrix: """ Creates a SPRITE contact matrix from a file containing genomic clusters. Notes ----- - Currently only supports intra-chromosomal contact. - The function reads a file containing genomic clusters and constructs a sparse contact matrix. - The contact matrix is built based on the specified resolution and downweighting method. - Downweighting methods include: - `NONE`: No downweighting. Each contact has a value of 1. - `N_MINUS_ONE`: A contact from a cluster of n reads has a value of 1 / (n - 1). - `TWO_OVER_N`: A contact from a cluster of n reads has a value of 2 / n. - `UNKNOWN`: A default downweighting scheme for error checking. - Supported genome builds are: - `hg19`: Human genome build 19 or GrCh37. - `hg38`: Human genome build 38 or GrCh38. - `mm9`: Mouse genome build 9. - `mm10`: Mouse genome build 10. Parameters ---------- clusters_fpath : str The file path to the clusters file. chrom : str The chromosome to process. build : str, optional The genome build (default is "hg19"). resolution : int, optional The resolution of the contact matrix in base pairs (default is 1,000,000). min_cluster_size : int, optional The minimum size of a cluster to be considered (default is 2). max_cluster_size : int, optional The maximum size of a cluster to be considered (default is 1,000). min_nway_interactions : Optional[int], optional The minimum number of n-way interactions to consider (default is None). max_nway_interactions : Optional[int], optional The maximum number of n-way interactions to consider (default is None). downweighting : Optional[str], optional The downweighting method to use (default is None). cache_file : bool, optional Whether to cache the cluster file content in memory (default is False). Returns ------- sp.coo_matrix A sparse contact matrix in COO format. Examples -------- Authors ------- - Yeremia G. Adhisantoso (adhisant@tnt.uni-hannover.de) - Qwen2.5 72B - 4.25bpw Examples -------- """ #? Initialize the assembly object based on the genome build and resolution downweighting = Downweighting.from_str(downweighting) #? Get the length of the chromosome and calculate the number of bins chrom_lengths = cm_consts.AVAIL_GENOME_BUILDS[build] chrom_length = chrom_lengths[f"chr{chrom}"] num_bins = np.ceil(chrom_length / resolution).astype(int) cm = sp.coo_matrix((num_bins, num_bins), dtype=np.float32) #? Function to extract groups from regex matches groupsf = op.methodcaller("groups") #? Open the clusters file and process each line with open(clusters_fpath, 'r') as fd: if cache_file: lines = fd.read().split("\n") else: lines = fd for line in lines: #? Parse chromosome positions from the line #? Elements are: ['chrX', <start_pos>, <end_pos>] chr_pos_np = np.array( list( map(groupsf, chr_pos_re.finditer(line)) ) ) num_cluster = chr_pos_np.shape[0] #? Filter by cluster size if not min_cluster_size <= num_cluster <= max_cluster_size: continue #? FIlter by chromosome mask = chr_pos_np[:, 0] == chrom if not mask.any(): continue #? Convert parsed positions to bin indices poss = chr_pos_np[mask, 1].astype(int) bin_ids = np.unique(poss//resolution) #? Skip clusters with fewer than 2 unique bin interactions nway_interactions = len(bin_ids) if nway_interactions <= 1: continue #? Filter by nway_interactions if min_nway_interactions is not None and min_nway_interactions > nway_interactions: continue if max_nway_interactions is not None and nway_interactions > max_nway_interactions : continue #? Estimate the number of contacts based on the downweighting method num_contacts = downweighting.estimate_num_contacts( bin_ids ) #? Generate all pairwise combinations of bin indices rc_ids = np.array(list(combinations(bin_ids, 2))) num_rc_ids = rc_ids.shape[0] #? Create a temporary COO matrix for the current cluster tmp_vals = np.full(num_rc_ids, num_contacts) tmp_row_ids = rc_ids[:, 0] tmp_col_ids = rc_ids[:, 1] tmp_coo = sp.coo_matrix( (tmp_vals, (tmp_row_ids, tmp_col_ids)), shape=cm.shape ) #? Add the temporary COO matrix to the main contact matrix #? This makes cm becomes CSR cm += tmp_coo #? Convert the final contact matrix back to COO format cm_coo = cm.tocoo() return cm_coo