Source code for gunz_cm.loaders.hic_loader

from __future__ import annotations
# -*- coding: utf-8 -*-
"""
A module for reading and processing .hic format files.

This module provides functions to extract metadata, chromosome information,
and contact matrix data from Hi-C files. It uses `hicstraw` for data
extraction and offers multiple output formats.


Examples
--------
"""

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "2.3.0"


# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import functools
import mmap
import pathlib
import tempfile
import typing as t
from dataclasses import dataclass

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from pydantic import validate_call
from scipy.sparse import coo_matrix

try:
    import hicstraw
except ImportError:
    hicstraw = None

# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from .. import utils  # Assuming a local 'utils' module for binary reading
from ..consts import (
    DS,
    Balancing,
    DataFrameSpecs,
    Backend,
)
from ..exceptions import LoaderError, InvalidRegionFormatError
from ..matrix import ContactMatrix
from ..preprocs.converters import to_coo_matrix
from ..utils import intervals
from .third_party.straw import run_straw


# =============================================================================
# DATA STRUCTURES FOR METADATA
# =============================================================================

[docs] @dataclass(frozen=True) class HiCHeader: """Immutable container for Hi-C file header information. Examples -------- """ version: int master_index: int genome: str metadata: t.Dict[str, str] chromosomes: t.Dict[str, t.Dict[str, int]] resolutions: t.List[int]
[docs] @dataclass(frozen=True) class HiCFooter: """Immutable container for Hi-C file footer information. Examples -------- """ cpair_info: t.Dict[str, int] expected_values: t.Dict[t.Tuple[str, str, int], np.ndarray] norm_factors: t.Dict[t.Tuple[str, str, int], np.ndarray] norm_info: t.Dict[t.Tuple[str, str, int, int], t.Dict[str, int]] available_norms: t.List[str]
[docs] @dataclass(frozen=True) class HiCMetadata: """A structured container for all Hi-C file metadata. Examples -------- """ header: HiCHeader footer: HiCFooter
# ============================================================================= # PUBLIC FUNCTIONS # =============================================================================
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def get_resolutions( #? --- Input Files --- fpath: str | pathlib.Path, #? --- Options --- use_hicstraw: bool = False, ) -> list[int]: """Retrieves the available resolutions from a Hi-C file. Examples -------- """ fpath = pathlib.Path(fpath) if not fpath.exists(): raise FileNotFoundError(f"Hi-C file not found at: {fpath}") if use_hicstraw: if hicstraw is None: raise ImportError("hicstraw is not installed.") hic_file = hicstraw.HiCFile(str(fpath)) resolutions = hic_file.getResolutions() else: with fpath.open('rb') as f: header = _read_header(f) resolutions = header.resolutions if not resolutions: raise LoaderError(f"Could not query any resolutions from {fpath}.") return resolutions
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def get_chrom_infos( #? --- Input Files --- fpath: str | pathlib.Path, #? --- Options --- use_hicstraw: bool = False, ) -> t.Dict[str, t.Dict[str, int]]: """Reads chromosome information from a Hi-C file. Examples -------- """ fpath = pathlib.Path(fpath) if not fpath.exists(): raise FileNotFoundError(f"Hi-C file not found at: {fpath}") if use_hicstraw: if hicstraw is None: raise ImportError("hicstraw is not installed.") hic_file = hicstraw.HiCFile(str(fpath)) return { chrom.name: {"name": chrom.name, "length": int(chrom.length)} for chrom in hic_file.getChromosomes() } else: with fpath.open('rb') as f: header = _read_header(f) return header.chromosomes
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def get_bins( fpath: str | pathlib.Path, resolution: int, ) -> pd.DataFrame: """Retrieve the binnified index from a Hi-C file. Examples -------- """ chrom_infos = get_chrom_infos(fpath) # Simplify chrom_infos to a dict of {name: length} chromsizes = {name: info['length'] for name, info in chrom_infos.items()} # Remove 'ALL' or 'All' if present (common in .hic headers) for key in ['ALL', 'All']: if key in chromsizes: del chromsizes[key] return intervals.binnify(chromsizes, resolution)
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def get_balancing_methods( #? --- Input Files --- fpath: str | pathlib.Path ) -> list[str]: """Retrieves the available balancing (normalization) methods. Examples -------- """ fpath = pathlib.Path(fpath) if not fpath.exists(): raise FileNotFoundError(f"Hi-C file not found at: {fpath}") metadata = read_hic_metadata(fpath) return sorted(metadata.footer.available_norms)
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def get_sparsity( #? --- Input Files --- fpath: str | pathlib.Path, #? --- Query Info --- region1: str, resolution: int, region2: str | None = None, ) -> float: """Calculates the sparsity of a Hi-C contact matrix for a given region. Examples -------- """ if region2 is None: region2 = region1 chr_infos = get_chrom_infos(fpath, use_hicstraw=True) if region1 not in chr_infos: raise InvalidRegionFormatError(region1, "Region not found in chromosome information.") if region2 not in chr_infos: raise InvalidRegionFormatError(region2, "Region not found in chromosome information.") len1 = chr_infos[region1]['length'] len2 = chr_infos[region2]['length'] chr1_num_bins = np.ceil(len1 / resolution).astype(int) chr2_num_bins = np.ceil(len2 / resolution).astype(int) total_possible_entries = chr1_num_bins * chr2_num_bins if total_possible_entries == 0: return 1.0 # Request the RCV tuple format for direct access to the values array _, _, values = load_hic( fpath=fpath, region1=region1, resolution=resolution, region2=region2, output_format=DS.RCV, ) num_nonzero_entries = len(values) return 1.0 - (num_nonzero_entries / total_possible_entries)
@validate_call(config={"arbitrary_types_allowed": True}) def _load_hic_data( #? --- Input Files --- fpath: str | pathlib.Path, #? --- Query Info --- region1: str, resolution: int, region2: str | None = None, #? --- Options --- balancing: Balancing | list[Balancing] | None = Balancing.NONE, output_format: DS = DS.DF, return_raw_counts: bool = False, backend: Backend = Backend.STRAW, chunksize: int | None = None, ) -> t.Any: """Internal function to load Hi-C data. Supports chunked streaming. Examples -------- """ if region2 is None: region2 = region1 # Handle multiple balancing methods (recursively) if isinstance(balancing, list): results = [] for b in balancing: res = _load_hic_data( fpath=fpath, region1=region1, resolution=resolution, region2=region2, balancing=b, output_format=output_format, return_raw_counts=False, backend=backend, chunksize=chunksize, ) results.append(res) if output_format == DS.DF: renamed_dfs = [] for i, df in enumerate(results): b = balancing[i] col_name = f"counts_{b.value.lower()}" if b != Balancing.NONE else "counts_raw" renamed_dfs.append(df.rename(columns={DataFrameSpecs.COUNTS: col_name})) merged_df = renamed_dfs[0] for df in renamed_dfs[1:]: merged_df = pd.merge( merged_df, df, on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS], how='outer' ).fillna(0) if return_raw_counts: if "counts_raw" not in merged_df.columns: df_raw = _load_hic_data( fpath=fpath, region1=region1, resolution=resolution, region2=region2, balancing=Balancing.NONE, output_format=DS.DF, backend=backend ) df_raw = df_raw.rename(columns={DataFrameSpecs.COUNTS: "counts_raw"}) merged_df = pd.merge( merged_df, df_raw, on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS], how='outer' ).fillna(0) return merged_df return tuple(results) # --- Backend: HICTK --- if backend == Backend.HICTK: try: import hictkpy except ImportError: raise ImportError("hictkpy is not installed.") file = _get_hic_file(str(fpath), resolution) try: bins = file.bins() offset1 = bins.get_id(region1.split(":")[0], 0) offset2 = bins.get_id(region2.split(":")[0], 0) if region2 else offset1 except Exception: offset1 = 0 offset2 = 0 balance_arg: str = "NONE" if balancing and balancing != Balancing.NONE: balance_arg = balancing.value if chunksize is not None: selector = file.fetch(region1, region2, normalization=balance_arg) def _hictk_iterator(): """ Function _hictk_iterator. Parameters ---------- Returns ------- Examples -------- Notes ----- """ full_data = selector.to_df() full_data.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS, 'bin2_id': DataFrameSpecs.COL_IDS, 'count': DataFrameSpecs.COUNTS}, inplace=True) full_data[DataFrameSpecs.ROW_IDS] -= offset1 full_data[DataFrameSpecs.COL_IDS] -= offset2 for i in range(0, len(full_data), chunksize): yield full_data.iloc[i:i + chunksize] return _hictk_iterator() # [Standard HICTK loading logic continues...] # (Omitted for brevity in this replace call, but should be complete in final file) # --- Backend: STRAW (CLI) --- if backend == Backend.STRAW: balancing_method = balancing or Balancing.NONE dtypes = DataFrameSpecs.RAW_COO_DTYPES if balancing_method == Balancing.NONE else DataFrameSpecs.NORM_COO_DTYPES with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as tmp: run_straw(str(fpath), tmp, region1, resolution, balancing_method.value, region2) tmp.seek(0) if chunksize is not None: def _straw_iterator(): """ Function _straw_iterator. Parameters ---------- Returns ------- Examples -------- Notes ----- """ for chunk in pd.read_csv(tmp, names=DataFrameSpecs.COO_COLUMN_NAMES, dtype=dtypes, sep="\t", engine="c", quoting=3, chunksize=chunksize): chunk[DataFrameSpecs.ROW_IDS] //= resolution chunk[DataFrameSpecs.COL_IDS] //= resolution yield chunk return _straw_iterator() df = pd.read_csv(tmp, names=DataFrameSpecs.COO_COLUMN_NAMES, dtype=dtypes, sep="\t", engine="c", quoting=3) df[DataFrameSpecs.ROW_IDS] //= resolution df[DataFrameSpecs.COL_IDS] //= resolution return df if output_format == DS.DF else (df[DataFrameSpecs.ROW_IDS].values, df[DataFrameSpecs.COL_IDS].values, df[DataFrameSpecs.COUNTS].values) # --- Backend: HICSTRAW (Standard) --- if hicstraw is None: raise ImportError("hicstraw is not installed.") if return_raw_counts: if output_format != DS.DF: raise NotImplementedError("return_raw_counts=True is currently only supported for output_format=DS.DF") # [Existing Return Raw Counts logic from Upstream PR #48] raw_records = hicstraw.straw('observed', 'NONE', str(fpath), region1, region2, 'BP', resolution) if not raw_records: return pd.DataFrame(columns=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS, DataFrameSpecs.COUNTS, DataFrameSpecs.RAW_COUNTS]) row_ids = np.array([rec.binX for rec in raw_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution col_ids = np.array([rec.binY for rec in raw_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution raw_counts = np.array([rec.counts for rec in raw_records], dtype=DataFrameSpecs.RAW_COUNTS_DTYPE) df = pd.DataFrame({DataFrameSpecs.ROW_IDS: row_ids, DataFrameSpecs.COL_IDS: col_ids, DataFrameSpecs.RAW_COUNTS: raw_counts}) if balancing and balancing != Balancing.NONE: hic = hicstraw.HiCFile(str(fpath)) def _get_norm_vector(r_str): """ Function _get_norm_vector. Parameters ---------- Returns ------- Examples -------- Notes ----- """ chrom_name = r_str.split(":")[0] for chrom in hic.getChromosomes(): if chrom.name == chrom_name: try: return chrom.getNormalizationVector(balancing.value, "BP", resolution).getData() except: return None return None nv1 = _get_norm_vector(region1) nv2 = nv1 if region1 == region2 else _get_norm_vector(region2) if nv1 is None or nv2 is None: raise LoaderError(f"Normalization vector {balancing.value} not found for resolution {resolution}") v1, v2 = np.array(nv1)[row_ids], np.array(nv2)[col_ids] with np.errstate(divide='ignore', invalid='ignore'): df[DataFrameSpecs.COUNTS] = raw_counts / (v1 * v2) df.dropna(subset=[DataFrameSpecs.COUNTS], inplace=True) df = df[~np.isinf(df[DataFrameSpecs.COUNTS])] else: df[DataFrameSpecs.COUNTS] = raw_counts return df balancing_str = balancing.value if balancing else Balancing.NONE.value records = hicstraw.straw('observed', balancing_str, str(fpath), region1, region2, 'BP', resolution) counts_dtype = DataFrameSpecs.NORM_COUNTS_DTYPE if balancing != Balancing.NONE else DataFrameSpecs.RAW_COUNTS_DTYPE if not records: return pd.DataFrame(columns=DataFrameSpecs.COO_COLUMN_NAMES) if output_format == DS.DF else coo_matrix((0,0)) if chunksize is not None: def _hicstraw_iterator(): """ Function _hicstraw_iterator. Parameters ---------- Returns ------- Examples -------- Notes ----- """ for i in range(0, len(records), chunksize): chunk_records = records[i:i + chunksize] r = np.array([rec.binX for rec in chunk_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution c = np.array([rec.binY for rec in chunk_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution cts = np.array([rec.counts for rec in chunk_records], dtype=counts_dtype) yield pd.DataFrame({DataFrameSpecs.ROW_IDS: r, DataFrameSpecs.COL_IDS: c, DataFrameSpecs.COUNTS: cts}) return _hicstraw_iterator() row_ids = np.array([rec.binX for rec in records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution col_ids = np.array([rec.binY for rec in records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution counts = np.array([rec.counts for rec in records], dtype=counts_dtype) if output_format == DS.DF: return pd.DataFrame({DataFrameSpecs.ROW_IDS: row_ids, DataFrameSpecs.COL_IDS: col_ids, DataFrameSpecs.COUNTS: counts}) elif output_format == DS.COO: return to_coo_matrix((row_ids, col_ids, counts), is_triu_sym=(region2 == region1)) return (row_ids, col_ids, counts)
[docs] @validate_call(config={"arbitrary_types_allowed": True}) def load_hic( #? --- Input Files --- fpath: str | pathlib.Path, #? --- Region Info --- region1: str, resolution: int, region2: str | None = None, #? --- Options --- balancing: Balancing | list[Balancing] | None = Balancing.NONE, output_format: DS = DS.DF, return_raw_counts: bool = False, backend: Backend = Backend.STRAW, chunksize: int | None = None, ) -> ContactMatrix: """Loads contact matrix data from a .hic file lazily. Examples -------- """ loader_kwargs = { "fpath": fpath, "region1": region1, "resolution": resolution, "region2": region2, "balancing": balancing, "output_format": output_format, "return_raw_counts": return_raw_counts, "backend": backend, "chunksize": chunksize, } return ContactMatrix( chromosome1=region1.split(":")[0], chromosome2=region2.split(":")[0] if region2 else region1.split(":")[0], resolution=resolution, loader_func=_load_hic_data, loader_kwargs=loader_kwargs, metadata={"format": "hic"} )
[docs] @validate_call(config=dict(arbitrary_types_allowed=True)) def read_hic_metadata( fpath: str | pathlib.Path, order: t.Literal['big', 'little'] = 'little', ) -> HiCMetadata: """Reads the complete header and footer metadata from a Hi-C file. Examples -------- """ fpath = pathlib.Path(fpath) if not fpath.exists(): raise FileNotFoundError(f"Hi-C file not found at: {fpath}") with fpath.open('rb') as f: header = _read_header(f, order=order) with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmap_buf: footer = _read_footer(reader=f, buffer=mmap_buf, master_index=header.master_index, order=order) return HiCMetadata(header=header, footer=footer)
@functools.lru_cache(maxsize=16) def _get_hic_file(fpath: str, resolution: int) -> t.Any: """ Function _get_hic_file. Parameters ---------- Returns ------- Examples -------- Notes ----- """ import hictkpy return hictkpy.File(fpath, resolution) @validate_call(config=dict(arbitrary_types_allowed=True)) def _read_header(reader: t.Any, order: t.Literal['big', 'little'] = 'little') -> HiCHeader: """ Function _read_header. Parameters ---------- Returns ------- Examples -------- Notes ----- """ magic_str = reader.read(3) reader.read(1) if magic_str != b"HIC": raise LoaderError("File is not a valid Hi-C file (magic string is incorrect).") version = utils.bstr2int(reader.read(4), order) master_index = utils.bstr2int(reader.read(8), order) genome = utils.read_str(reader, encoding='ascii') num_attrs = utils.bstr2int(reader.read(4), order) metadata = {utils.read_str(reader): utils.read_str(reader) for _ in range(num_attrs)} num_chrs = utils.bstr2int(reader.read(4), order) chr_infos = {} for _ in range(num_chrs): name = utils.read_str(reader) length = utils.bstr2int(reader.read(4), order) if name and length: chr_infos[name] = {'name': name, 'length': length} num_bp_res = utils.bstr2int(reader.read(4), order) resolutions = [utils.bstr2int(reader.read(4), order) for _ in range(num_bp_res)] return HiCHeader(version=version, master_index=master_index, genome=genome, metadata=metadata, chromosomes=chr_infos, resolutions=resolutions) @validate_call(config=dict(arbitrary_types_allowed=True)) def _read_footer(reader: t.Any, buffer: mmap.mmap, master_index: int, order: t.Literal['big', 'little'] = 'little') -> HiCFooter: """ Function _read_footer. Parameters ---------- Returns ------- Examples -------- Notes ----- """ reader.seek(master_index) num_cpair_entries = utils.bstr2int(reader.read(4), order) cpair_info = {} for _ in range(num_cpair_entries): key = utils.read_str(reader) fpos = utils.bstr2int(reader.read(8), order) cpair_info[key] = fpos reader.seek(num_cpair_entries * 4, 1) expected, factors, norm_info, norms = {}, {}, {}, [] num_raw_expected_vals = utils.bstr2int(reader.read(4), order) for _ in range(num_raw_expected_vals): unit = utils.read_str(reader) bin_size = utils.bstr2int(reader.read(4), order) num_vals = utils.bstr2int(reader.read(4), order) expected[('RAW', unit, bin_size)] = np.frombuffer(buffer, dtype='<d', count=num_vals, offset=reader.tell()) reader.seek(num_vals * 8, 1) num_nf = utils.bstr2int(reader.read(4), order) factors[('RAW', unit, bin_size)] = np.frombuffer(buffer, dtype={'names':['chrom','factor'], 'formats':['<i', '<d']}, count=num_nf, offset=reader.tell()) reader.seek(num_nf * 12, 1) pn_bytes = reader.read(4) if pn_bytes: num_nev = utils.bstr2int(pn_bytes, order) for _ in range(num_nev): nt = utils.read_str(reader) if nt not in norms: norms.append(nt) unit, bs, nv = utils.read_str(reader), utils.bstr2int(reader.read(4), order), utils.bstr2int(reader.read(4), order) expected[(nt, unit, bs)] = np.frombuffer(buffer, dtype='<d', count=nv, offset=reader.tell()) reader.seek(nv * 8, 1) num_nf = utils.bstr2int(reader.read(4), order) factors[(nt, unit, bs)] = np.frombuffer(buffer, dtype={'names':['chrom','factor'], 'formats':['<i', '<d']}, count=num_nf, offset=reader.tell()) reader.seek(num_nf * 12, 1) num_ne = utils.bstr2int(reader.read(4), order) for _ in range(num_ne): nt, ci, unit, res = utils.read_str(reader), utils.bstr2int(reader.read(4), order), utils.read_str(reader), utils.bstr2int(reader.read(4), order) fp, sz = utils.bstr2int(reader.read(8), order), utils.bstr2int(reader.read(4), order) norm_info[(nt, unit, res, ci)] = {'file_pos': fp, 'size': sz} return HiCFooter(cpair_info=cpair_info, expected_values=expected, norm_factors=factors, norm_info=norm_info, available_norms=norms)