# Source code for gunz_cm.pipeline.core

# -*- coding: utf-8 -*-
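"""
Core module for data-oriented pipeline construction and execution.

Provides :class:`Pipeline`, which chains named callables so that each step
receives the output of the previous one, and :func:`create_pipeline`, which
builds a :class:`Pipeline` from a list of step-configuration dictionaries.

Examples
--------
>>> pipeline = create_pipeline([
...     {"name": "to_int", "target": "builtins.int"},
...     {"name": "to_text", "target": "builtins.str"},
... ])
>>> pipeline.run("42")
'42'
"""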

# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__copyright__ = "Institut für Informationsverarbeitung"
__license__ = "Clear BSD"
__version__ = "2.5.0"

# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import importlib
import typing as t
from functools import partial

# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from pydantic import validate_call

# =============================================================================
# TYPE ALIASES
# =============================================================================
CallableStep = t.Callable[[t.Any], t.Any]
PipelineStep = tuple[str, CallableStep]


class Pipeline:
    """
    Sequential chain of named, single-argument callables.

    Every step is a ``(name, callable)`` pair. When the pipeline runs, the
    first callable receives the initial data and each following callable
    receives whatever the previous one returned.

    Examples
    --------
    >>> pipe = Pipeline(steps=[("inc", lambda x: x + 1), ("dbl", lambda x: x * 2)])
    >>> pipe.run(3)
    8
    >>> pipe(3)
    8
    """

    def __init__(
        self,
        #? --- Configuration ---
        steps: list[PipelineStep],
    ):
        """
        Stores the ordered steps of the pipeline.

        Parameters
        ----------
        steps : list[tuple[str, Callable]]
            Ordered ``(name, callable)`` pairs. The list object is kept as-is
            (no copy is made) and exposed as ``self.steps``.

        Examples
        --------
        >>> Pipeline(steps=[("upper", str.upper)]).steps[0][0]
        'upper'
        """
        self.steps = steps

    def run(
        self,
        #? --- Input Data ---
        initial_data: t.Any = None,
    ) -> t.Any:
        """
        Feeds ``initial_data`` through every step, in order.

        Parameters
        ----------
        initial_data : Any, optional
            Value handed to the first step. Defaults to None.

        Returns
        -------
        Any
            The value returned by the last step, or ``initial_data`` itself
            when the pipeline has no steps.

        Examples
        --------
        >>> Pipeline(steps=[("strip", str.strip), ("upper", str.upper)]).run("  hi ")
        'HI'
        >>> Pipeline(steps=[]).run("unchanged")
        'unchanged'
        """
        current = initial_data
        # The step name is carried for identification only; execution needs
        # just the callable.
        for _step_name, stage in self.steps:
            current = stage(current)
        return current

    def __call__(
        self,
        #? --- Input Data ---
        initial_data: t.Any = None,
    ) -> t.Any:
        """
        Makes the instance callable; delegates to :meth:`run`.

        Parameters
        ----------
        initial_data : Any, optional
            Value handed to the first step. Defaults to None.

        Returns
        -------
        Any
            Whatever :meth:`run` returns for ``initial_data``.

        Examples
        --------
        >>> Pipeline(steps=[("neg", lambda x: -x)])(5)
        -5
        """
        return self.run(initial_data=initial_data)
def _resolve_callable(
    #? --- Target ---
    target: str,
) -> t.Callable:
    """
    Resolves a dotted path to the callable it names.

    The path is split at its last dot: the left part is imported as a module
    via ``importlib.import_module`` and the right part is looked up as an
    attribute of that module.

    Parameters
    ----------
    target : str
        The full module path to the function
        (e.g., 'gunz_cm.loaders.load_cm_data').

    Returns
    -------
    Callable
        The resolved callable.

    Raises
    ------
    ValueError
        If ``target`` contains no dot, or if the module part or the attribute
        part around the last dot is empty (e.g. '.func' or 'module.').
    ImportError
        If the module part cannot be imported.
    AttributeError
        If the module has no attribute with the requested name.
    TypeError
        If the resolved attribute is not callable.

    Examples
    --------
    >>> _resolve_callable(target="os.path.basename")("/tmp/data.txt")
    'data.txt'
    """
    malformed_msg = (
        f"Target '{target}' must be a full module path, "
        "e.g., 'module.submodule.function_name'"
    )

    if '.' not in target:
        raise ValueError(malformed_msg)

    module_name, func_name = target.rsplit('.', 1)
    # A leading or trailing dot leaves one side empty; reject it here rather
    # than letting the import or attribute lookup fail with an unrelated error.
    if not module_name or not func_name:
        raise ValueError(malformed_msg)

    module = importlib.import_module(module_name)
    func = getattr(module, func_name)

    # Fail at resolution time instead of when the pipeline later tries to
    # call a non-callable attribute (e.g. a module-level constant).
    if not callable(func):
        raise TypeError(
            f"Target '{target}' resolved to a non-callable object "
            f"of type '{type(func).__name__}'"
        )
    return func
@validate_call
def create_pipeline(
    #? --- Configuration ---
    config: list[dict[str, t.Any]],
) -> Pipeline:
    """
    Builds a :class:`Pipeline` from a list of step-configuration dictionaries.

    For every entry the 'target' path is resolved to a callable, optional
    'kwargs' are bound to it with ``functools.partial``, and the result is
    wrapped so that it is called without arguments when the incoming data is
    None and with the data as its single positional argument otherwise.

    Parameters
    ----------
    config : list[dict[str, Any]]
        One dictionary per step. Recognised keys:

        - 'name' : label of the step; defaults to 'unnamed_step'.
        - 'target' : dotted path of the callable to run (required).
        - 'kwargs' : keyword arguments bound to the callable; ignored when
          missing or empty.

    Returns
    -------
    Pipeline
        A pipeline whose steps follow the order of ``config``.

    Raises
    ------
    ValueError
        If a step has no 'target' entry or its value is empty.

    Notes
    -----
    The "no arguments" call applies to any step whose input is None, not only
    the first one: if a step returns None, the following step is invoked
    without arguments.

    Examples
    --------
    >>> pipe = create_pipeline([
    ...     {"name": "parse", "target": "builtins.int", "kwargs": {"base": 2}},
    ...     {"name": "show", "target": "builtins.str"},
    ... ])
    >>> pipe.run("101")
    '5'
    """
    built_steps = []

    for entry in config:
        label = entry.get("name", "unnamed_step")
        path = entry.get("target")
        if not path:
            raise ValueError(f"Step '{label}' is missing a 'target' callable path.")

        resolved = _resolve_callable(target=path)

        # Only wrap in a partial when there is something to bind.
        extra_kwargs = entry.get("kwargs", {})
        bound = partial(resolved, **extra_kwargs) if extra_kwargs else resolved

        # ``f`` is bound as a default argument so each wrapper keeps the
        # callable of its own loop iteration rather than the last one.
        def step_wrapper(data: t.Any, f: t.Callable = bound) -> t.Any:
            """
            Calls the wrapped step, omitting the argument when there is no data.

            Parameters
            ----------
            data : Any
                Output of the previous step (or the pipeline's initial data).
            f : Callable
                The step callable; pre-bound, not meant to be passed by callers.

            Returns
            -------
            Any
                ``f()`` when ``data`` is None, otherwise ``f(data)``.
            """
            return f() if data is None else f(data)

        built_steps.append((label, step_wrapper))

    return Pipeline(steps=built_steps)