Source code for gunz_cm.compressions.cmc_zstd_encoder

"""
CMC Transforms + Zstd Encoder for GZCM v3 compression.

Combines CMC's domain-specific transforms (diagonal transform, binarization)
with Zstd entropy coding for faster decode than pure CMC.

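Examples
--------
>>> encoder = CmcZstdEncoder(tile_size=256, resolution=50000, level=3)
>>> encoder.get_compression_info()["codec"]
'cmc_zstd'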
"""

__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__license__ = "Clear BSD"

import sys
import pathlib
import zlib
import numpy as np

try:
    import zstandard as zstd

    HAS_ZSTD = True
except ImportError:
    HAS_ZSTD = False

# Locate the vendored CMC package relative to this file so it can be imported
# without being installed.
_FILE_PATH = pathlib.Path(__file__).resolve()
_WS_ROOT = _FILE_PATH.parent
# Climb four more levels above this file's directory
# (presumably up to the workspace root — confirm against the repo layout).
for _ in range(4):
    _WS_ROOT = _WS_ROOT.parent
_CMC_PATH = _WS_ROOT / "3d_recon" / "thirdparty" / "cmc"
# Fail at import time with an explicit path rather than with a later,
# less informative ImportError.
if not _CMC_PATH.exists():
    raise FileNotFoundError(f"CMC not found at {_CMC_PATH}")

# Prepend (not append) so this checkout is searched first by the
# `import cmc.transform` below; the membership test avoids duplicate entries.
if str(_CMC_PATH) not in sys.path:
    sys.path.insert(0, str(_CMC_PATH))

# Must come after the sys.path manipulation above, hence the E402 suppression.
import cmc.transform  # noqa: E402

# Module-level aliases for the two CMC transforms used by the encoder.
binarize_rc_bin_split_v2 = cmc.transform.binarize_rc_bin_split_v2
diag_transform = cmc.transform.diag_transform


class CmcZstdEncoder:
    """CMC Transforms + Zstd encoder for contact matrix tiles.

    Uses CMC's domain-specific transforms (diagonal transform, binarization)
    with Zstd entropy coding for better compression and faster decode.
    When the ``zstandard`` package could not be imported, ``zlib`` is used
    as the entropy coder instead.

    Parameters
    ----------
    tile_size : int, default=256
        Tile size for block processing.
    resolution : int, default=50000
        Hi-C resolution in bp.
    level : int, default=3
        Compression level (1-22 for zstd, 1-9 for zlib fallback).

    Notes
    -----
    ``tile_size`` and ``resolution`` are only stored and reported by
    :meth:`get_compression_info`; the encoding methods do not read them.

    Examples
    --------
    >>> encoder = CmcZstdEncoder(level=5)
    >>> encoder.get_compression_info()["level"]
    5
    """

    def __init__(
        self,
        tile_size: int = 256,
        resolution: int = 50000,
        level: int = 3,
    ):
        """Store the encoder parameters; no validation is performed."""
        self.tile_size = tile_size
        self.resolution = resolution
        self.level = level

    @staticmethod
    def _pack_shape(shape) -> bytes:
        """Serialize array dimensions as consecutive little-endian int32 values."""
        # An explicit byte order keeps the header identical across hosts;
        # on little-endian machines this matches the native int32 layout.
        return np.asarray(shape, dtype="<i4").tobytes()

    def _compress(self, data: bytes) -> bytes:
        """Entropy-code *data* with zstd, or with zlib if zstandard is missing."""
        if HAS_ZSTD:
            # A fresh compressor per call keeps this method free of shared
            # mutable state and always honours the current ``self.level``.
            return zstd.ZstdCompressor(level=self.level).compress(data)
        # zlib rejects levels above 9, so cap the zstd-style level.
        return zlib.compress(data, level=min(self.level, 9))

    def encode_tile(self, mat: np.ndarray) -> bytes:
        """Encode a single contact matrix tile.

        The tile is passed through ``diag_transform`` (``mode=0``) and
        ``binarize_rc_bin_split_v2`` (``axis=0``); the raw bytes of the
        result are then entropy-coded.

        Parameters
        ----------
        mat : np.ndarray
            2D contact matrix tile (upper triangular).

        Returns
        -------
        bytes
            Compressed bitstream: one little-endian int32 per dimension of
            the binarized array, followed by the compressed payload.
        """
        transformed = diag_transform(mat, mode=0)
        bin_mat = binarize_rc_bin_split_v2(transformed, axis=0)
        # NOTE(review): the dtype of ``bin_mat`` is not written to the stream;
        # presumably the decoder knows it — confirm.
        header = self._pack_shape(bin_mat.shape)
        return header + self._compress(bin_mat.tobytes())

    def encode_tiles(self, tiles: np.ndarray) -> list[bytes]:
        """Encode multiple tiles.

        Parameters
        ----------
        tiles : np.ndarray
            4D array of shape (n_tile_rows, n_tile_cols, tile_size, tile_size).

        Returns
        -------
        list[bytes]
            List of encoded bitstreams, one per tile, in row-major order
            (all columns of tile row 0 first, then tile row 1, ...).
        """
        n_tile_rows, n_tile_cols = tiles.shape[0], tiles.shape[1]
        return [
            self.encode_tile(tiles[row, col])
            for row in range(n_tile_rows)
            for col in range(n_tile_cols)
        ]

    def get_compression_info(self) -> dict:
        """Return compression metadata.

        Returns
        -------
        dict
            Compression parameters for header.
        """
        # NOTE(review): the codec label is fixed even when encode_tile falls
        # back to zlib — confirm that decoders can handle that case.
        return {
            "codec": "cmc_zstd",
            "version": "1.0",
            "tile_size": self.tile_size,
            "resolution": self.resolution,
            "level": self.level,
        }