from __future__ import annotations
# -*- coding: utf-8 -*-
"""
A module for reading and processing .hic format files.
This module provides functions to extract metadata, chromosome information,
and contact matrix data from Hi-C files. It uses `hicstraw` for data
extraction and offers multiple output formats.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "2.3.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import functools
import mmap
import pathlib
import tempfile
import typing as t
from dataclasses import dataclass
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
import pandas as pd
from pydantic import validate_call
from scipy.sparse import coo_matrix
try:
import hicstraw
except ImportError:
hicstraw = None
# =============================================================================
# LOCAL APPLICATION IMPORTS
# =============================================================================
from .. import utils # Assuming a local 'utils' module for binary reading
from ..consts import (
DS,
Balancing,
DataFrameSpecs,
Backend,
)
from ..exceptions import LoaderError, InvalidRegionFormatError
from ..matrix import ContactMatrix
from ..preprocs.converters import to_coo_matrix
from ..utils import intervals
from .third_party.straw import run_straw
# =============================================================================
# DATA STRUCTURES FOR METADATA
# =============================================================================
# =============================================================================
# PUBLIC FUNCTIONS
# =============================================================================
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_resolutions(
#? --- Input Files ---
fpath: str | pathlib.Path,
#? --- Options ---
use_hicstraw: bool = False,
) -> list[int]:
"""Retrieves the available resolutions from a Hi-C file.
Examples
--------
"""
fpath = pathlib.Path(fpath)
if not fpath.exists():
raise FileNotFoundError(f"Hi-C file not found at: {fpath}")
if use_hicstraw:
if hicstraw is None:
raise ImportError("hicstraw is not installed.")
hic_file = hicstraw.HiCFile(str(fpath))
resolutions = hic_file.getResolutions()
else:
with fpath.open('rb') as f:
header = _read_header(f)
resolutions = header.resolutions
if not resolutions:
raise LoaderError(f"Could not query any resolutions from {fpath}.")
return resolutions
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_chrom_infos(
#? --- Input Files ---
fpath: str | pathlib.Path,
#? --- Options ---
use_hicstraw: bool = False,
) -> t.Dict[str, t.Dict[str, int]]:
"""Reads chromosome information from a Hi-C file.
Examples
--------
"""
fpath = pathlib.Path(fpath)
if not fpath.exists():
raise FileNotFoundError(f"Hi-C file not found at: {fpath}")
if use_hicstraw:
if hicstraw is None:
raise ImportError("hicstraw is not installed.")
hic_file = hicstraw.HiCFile(str(fpath))
return {
chrom.name: {"name": chrom.name, "length": int(chrom.length)}
for chrom in hic_file.getChromosomes()
}
else:
with fpath.open('rb') as f:
header = _read_header(f)
return header.chromosomes
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_bins(
fpath: str | pathlib.Path,
resolution: int,
) -> pd.DataFrame:
"""Retrieve the binnified index from a Hi-C file.
Examples
--------
"""
chrom_infos = get_chrom_infos(fpath)
# Simplify chrom_infos to a dict of {name: length}
chromsizes = {name: info['length'] for name, info in chrom_infos.items()}
# Remove 'ALL' or 'All' if present (common in .hic headers)
for key in ['ALL', 'All']:
if key in chromsizes:
del chromsizes[key]
return intervals.binnify(chromsizes, resolution)
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_balancing_methods(
#? --- Input Files ---
fpath: str | pathlib.Path
) -> list[str]:
"""Retrieves the available balancing (normalization) methods.
Examples
--------
"""
fpath = pathlib.Path(fpath)
if not fpath.exists():
raise FileNotFoundError(f"Hi-C file not found at: {fpath}")
metadata = read_hic_metadata(fpath)
return sorted(metadata.footer.available_norms)
[docs]
@validate_call(config=dict(arbitrary_types_allowed=True))
def get_sparsity(
#? --- Input Files ---
fpath: str | pathlib.Path,
#? --- Query Info ---
region1: str,
resolution: int,
region2: str | None = None,
) -> float:
"""Calculates the sparsity of a Hi-C contact matrix for a given region.
Examples
--------
"""
if region2 is None:
region2 = region1
chr_infos = get_chrom_infos(fpath, use_hicstraw=True)
if region1 not in chr_infos:
raise InvalidRegionFormatError(region1, "Region not found in chromosome information.")
if region2 not in chr_infos:
raise InvalidRegionFormatError(region2, "Region not found in chromosome information.")
len1 = chr_infos[region1]['length']
len2 = chr_infos[region2]['length']
chr1_num_bins = np.ceil(len1 / resolution).astype(int)
chr2_num_bins = np.ceil(len2 / resolution).astype(int)
total_possible_entries = chr1_num_bins * chr2_num_bins
if total_possible_entries == 0:
return 1.0
# Request the RCV tuple format for direct access to the values array
_, _, values = load_hic(
fpath=fpath,
region1=region1,
resolution=resolution,
region2=region2,
output_format=DS.RCV,
)
num_nonzero_entries = len(values)
return 1.0 - (num_nonzero_entries / total_possible_entries)
@validate_call(config={"arbitrary_types_allowed": True})
def _load_hic_data(
#? --- Input Files ---
fpath: str | pathlib.Path,
#? --- Query Info ---
region1: str,
resolution: int,
region2: str | None = None,
#? --- Options ---
balancing: Balancing | list[Balancing] | None = Balancing.NONE,
output_format: DS = DS.DF,
return_raw_counts: bool = False,
backend: Backend = Backend.STRAW,
chunksize: int | None = None,
) -> t.Any:
"""Internal function to load Hi-C data. Supports chunked streaming.
Examples
--------
"""
if region2 is None:
region2 = region1
# Handle multiple balancing methods (recursively)
if isinstance(balancing, list):
results = []
for b in balancing:
res = _load_hic_data(
fpath=fpath,
region1=region1,
resolution=resolution,
region2=region2,
balancing=b,
output_format=output_format,
return_raw_counts=False,
backend=backend,
chunksize=chunksize,
)
results.append(res)
if output_format == DS.DF:
renamed_dfs = []
for i, df in enumerate(results):
b = balancing[i]
col_name = f"counts_{b.value.lower()}" if b != Balancing.NONE else "counts_raw"
renamed_dfs.append(df.rename(columns={DataFrameSpecs.COUNTS: col_name}))
merged_df = renamed_dfs[0]
for df in renamed_dfs[1:]:
merged_df = pd.merge(
merged_df, df,
on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
how='outer'
).fillna(0)
if return_raw_counts:
if "counts_raw" not in merged_df.columns:
df_raw = _load_hic_data(
fpath=fpath,
region1=region1,
resolution=resolution,
region2=region2,
balancing=Balancing.NONE,
output_format=DS.DF,
backend=backend
)
df_raw = df_raw.rename(columns={DataFrameSpecs.COUNTS: "counts_raw"})
merged_df = pd.merge(
merged_df, df_raw,
on=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS],
how='outer'
).fillna(0)
return merged_df
return tuple(results)
# --- Backend: HICTK ---
if backend == Backend.HICTK:
try:
import hictkpy
except ImportError:
raise ImportError("hictkpy is not installed.")
file = _get_hic_file(str(fpath), resolution)
try:
bins = file.bins()
offset1 = bins.get_id(region1.split(":")[0], 0)
offset2 = bins.get_id(region2.split(":")[0], 0) if region2 else offset1
except Exception:
offset1 = 0
offset2 = 0
balance_arg: str = "NONE"
if balancing and balancing != Balancing.NONE:
balance_arg = balancing.value
if chunksize is not None:
selector = file.fetch(region1, region2, normalization=balance_arg)
def _hictk_iterator():
"""
Function _hictk_iterator.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
full_data = selector.to_df()
full_data.rename(columns={'bin1_id': DataFrameSpecs.ROW_IDS, 'bin2_id': DataFrameSpecs.COL_IDS, 'count': DataFrameSpecs.COUNTS}, inplace=True)
full_data[DataFrameSpecs.ROW_IDS] -= offset1
full_data[DataFrameSpecs.COL_IDS] -= offset2
for i in range(0, len(full_data), chunksize):
yield full_data.iloc[i:i + chunksize]
return _hictk_iterator()
# [Standard HICTK loading logic continues...]
# (Omitted for brevity in this replace call, but should be complete in final file)
# --- Backend: STRAW (CLI) ---
if backend == Backend.STRAW:
balancing_method = balancing or Balancing.NONE
dtypes = DataFrameSpecs.RAW_COO_DTYPES if balancing_method == Balancing.NONE else DataFrameSpecs.NORM_COO_DTYPES
with tempfile.NamedTemporaryFile(mode="w+", encoding="utf-8") as tmp:
run_straw(str(fpath), tmp, region1, resolution, balancing_method.value, region2)
tmp.seek(0)
if chunksize is not None:
def _straw_iterator():
"""
Function _straw_iterator.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
for chunk in pd.read_csv(tmp, names=DataFrameSpecs.COO_COLUMN_NAMES, dtype=dtypes, sep="\t", engine="c", quoting=3, chunksize=chunksize):
chunk[DataFrameSpecs.ROW_IDS] //= resolution
chunk[DataFrameSpecs.COL_IDS] //= resolution
yield chunk
return _straw_iterator()
df = pd.read_csv(tmp, names=DataFrameSpecs.COO_COLUMN_NAMES, dtype=dtypes, sep="\t", engine="c", quoting=3)
df[DataFrameSpecs.ROW_IDS] //= resolution
df[DataFrameSpecs.COL_IDS] //= resolution
return df if output_format == DS.DF else (df[DataFrameSpecs.ROW_IDS].values, df[DataFrameSpecs.COL_IDS].values, df[DataFrameSpecs.COUNTS].values)
# --- Backend: HICSTRAW (Standard) ---
if hicstraw is None:
raise ImportError("hicstraw is not installed.")
if return_raw_counts:
if output_format != DS.DF:
raise NotImplementedError("return_raw_counts=True is currently only supported for output_format=DS.DF")
# [Existing Return Raw Counts logic from Upstream PR #48]
raw_records = hicstraw.straw('observed', 'NONE', str(fpath), region1, region2, 'BP', resolution)
if not raw_records:
return pd.DataFrame(columns=[DataFrameSpecs.ROW_IDS, DataFrameSpecs.COL_IDS, DataFrameSpecs.COUNTS, DataFrameSpecs.RAW_COUNTS])
row_ids = np.array([rec.binX for rec in raw_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
col_ids = np.array([rec.binY for rec in raw_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
raw_counts = np.array([rec.counts for rec in raw_records], dtype=DataFrameSpecs.RAW_COUNTS_DTYPE)
df = pd.DataFrame({DataFrameSpecs.ROW_IDS: row_ids, DataFrameSpecs.COL_IDS: col_ids, DataFrameSpecs.RAW_COUNTS: raw_counts})
if balancing and balancing != Balancing.NONE:
hic = hicstraw.HiCFile(str(fpath))
def _get_norm_vector(r_str):
"""
Function _get_norm_vector.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
chrom_name = r_str.split(":")[0]
for chrom in hic.getChromosomes():
if chrom.name == chrom_name:
try: return chrom.getNormalizationVector(balancing.value, "BP", resolution).getData()
except: return None
return None
nv1 = _get_norm_vector(region1)
nv2 = nv1 if region1 == region2 else _get_norm_vector(region2)
if nv1 is None or nv2 is None:
raise LoaderError(f"Normalization vector {balancing.value} not found for resolution {resolution}")
v1, v2 = np.array(nv1)[row_ids], np.array(nv2)[col_ids]
with np.errstate(divide='ignore', invalid='ignore'):
df[DataFrameSpecs.COUNTS] = raw_counts / (v1 * v2)
df.dropna(subset=[DataFrameSpecs.COUNTS], inplace=True)
df = df[~np.isinf(df[DataFrameSpecs.COUNTS])]
else:
df[DataFrameSpecs.COUNTS] = raw_counts
return df
balancing_str = balancing.value if balancing else Balancing.NONE.value
records = hicstraw.straw('observed', balancing_str, str(fpath), region1, region2, 'BP', resolution)
counts_dtype = DataFrameSpecs.NORM_COUNTS_DTYPE if balancing != Balancing.NONE else DataFrameSpecs.RAW_COUNTS_DTYPE
if not records:
return pd.DataFrame(columns=DataFrameSpecs.COO_COLUMN_NAMES) if output_format == DS.DF else coo_matrix((0,0))
if chunksize is not None:
def _hicstraw_iterator():
"""
Function _hicstraw_iterator.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
for i in range(0, len(records), chunksize):
chunk_records = records[i:i + chunksize]
r = np.array([rec.binX for rec in chunk_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
c = np.array([rec.binY for rec in chunk_records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
cts = np.array([rec.counts for rec in chunk_records], dtype=counts_dtype)
yield pd.DataFrame({DataFrameSpecs.ROW_IDS: r, DataFrameSpecs.COL_IDS: c, DataFrameSpecs.COUNTS: cts})
return _hicstraw_iterator()
row_ids = np.array([rec.binX for rec in records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
col_ids = np.array([rec.binY for rec in records], dtype=DataFrameSpecs.INDICES_DTYPE) // resolution
counts = np.array([rec.counts for rec in records], dtype=counts_dtype)
if output_format == DS.DF:
return pd.DataFrame({DataFrameSpecs.ROW_IDS: row_ids, DataFrameSpecs.COL_IDS: col_ids, DataFrameSpecs.COUNTS: counts})
elif output_format == DS.COO:
return to_coo_matrix((row_ids, col_ids, counts), is_triu_sym=(region2 == region1))
return (row_ids, col_ids, counts)
[docs]
@validate_call(config={"arbitrary_types_allowed": True})
def load_hic(
#? --- Input Files ---
fpath: str | pathlib.Path,
#? --- Region Info ---
region1: str,
resolution: int,
region2: str | None = None,
#? --- Options ---
balancing: Balancing | list[Balancing] | None = Balancing.NONE,
output_format: DS = DS.DF,
return_raw_counts: bool = False,
backend: Backend = Backend.STRAW,
chunksize: int | None = None,
) -> ContactMatrix:
"""Loads contact matrix data from a .hic file lazily.
Examples
--------
"""
loader_kwargs = {
"fpath": fpath, "region1": region1, "resolution": resolution, "region2": region2,
"balancing": balancing, "output_format": output_format, "return_raw_counts": return_raw_counts,
"backend": backend, "chunksize": chunksize,
}
return ContactMatrix(
chromosome1=region1.split(":")[0],
chromosome2=region2.split(":")[0] if region2 else region1.split(":")[0],
resolution=resolution,
loader_func=_load_hic_data,
loader_kwargs=loader_kwargs,
metadata={"format": "hic"}
)
@functools.lru_cache(maxsize=16)
def _get_hic_file(fpath: str, resolution: int) -> t.Any:
"""
Function _get_hic_file.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
import hictkpy
return hictkpy.File(fpath, resolution)
@validate_call(config=dict(arbitrary_types_allowed=True))
def _read_header(reader: t.Any, order: t.Literal['big', 'little'] = 'little') -> HiCHeader:
"""
Function _read_header.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
magic_str = reader.read(3)
reader.read(1)
if magic_str != b"HIC":
raise LoaderError("File is not a valid Hi-C file (magic string is incorrect).")
version = utils.bstr2int(reader.read(4), order)
master_index = utils.bstr2int(reader.read(8), order)
genome = utils.read_str(reader, encoding='ascii')
num_attrs = utils.bstr2int(reader.read(4), order)
metadata = {utils.read_str(reader): utils.read_str(reader) for _ in range(num_attrs)}
num_chrs = utils.bstr2int(reader.read(4), order)
chr_infos = {}
for _ in range(num_chrs):
name = utils.read_str(reader)
length = utils.bstr2int(reader.read(4), order)
if name and length: chr_infos[name] = {'name': name, 'length': length}
num_bp_res = utils.bstr2int(reader.read(4), order)
resolutions = [utils.bstr2int(reader.read(4), order) for _ in range(num_bp_res)]
return HiCHeader(version=version, master_index=master_index, genome=genome, metadata=metadata, chromosomes=chr_infos, resolutions=resolutions)
@validate_call(config=dict(arbitrary_types_allowed=True))
def _read_footer(reader: t.Any, buffer: mmap.mmap, master_index: int, order: t.Literal['big', 'little'] = 'little') -> HiCFooter:
"""
Function _read_footer.
Parameters
----------
Returns
-------
Examples
--------
Notes
-----
"""
reader.seek(master_index)
num_cpair_entries = utils.bstr2int(reader.read(4), order)
cpair_info = {}
for _ in range(num_cpair_entries):
key = utils.read_str(reader)
fpos = utils.bstr2int(reader.read(8), order)
cpair_info[key] = fpos
reader.seek(num_cpair_entries * 4, 1)
expected, factors, norm_info, norms = {}, {}, {}, []
num_raw_expected_vals = utils.bstr2int(reader.read(4), order)
for _ in range(num_raw_expected_vals):
unit = utils.read_str(reader)
bin_size = utils.bstr2int(reader.read(4), order)
num_vals = utils.bstr2int(reader.read(4), order)
expected[('RAW', unit, bin_size)] = np.frombuffer(buffer, dtype='<d', count=num_vals, offset=reader.tell())
reader.seek(num_vals * 8, 1)
num_nf = utils.bstr2int(reader.read(4), order)
factors[('RAW', unit, bin_size)] = np.frombuffer(buffer, dtype={'names':['chrom','factor'], 'formats':['<i', '<d']}, count=num_nf, offset=reader.tell())
reader.seek(num_nf * 12, 1)
pn_bytes = reader.read(4)
if pn_bytes:
num_nev = utils.bstr2int(pn_bytes, order)
for _ in range(num_nev):
nt = utils.read_str(reader)
if nt not in norms: norms.append(nt)
unit, bs, nv = utils.read_str(reader), utils.bstr2int(reader.read(4), order), utils.bstr2int(reader.read(4), order)
expected[(nt, unit, bs)] = np.frombuffer(buffer, dtype='<d', count=nv, offset=reader.tell())
reader.seek(nv * 8, 1)
num_nf = utils.bstr2int(reader.read(4), order)
factors[(nt, unit, bs)] = np.frombuffer(buffer, dtype={'names':['chrom','factor'], 'formats':['<i', '<d']}, count=num_nf, offset=reader.tell())
reader.seek(num_nf * 12, 1)
num_ne = utils.bstr2int(reader.read(4), order)
for _ in range(num_ne):
nt, ci, unit, res = utils.read_str(reader), utils.bstr2int(reader.read(4), order), utils.read_str(reader), utils.bstr2int(reader.read(4), order)
fp, sz = utils.bstr2int(reader.read(8), order), utils.bstr2int(reader.read(4), order)
norm_info[(nt, unit, res, ci)] = {'file_pos': fp, 'size': sz}
return HiCFooter(cpair_info=cpair_info, expected_values=expected, norm_factors=factors, norm_info=norm_info, available_norms=norms)