# Source code for gunz_cm.loaders.utils

from __future__ import annotations
# -*- coding: utf-8 -*-
"""
Module.

Examples
--------
"""
__version__ = "1.0.0"
from pydantic import validate_call

__author__ = "Yeremia Gunawan Adhisantoso"
__credits__ = ["Yeremia Gunawan Adhisantoso"]
__license__ = "Clear BSD"
# __version__ = "1.0."
__maintainer__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
# __status__ = "Production"


from collections import namedtuple
import typing as t
import pandas as pd
from gunz_cm.consts import *


class Constant:
    """Marker type whose instances act as unique sentinel objects.

    Bind an instance to a name and compare against it with ``is``; every
    instance is a distinct object.

    Examples
    --------
    >>> Constant() is Constant()
    False
    """

    pass


# Inclusive ``[start, end]`` pair of loci.
ClosedInterval = namedtuple("ClosedInterval", ["start", "end"])


class Region:
    """Represent a range of loci on one chromosome in UCSC textual style.

    The textual form is ``'chr22:1,000,000-1,500,000'``. Use the static
    method :meth:`Region.from_string` to parse such a string; converting a
    region back with ``str()`` yields the canonical form (``chr`` prefix,
    no thousands separators).

    Attributes
    ----------
    chromosome : int or str
        Chromosome identifier without the ``chr`` prefix, e.g. 1-22 or
        ``'X'``/``'Y'``/``'M'``. Use :meth:`chromname` for the prefixed name.
    region : ClosedInterval or Region.ALL_LOCI
        For ``'chr1:100-500'``, ``region.start == 100`` and
        ``region.end == 500``. The sentinel ``Region.ALL_LOCI`` marks a
        region that spans the whole chromosome.

    Examples
    --------
    Parsing is more lenient than the canonical form requires.

    >>> str(Region.from_string('1:1,000-1,500')) == 'chr1:1000-1500'
    True
    >>> str(Region.from_string('chry')) == 'chrY'
    True
    """

    # Sentinel stored in ``self.region`` when the whole chromosome is meant.
    ALL_LOCI = Constant()

    def __init__(
        self,
        chromosome: t.Union[int, str],  # Number or X, Y
        region: t.Union[t.Tuple[int, int], Constant],
    ):
        """Create a region.

        Parameters
        ----------
        chromosome : int or str
            Chromosome identifier without the ``chr`` prefix.
        region : tuple of (int, int) or Region.ALL_LOCI
            Inclusive ``(start, end)`` pair, or the ``Region.ALL_LOCI``
            sentinel for the whole chromosome. A pair is stored as a
            ``ClosedInterval``.
        """
        self.chromosome = chromosome
        if region is Region.ALL_LOCI:
            self.region = region
        else:
            self.region = ClosedInterval(*region)

    def is_full_chrom(self):
        """Return True when this region covers the whole chromosome.

        That is the case exactly when ``self.region`` is the
        ``Region.ALL_LOCI`` sentinel.
        """
        return self.region is Region.ALL_LOCI

    def chromname(self) -> str:
        """Return the chromosome name with the ``chr`` prefix, e.g. ``'chr1'``."""
        return f"chr{self.chromosome}"

    @staticmethod
    def from_string(
        region,
    ) -> "Region":
        """Parse a UCSC-style region string.

        Accepted shapes are ``'chr<c>:<start>-<end>'`` and ``'chr<c>'``.
        The ``chr`` prefix is optional and thousands separators (``,``) in
        the numbers are ignored. The single-letter chromosome names
        ``x``/``y``/``m`` are canonicalized to upper case.

        Parameters
        ----------
        region : str
            Region string to parse.

        Returns
        -------
        Region
            Parsed region; ``region`` is ``Region.ALL_LOCI`` when no
            interval was given.

        Raises
        ------
        LoaderError
            If the string does not match the template or if start is
            greater than end.

        Examples
        --------
        >>> str(Region.from_string('chr22:1,000,000-1,500,000'))
        'chr22:1000000-1500000'
        """
        # Remove the literal prefix only. str.lstrip("chr") would strip any
        # leading 'c', 'h' or 'r' characters and mangle names such as
        # 'hs37d5' into 's37d5'.
        if region.startswith("chr"):
            region = region[len("chr"):]

        chrom, *rest = region.split(":")

        # Canonicalize the sex/mitochondrial chromosome letters so that
        # 'chry' round-trips to 'chrY', as the class documentation promises.
        if chrom.upper() in ("X", "Y", "M"):
            chrom = chrom.upper()

        if not rest:
            return Region(chrom, Region.ALL_LOCI)

        if len(rest) != 1:
            raise LoaderError(
                "Invalid region format: Must match template 'chr<c>:<start>-<end>' or 'chr<c>'\n"
            )

        try:
            interval = ClosedInterval(*map(int, rest[0].replace(",", "").split("-")))
        except (ValueError, TypeError) as err:
            # ValueError: a bound is not an integer.
            # TypeError: not exactly two bounds were supplied.
            raise LoaderError(
                "Invalid region format: Must match template 'chr<c>:<start>-<end>' or 'chr<c>'\n"
            ) from err

        if interval.start > interval.end:
            raise LoaderError("Start must be smaller than end")

        return Region(chrom, interval)

    def __str__(self):
        """Return the canonical UCSC string for this region.

        ``'chr<c>'`` for a full chromosome, otherwise
        ``'chr<c>:<start>-<end>'``.
        """
        if self.is_full_chrom():
            return self.chromname()
        else:
            return f"{self.chromname()}:{self.region.start}-{self.region.end}"
@validate_call(config=dict(arbitrary_types_allowed=True))
def _generate_region_mask(
    df: pd.DataFrame, region: t.Union[Region, str], resolution: int
) -> pd.Series:
    """
    Build a boolean mask that keeps only the rows of ``df`` inside ``region``.

    The dataframe is only read, never modified.

    Parameters
    ----------
    df : pd.DataFrame
        Dataframe providing the ``ROW_IDS_COLNAME`` and ``COL_IDS_COLNAME``
        columns the mask is computed from.
    region : Region or str
        Region to keep. A string is parsed with ``Region.from_string``.
    resolution : int
        Accepted for interface compatibility; not referenced by the body.

    Returns
    -------
    pd.Series
        Boolean mask such that ``df[mask]`` only contains entries whose row
        and column ids both lie within the requested range.

    Notes
    -----
    Only the locus range of ``region`` is used; its chromosome is not
    consulted here.
    NOTE(review): the region bounds are compared directly against the id
    columns without scaling by ``resolution`` -- confirm the columns hold
    loci rather than bin indices.
    """
    if isinstance(region, str):
        region = Region.from_string(region)

    # Pattern matching will make this cleaner once 3.10 is the minimum
    # supported version.
    if region.is_full_chrom():
        first, last = 1, None
    else:
        first, last = region.region

    # https://genome.ucsc.edu/goldenPath/help/query.html
    # UCSC positions are 1-based while the ids are 0-based, hence the -1.
    lower = max(0, first - 1)
    mask = df[ROW_IDS_COLNAME] >= lower
    mask &= df[COL_IDS_COLNAME] >= lower

    if last is None:
        # Whole chromosome: no upper bound to apply.
        return mask

    # UCSC ranges are inclusive; shift by one for the 0-based ids.
    upper = last - 1
    mask &= df[ROW_IDS_COLNAME] <= upper
    mask &= df[COL_IDS_COLNAME] <= upper
    return mask


def _get_fileext_without_compression(
    fname: str,
) -> str:
    """
    Return the file extension that precedes any compression suffixes.

    The result is lower-cased, so the lookup is case insensitive.

    Parameters
    ----------
    fname : str
        Path or name of a file.

    Returns
    -------
    str
        The last dot-separated component of ``fname`` that is not listed in
        ``SUPPORTED_COMPRESSION_SCHEMES``.

    Raises
    ------
    IndexError
        If every dot-separated component is a compression scheme.

    Examples
    --------
    Assuming ``tar`` and ``gz`` are both listed in
    ``SUPPORTED_COMPRESSION_SCHEMES``::

        _get_fileext_without_compression("path/to/file.txt.tar.gz")  # 'txt'
    """
    components = fname.split(".")
    return __get_fileext_without_compression_impl(components)


def __get_fileext_without_compression_impl(
    parts: t.List[str],
) -> str:
    """
    Return the last entry of ``parts`` that is not a compression scheme.

    Helper that works on the already-split name so callers do not have to
    split the string repeatedly.

    Parameters
    ----------
    parts : list of str
        Dot-separated components of a file name.

    Returns
    -------
    str
        Lower-cased last component not found in
        ``SUPPORTED_COMPRESSION_SCHEMES``.

    Raises
    ------
    IndexError
        If ``parts`` is empty or consists solely of compression schemes.
    """
    remaining = parts
    while True:
        # Indexing an exhausted list raises IndexError, as documented.
        candidate = remaining[-1].lower()
        if candidate not in SUPPORTED_COMPRESSION_SCHEMES:
            return candidate
        # Drop the trailing compression suffix and look one step further left.
        remaining = remaining[:-1]