# Source code for gunz_cm.converters.gnz

# -*- coding: utf-8 -*-
"""
Converter to .gnz unified container.


Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"

import pathlib
import typing as t
import numpy as np
import scipy.sparse
from pydantic import validate_call

from ..io.gnz import GnzWriter
from .. import loaders
from ..exceptions import ConverterError
from ..consts import Balancing, DataStructure, Backend, DataFrameSpecs

def _write_dense(writer, meta, n, row_ids, col_ids, counts, dtype):
    """Write a full ``n x n`` dense matrix, mirroring off-diagonal pixels."""
    writer.init_streaming_array("matrix", (n, n), dtype)
    writer.set_metadata(meta)
    writer.write()
    mm = writer.get_array_writable("matrix")
    if row_ids.size:
        vals = counts.astype(dtype)
        mm[row_ids, col_ids] = vals
        # Mirror the stored triangle across the diagonal; the diagonal
        # itself is excluded so it is not written twice.
        nondiag = row_ids != col_ids
        mm[col_ids[nondiag], row_ids[nondiag]] = vals[nondiag]
    mm.flush()


def _write_tiled(writer, meta, n, row_ids, col_ids, counts, dtype, block_size):
    """Write the matrix as a 4-D grid of ``block_size`` x ``block_size`` tiles."""
    # Pad the matrix order up to a multiple of block_size.
    pad = (block_size - (n % block_size)) % block_size
    padded_n = n + pad
    n_blocks = padded_n // block_size
    writer.init_streaming_array(
        "matrix", (n_blocks, n_blocks, block_size, block_size), dtype
    )
    meta["padded_shape"] = (padded_n, padded_n)
    writer.set_metadata(meta)
    writer.write()
    mm = writer.get_array_writable("matrix")
    if row_ids.size:
        vals = counts.astype(dtype)
        # Split each global bin id into (block index, position within block).
        br, pr = np.divmod(row_ids, block_size)
        bc, pc = np.divmod(col_ids, block_size)
        mm[br, bc, pr, pc] = vals
        nondiag = row_ids != col_ids
        if np.any(nondiag):
            # Symmetric counterpart: swap block and in-block coordinates.
            mm[bc[nondiag], br[nondiag], pc[nondiag], pr[nondiag]] = vals[nondiag]
    mm.flush()


def _write_csr(writer, meta, n, row_ids, col_ids, counts, dtype):
    """Write the symmetrized matrix as CSR component arrays (indptr/indices/data)."""
    if row_ids.size:
        # Symmetrize: append the transpose of the off-diagonal entries.
        # (Mask computed once; the original evaluated it three times.)
        nondiag = row_ids != col_ids
        rows = np.concatenate([row_ids, col_ids[nondiag]])
        cols = np.concatenate([col_ids, row_ids[nondiag]])
        data = np.concatenate([counts, counts[nondiag]])
        csr = scipy.sparse.csr_matrix(
            (data, (rows, cols)), shape=(n, n), dtype=dtype
        )
    else:
        csr = scipy.sparse.csr_matrix((n, n), dtype=dtype)
    writer.add_array("indptr", csr.indptr, dtype="int64")
    writer.add_array("indices", csr.indices, dtype="int32")
    writer.add_array("data", csr.data, dtype=dtype)
    writer.set_metadata(meta)
    writer.write()


def _write_block_sparse(writer, meta, n, row_ids, col_ids, counts, dtype, block_size):
    """Write only the non-empty ``block_size`` tiles plus a block lookup index."""
    pad = (block_size - (n % block_size)) % block_size
    padded_n = n + pad
    n_blocks = padded_n // block_size
    # Recorded unconditionally (the original omitted it for empty input,
    # leaving the metadata inconsistent between empty and non-empty files).
    meta["padded_shape"] = (padded_n, padded_n)
    if row_ids.size:
        br = row_ids // block_size
        bc = col_ids // block_size
        # A tile is "active" if it (or its transpose) holds any pixel.
        active = set(zip(br, bc))
        active.update(zip(bc, br))
        ordered = sorted(active)
        # block_index maps (block_row, block_col) -> slot in block_data; -1 = empty.
        block_index = np.full((n_blocks, n_blocks), -1, dtype=np.int32)
        for slot, (r, c) in enumerate(ordered):
            block_index[r, c] = slot
        writer.add_array("block_index", block_index, dtype="int32")
        writer.init_streaming_array(
            "block_data", (len(ordered), block_size, block_size), dtype
        )
        writer.set_metadata(meta)
        writer.write()
        mm = writer.get_array_writable("block_data")
        vals = counts.astype(dtype)
        b_ids = block_index[br, bc]
        lr = row_ids % block_size
        lc = col_ids % block_size
        mm[b_ids, lr, lc] = vals
        nondiag = row_ids != col_ids
        if np.any(nondiag):
            sym_ids = block_index[bc[nondiag], br[nondiag]]
            mm[sym_ids, lc[nondiag], lr[nondiag]] = vals[nondiag]
        mm.flush()
    else:
        # No pixels: emit an all-empty index. NOTE(review): no "block_data"
        # array is registered in this path — confirm readers tolerate that.
        block_index = np.full((n_blocks, n_blocks), -1, dtype=np.int32)
        writer.add_array("block_index", block_index, dtype="int32")
        writer.set_metadata(meta)
        writer.write()


@validate_call(config=dict(arbitrary_types_allowed=True))
def convert_to_gnz(
    fpath: pathlib.Path,
    output_fpath: pathlib.Path,
    region1: str,
    resolution: int,
    balancing: t.Optional[Balancing] = None,
    backend: Backend = Backend.HICTK,
    dtype: str = "float32",
    overwrite: bool = False,
    layout: str = "dense",  # dense, tiled, csr, block_sparse
    block_size: int = 1024,
) -> None:
    """
    Convert a Hi-C file to a ``.gnz`` container with matrix and weights.

    Raw (unbalanced) pixels for ``region1`` are loaded at ``resolution``,
    symmetrized, and written in one of four layouts (``dense``, ``tiled``,
    ``csr``, ``block_sparse``). Streaming/incremental writing is used so
    high-resolution data does not have to fit in RAM at once.

    Parameters
    ----------
    fpath : pathlib.Path
        Input Hi-C file readable by the selected ``backend``.
    output_fpath : pathlib.Path
        Output path; a ``.gnz`` suffix is enforced.
    region1 : str
        Genomic region, e.g. ``"chr1"`` or ``"chr1:0-1000000"``.
    resolution : int
        Bin size in base pairs.
    balancing : Balancing, optional
        If given (and not ``Balancing.NONE``), balancing weights are stored
        alongside the raw matrix as ``weights_<name>``.
    backend : Backend
        Loader backend; weights extraction is implemented for HICTK only.
    dtype : str
        NumPy dtype name for the stored matrix values.
    overwrite : bool
        Passed to :class:`GnzWriter`; allow replacing an existing file.
    layout : str
        Storage layout: ``dense``, ``tiled``, ``csr`` or ``block_sparse``.
    block_size : int
        Tile edge length for the ``tiled`` and ``block_sparse`` layouts.

    Raises
    ------
    ConverterError
        If ``layout`` is not one of the supported layouts.

    Examples
    --------
    """
    if not output_fpath.name.endswith(".gnz"):
        output_fpath = output_fpath.with_suffix(".gnz")

    writer = GnzWriter(output_fpath, overwrite=overwrite)

    # 1. Fetch metadata & raw pixels. Always load unbalanced counts; weights
    #    are stored separately so readers can apply them on demand.
    df_raw = loaders.load_cm_data(
        fpath=fpath,
        region1=region1,
        resolution=resolution,
        balancing=Balancing.NONE,
        output_format=DataStructure.DF,
        backend=backend,
    )
    row_ids = df_raw[DataFrameSpecs.ROW_IDS].to_numpy()
    col_ids = df_raw[DataFrameSpecs.COL_IDS].to_numpy()
    counts = df_raw[DataFrameSpecs.COUNTS].to_numpy()

    # Cleanup counts: NaN/Inf become 0 so they cannot poison the output.
    if np.issubdtype(counts.dtype, np.floating):
        counts[~np.isfinite(counts)] = 0

    # Matrix order = highest bin id seen + 1 (0 for an empty region).
    n = int(max(np.max(row_ids), np.max(col_ids)) + 1) if row_ids.size else 0

    chrom1 = region1.split(":")[0]
    meta = {
        "resolution": resolution,
        "region": region1,
        "chromosome1": chrom1,
        "source_file": str(fpath),
        "balancing": balancing.value if balancing else "NONE",
        "layout": layout,
        "block_size": block_size if layout in ("tiled", "block_sparse") else None,
    }

    # 2. Prepare weights (small enough to keep in RAM).
    #    NOTE(review): weights are only extracted for the HICTK backend; other
    #    backends silently skip them — confirm this best-effort is intended.
    if balancing and balancing != Balancing.NONE and backend == Backend.HICTK:
        import hictkpy

        f_hic = hictkpy.File(str(fpath), resolution)
        # Translate the chromosome-local slice into global bin-table offsets.
        offset = f_hic.bins().get_id(chrom1, 0)
        w_global = f_hic.weights(balancing.value, divisive=True)
        writer.add_array(
            f"weights_{balancing.value}",
            w_global[offset : offset + n],
            dtype="float32",
        )

    # 3. Layout-specific registration & writing.
    if layout == "dense":
        _write_dense(writer, meta, n, row_ids, col_ids, counts, dtype)
    elif layout == "tiled":
        _write_tiled(writer, meta, n, row_ids, col_ids, counts, dtype, block_size)
    elif layout == "csr":
        _write_csr(writer, meta, n, row_ids, col_ids, counts, dtype)
    elif layout == "block_sparse":
        _write_block_sparse(
            writer, meta, n, row_ids, col_ids, counts, dtype, block_size
        )
    else:
        raise ConverterError(f"Unknown layout: {layout}")