# -*- coding: utf-8 -*-
"""
Converter to .gnz unified container.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
__version__ = "1.0.0"
import pathlib
import typing as t
import numpy as np
import scipy.sparse
from pydantic import validate_call
from ..io.gnz import GnzWriter
from .. import loaders
from ..exceptions import ConverterError
from ..consts import Balancing, DataStructure, Backend, DataFrameSpecs
# [docs]  -- Sphinx "view source" link artifact left over from HTML extraction;
# commented out because the bare name `docs` is undefined at import time.
@validate_call(config=dict(arbitrary_types_allowed=True))
def convert_to_gnz(
    fpath: pathlib.Path,
    output_fpath: pathlib.Path,
    region1: str,
    resolution: int,
    balancing: t.Optional[Balancing] = None,
    backend: Backend = Backend.HICTK,
    dtype: str = "float32",
    overwrite: bool = False,
    layout: str = "dense",  # dense, tiled, csr, block_sparse
    block_size: int = 1024,
) -> None:
    """
    Convert a Hi-C file to a ``.gnz`` container with matrix and weights.

    Raw (unbalanced) counts are always stored; when ``balancing`` is given,
    the corresponding weight vector is stored alongside so readers can apply
    the normalization later. Streaming/incremental writing is used so that
    high-resolution matrices need not fit in RAM.

    Parameters
    ----------
    fpath : pathlib.Path
        Input Hi-C file readable by the selected ``backend``.
    output_fpath : pathlib.Path
        Destination path; a ``.gnz`` suffix is enforced.
    region1 : str
        Genomic region to extract (``chrom`` or ``chrom:start-end``).
    resolution : int
        Bin size in base pairs.
    balancing : Balancing, optional
        Normalization whose weight vector should be stored. ``None`` or
        ``Balancing.NONE`` stores raw counts only.
    backend : Backend
        Loader backend. Weight extraction is implemented for
        ``Backend.HICTK`` only (see NOTE below).
    dtype : str
        NumPy dtype name used for the stored matrix values.
    overwrite : bool
        Overwrite an existing output file.
    layout : str
        Storage layout: ``"dense"``, ``"tiled"``, ``"csr"`` or
        ``"block_sparse"``.
    block_size : int
        Tile/block edge length for the ``tiled`` and ``block_sparse`` layouts.

    Raises
    ------
    ConverterError
        If ``layout`` is not one of the supported layouts.
    """
    # Validate the layout up-front, before creating the output file or
    # loading any pixels (previously an unknown layout was detected only
    # after the expensive load, leaving a half-initialized output behind).
    if layout not in ("dense", "tiled", "csr", "block_sparse"):
        raise ConverterError(f"Unknown layout: {layout}")

    if not output_fpath.name.endswith(".gnz"):
        output_fpath = output_fpath.with_suffix(".gnz")
    writer = GnzWriter(output_fpath, overwrite=overwrite)

    # 1. Fetch raw (unbalanced) pixels. Balancing is deliberately NONE here:
    #    weights are stored separately so the reader can apply/undo them.
    df_raw = loaders.load_cm_data(
        fpath=fpath,
        region1=region1,
        resolution=resolution,
        balancing=Balancing.NONE,
        output_format=DataStructure.DF,
        backend=backend,
    )
    row_ids = df_raw[DataFrameSpecs.ROW_IDS].to_numpy()
    col_ids = df_raw[DataFrameSpecs.COL_IDS].to_numpy()
    counts = df_raw[DataFrameSpecs.COUNTS].to_numpy()
    # Sanitize counts: NaN/Inf values would poison dense writes and sparse math.
    if np.issubdtype(counts.dtype, np.floating):
        counts[~np.isfinite(counts)] = 0

    # Matrix edge length, inferred from the highest referenced bin id.
    n = int(max(np.max(row_ids), np.max(col_ids)) + 1) if len(row_ids) > 0 else 0
    meta = {
        "resolution": resolution,
        "region": region1,
        "chromosome1": region1.split(":")[0],
        "source_file": str(fpath),
        "balancing": balancing.value if balancing else "NONE",
        "layout": layout,
        "block_size": block_size if layout in ["tiled", "block_sparse"] else None,
    }

    # 2. Weight vector (small enough to keep fully in RAM).
    if balancing and balancing != Balancing.NONE:
        # NOTE(review): weight extraction is only implemented for the HICTK
        # backend; with any other backend the weights are silently omitted
        # and only raw counts are stored — confirm this is intended.
        if backend == Backend.HICTK:
            import hictkpy

            f_hic = hictkpy.File(str(fpath), resolution)
            bins = f_hic.bins()
            # Offset of this chromosome's first bin in the genome-wide bin table.
            offset = bins.get_id(region1.split(":")[0], 0)
            w_global = f_hic.weights(balancing.value, divisive=True)
            w_local = w_global[offset : offset + n]
            writer.add_array(f"weights_{balancing.value}", w_local, dtype="float32")

    # 3. Layout-specific registration & writing.
    if layout == "dense":
        _write_dense(writer, meta, n, dtype, row_ids, col_ids, counts)
    elif layout == "tiled":
        _write_tiled(writer, meta, n, dtype, block_size, row_ids, col_ids, counts)
    elif layout == "csr":
        _write_csr(writer, meta, n, dtype, row_ids, col_ids, counts)
    else:  # layout == "block_sparse" (validated above)
        _write_block_sparse(writer, meta, n, dtype, block_size, row_ids, col_ids, counts)


def _pad_geometry(n: int, block_size: int) -> t.Tuple[int, int]:
    """Return ``(padded_n, n_blocks)`` with ``padded_n`` the smallest multiple
    of ``block_size`` that is >= ``n``."""
    pad = (block_size - (n % block_size)) % block_size
    padded_n = n + pad
    return padded_n, padded_n // block_size


def _write_dense(writer, meta, n, dtype, row_ids, col_ids, counts) -> None:
    """Write the symmetrized n x n matrix as one dense streaming array."""
    writer.init_streaming_array("matrix", (n, n), dtype)
    writer.set_metadata(meta)
    writer.write()
    mm = writer.get_array_writable("matrix")
    if len(row_ids) > 0:
        mm[row_ids, col_ids] = counts.astype(dtype)
        # Mirror the strictly-off-diagonal pixels to make the matrix symmetric.
        offdiag = row_ids != col_ids
        mm[col_ids[offdiag], row_ids[offdiag]] = counts[offdiag].astype(dtype)
    mm.flush()


def _write_tiled(writer, meta, n, dtype, block_size, row_ids, col_ids, counts) -> None:
    """Write the matrix as a 4-D (n_blocks, n_blocks, bs, bs) tile grid,
    zero-padded so every tile is full-sized."""
    padded_n, n_blocks = _pad_geometry(n, block_size)
    writer.init_streaming_array("matrix", (n_blocks, n_blocks, block_size, block_size), dtype)
    meta["padded_shape"] = (padded_n, padded_n)
    writer.set_metadata(meta)
    writer.write()
    mm = writer.get_array_writable("matrix")
    if len(row_ids) > 0:
        # (block row, in-block row) and (block col, in-block col) per pixel.
        br, pr = np.divmod(row_ids, block_size)
        bc, pc = np.divmod(col_ids, block_size)
        mm[br, bc, pr, pc] = counts.astype(dtype)
        offdiag = row_ids != col_ids
        if np.any(offdiag):
            mm[bc[offdiag], br[offdiag], pc[offdiag], pr[offdiag]] = counts[offdiag].astype(dtype)
    mm.flush()


def _write_csr(writer, meta, n, dtype, row_ids, col_ids, counts) -> None:
    """Symmetrize the pixel list and store it as CSR component arrays
    (indptr/indices/data)."""
    if len(row_ids) > 0:
        # Hoisted: the off-diagonal mask is needed for all three concatenations.
        offdiag = row_ids != col_ids
        all_rows = np.concatenate([row_ids, col_ids[offdiag]])
        all_cols = np.concatenate([col_ids, row_ids[offdiag]])
        all_data = np.concatenate([counts, counts[offdiag]])
        csr = scipy.sparse.csr_matrix((all_data, (all_rows, all_cols)), shape=(n, n), dtype=dtype)
    else:
        csr = scipy.sparse.csr_matrix((n, n), dtype=dtype)
    writer.add_array("indptr", csr.indptr, dtype="int64")
    writer.add_array("indices", csr.indices, dtype="int32")
    writer.add_array("data", csr.data, dtype=dtype)
    writer.set_metadata(meta)
    writer.write()


def _write_block_sparse(writer, meta, n, dtype, block_size, row_ids, col_ids, counts) -> None:
    """Store only the non-empty block_size x block_size tiles, addressed via a
    dense (n_blocks, n_blocks) index of tile ids (-1 = empty tile)."""
    padded_n, n_blocks = _pad_geometry(n, block_size)
    # Record padded_shape for empty matrices too (previously only the
    # non-empty branch set it, leaving inconsistent metadata).
    meta["padded_shape"] = (padded_n, padded_n)
    block_index = np.full((n_blocks, n_blocks), -1, dtype=np.int32)

    if len(row_ids) == 0:
        writer.add_array("block_index", block_index, dtype="int32")
        writer.set_metadata(meta)
        writer.write()
        return

    br, bc = row_ids // block_size, col_ids // block_size
    # Mark both (r, c) and (c, r) active so the mirrored writes have a tile.
    active_coords = set(zip(br, bc))
    active_coords.update(zip(bc, br))
    sorted_active = sorted(active_coords)
    for i, (r, c) in enumerate(sorted_active):
        block_index[r, c] = i
    writer.add_array("block_index", block_index, dtype="int32")
    writer.init_streaming_array("block_data", (len(sorted_active), block_size, block_size), dtype)
    writer.set_metadata(meta)
    writer.write()

    mm = writer.get_array_writable("block_data")
    b_ids = block_index[br, bc]
    lr, lc = row_ids % block_size, col_ids % block_size
    mm[b_ids, lr, lc] = counts.astype(dtype)
    offdiag = row_ids != col_ids
    if np.any(offdiag):
        b_ids_sym = block_index[bc[offdiag], br[offdiag]]
        mm[b_ids_sym, lc[offdiag], lr[offdiag]] = counts[offdiag].astype(dtype)
    mm.flush()