Source code for gunz_cm.compressions.cmc_decoder

"""
CMC Decoder wrapper for GNZ v3 compression.

Examples
--------
"""

__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"

import sys
import pathlib
import numpy as np

_FILE_PATH = pathlib.Path(__file__).resolve()
_WS_ROOT = _FILE_PATH.parent
for _ in range(4):
    _WS_ROOT = _WS_ROOT.parent
_CMC_PATH = _WS_ROOT / "3d_recon" / "thirdparty" / "cmc"
if not _CMC_PATH.exists():
    raise FileNotFoundError(f"CMC not found at {_CMC_PATH}")

if str(_CMC_PATH) not in sys.path:
    sys.path.insert(0, str(_CMC_PATH))

import cmc.jbig1  # noqa: E402
import cmc.transform  # noqa: E402

jbig_decode = cmc.jbig1.decode
debinarize_rc_bin_split_v2 = cmc.transform.debinarize_rc_bin_split_v2
reverse_diag_transform_mode0 = cmc.transform.reverse_diag_transform_mode0


[docs]class CmcDecoder: """CMC decoder for contact matrix tiles. Parameters ---------- tile_size : int, default=256 Tile size for block processing. resolution : int, default=50000 Hi-C resolution in bp. diag_transform : bool, default=True Reverse diagonal transform after decoding. Examples -------- """ def __init__( self, tile_size: int = 256, resolution: int = 50000, diag_transform: bool = True, ): """ Examples -------- """ self.tile_size = tile_size self.resolution = resolution self.diag_transform = diag_transform
[docs] def decode_tile(self, payload: bytes) -> np.ndarray: """Decode a single CMC-encoded tile. Parameters ---------- payload : bytes CMC-encoded bitstream. Returns ------- np.ndarray Decoded contact matrix tile. Examples -------- """ bin_mat = jbig_decode(payload) debinarized = debinarize_rc_bin_split_v2(bin_mat, axis=0) if self.diag_transform: return reverse_diag_transform_mode0(debinarized) return debinarized
[docs] def decode_tiles(self, payloads: list[bytes]) -> np.ndarray: """Decode multiple tiles into a 4D array. Parameters ---------- payloads : list[bytes] List of encoded bitstreams. Returns ------- np.ndarray 4D array of decoded tiles (n_tile_rows, n_tile_cols, tile_size, tile_size). Examples -------- """ n_tiles = len(payloads) decoded = [self.decode_tile(p) for p in payloads] tile_shape = decoded[0].shape tile_rows = int(np.sqrt(n_tiles)) tile_cols = n_tiles // tile_rows if tile_rows > 0 else 1 result = np.empty((tile_rows, tile_cols, *tile_shape), dtype=np.uint32) idx = 0 for i in range(tile_rows): for j in range(tile_cols): result[i, j] = decoded[idx] idx += 1 return result