"""
CMC Transforms + Zstd Encoder for GZCM v3 compression.
Combines CMC's domain-specific transforms (diagonal transform, binarization)
with Zstd entropy coding for faster decode than pure CMC.
Examples
--------
"""
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"
import sys
import pathlib
import zlib
import numpy as np
try:
import zstandard as zstd
HAS_ZSTD = True
except ImportError:
HAS_ZSTD = False
# Absolute location of this module; the workspace root is found by taking the
# containing directory and then climbing four more parent levels.
_FILE_PATH = pathlib.Path(__file__).resolve()
_WS_ROOT = _FILE_PATH.parent
for _ in range(4):
    _WS_ROOT = _WS_ROOT.parent

# Directory expected to hold the third-party CMC package. Fail at import time
# when it is missing, since the encoder below cannot work without it.
_CMC_PATH = _WS_ROOT.joinpath("3d_recon", "thirdparty", "cmc")
if not _CMC_PATH.exists():
    raise FileNotFoundError(f"CMC not found at {_CMC_PATH}")

# Prepend the CMC directory to the import search path, but only once.
if sys.path.count(str(_CMC_PATH)) == 0:
    sys.path.insert(0, str(_CMC_PATH))

import cmc.transform  # noqa: E402

# Module-level aliases for the two CMC transforms used by the encoder.
binarize_rc_bin_split_v2 = cmc.transform.binarize_rc_bin_split_v2
diag_transform = cmc.transform.diag_transform
class CmcZstdEncoder:
    """CMC Transforms + Zstd encoder for contact matrix tiles.

    Each tile is passed through CMC's ``diag_transform`` and
    ``binarize_rc_bin_split_v2``; the resulting array is then entropy-coded
    with Zstd, or with zlib when the ``zstandard`` package is not installed.

    Parameters
    ----------
    tile_size : int, default=256
        Tile size for block processing. Stored and reported by
        :meth:`get_compression_info`; the encode methods do not read it.
    resolution : int, default=50000
        Hi-C resolution in bp. Stored and reported by
        :meth:`get_compression_info`; the encode methods do not read it.
    level : int, default=3
        Compression level (1-22 for zstd, 1-9 for zlib fallback). When the
        zlib fallback is active, levels above 9 are capped at 9 and levels
        below -1 (Zstd's negative "fast" levels) are mapped to 1.

    Notes
    -----
    The emitted bitstream carries no marker telling whether Zstd or zlib
    produced the payload, and :meth:`get_compression_info` always reports
    the codec as ``"cmc_zstd"``.
    """

    def __init__(
        self,
        tile_size: int = 256,
        resolution: int = 50000,
        level: int = 3,
    ):
        """Store the encoder configuration; no validation is performed."""
        self.tile_size = tile_size
        self.resolution = resolution
        self.level = level

    @staticmethod
    def _zlib_level(level: int) -> int:
        """Map a Zstd-style level onto a level that zlib accepts."""
        # zlib only accepts -1 (library default) and 0..9. Zstd also allows
        # levels below -1 as "fast" modes, so send those to zlib's fastest
        # level instead of letting zlib.compress raise zlib.error.
        if level < zlib.Z_DEFAULT_COMPRESSION:
            return zlib.Z_BEST_SPEED
        return min(level, zlib.Z_BEST_COMPRESSION)

    def encode_tile(self, mat: np.ndarray) -> bytes:
        """Encode a single contact matrix tile.

        Parameters
        ----------
        mat : np.ndarray
            2D contact matrix tile (upper triangular).

        Returns
        -------
        bytes
            Compressed bitstream (shape info + encoded data). The shape info
            is the shape of the binarized array written as ``int32`` values
            in native byte order, immediately followed by the compressed
            payload.
        """
        transformed = diag_transform(mat, mode=0)
        bin_mat = binarize_rc_bin_split_v2(transformed, axis=0)

        # Header: one int32 per dimension of the binarized array.
        # NOTE(review): neither the number of dimensions nor the dtype of
        # ``bin_mat`` is written, so the decoder must already know both.
        header = np.array(bin_mat.shape, dtype=np.int32).tobytes()
        payload = bin_mat.tobytes()

        if HAS_ZSTD:
            # A fresh compressor per call keeps this method free of shared
            # mutable state.
            compressed = zstd.ZstdCompressor(level=self.level).compress(payload)
        else:
            compressed = zlib.compress(
                payload, level=self._zlib_level(self.level)
            )
        return header + compressed

    def encode_tiles(self, tiles: np.ndarray) -> list[bytes]:
        """Encode multiple tiles.

        Parameters
        ----------
        tiles : np.ndarray
            4D array of shape (n_tile_rows, n_tile_cols, tile_size, tile_size).

        Returns
        -------
        list[bytes]
            List of encoded bitstreams, one per tile, in row-major order
            (all columns of tile row 0, then tile row 1, ...). Empty when
            either of the first two dimensions is zero.
        """
        n_tile_rows, n_tile_cols = tiles.shape[0], tiles.shape[1]
        return [
            self.encode_tile(tiles[i, j])
            for i in range(n_tile_rows)
            for j in range(n_tile_cols)
        ]

    def get_compression_info(self) -> dict:
        """Return compression metadata.

        Returns
        -------
        dict
            Compression parameters for header: the fixed ``"codec"`` and
            ``"version"`` strings plus the configured ``"tile_size"``,
            ``"resolution"`` and ``"level"``.

        Examples
        --------
        >>> CmcZstdEncoder(level=5).get_compression_info()["level"]
        5
        """
        return {
            "codec": "cmc_zstd",
            "version": "1.0",
            "tile_size": self.tile_size,
            "resolution": self.resolution,
            "level": self.level,
        }