# -*- coding: utf-8 -*-
"""
Core module for data-oriented pipeline construction and execution.
Examples
--------
"""
# =============================================================================
# METADATA
# =============================================================================
__author__ = "Yeremia Gunawan Adhisantoso"
__email__ = "adhisant@tnt.uni-hannover.de"
__copyright__ = "Institut für Informationsverarbeitung"
__license__ = "Clear BSD"
__version__ = "2.5.0"
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import importlib
import typing as t
from functools import partial
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
from pydantic import validate_call
# =============================================================================
# TYPE ALIASES
# =============================================================================
CallableStep = t.Callable[[t.Any], t.Any]
PipelineStep = tuple[str, CallableStep]
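# =============================================================================
# PIPELINE
# =============================================================================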
class Pipeline:
    """
    A simple data-oriented pipeline that chains callable steps.

    Each step is called with the output of the previous step as its only
    positional argument; the first step is called with the initial data.
    The value returned by the last step is the result of the pipeline.

    Examples
    --------
    >>> pipeline = Pipeline(
    ...     steps=[("double", lambda x: x * 2), ("inc", lambda x: x + 1)],
    ... )
    >>> pipeline.run(3)
    7
    >>> pipeline(3)
    7
    """

    def __init__(
        self,
        #? --- Configuration ---
        steps: list[PipelineStep],
    ):
        """
        Initializes the Pipeline.

        Parameters
        ----------
        steps : list[tuple[str, Callable]]
            A list of tuples, each containing a step name and a callable.
            The list is stored as given (it is not copied).

        Examples
        --------
        >>> Pipeline(steps=[("identity", lambda x: x)]).steps[0][0]
        'identity'
        """
        self.steps = steps

    def __repr__(self) -> str:
        """
        Returns a debugging representation listing the step names in order.

        Examples
        --------
        >>> Pipeline(steps=[("a", str), ("b", len)])
        Pipeline(steps=['a', 'b'])
        """
        step_names = [name for name, _ in self.steps]
        return f"{type(self).__name__}(steps={step_names!r})"

    def run(
        self,
        #? --- Input Data ---
        initial_data: t.Any = None,
    ) -> t.Any:
        """
        Executes the pipeline sequentially.

        Parameters
        ----------
        initial_data : Any, optional
            The initial data to pass into the first pipeline step.
            Defaults to None.

        Returns
        -------
        Any
            The output data from the final pipeline step. When the pipeline
            has no steps, ``initial_data`` is returned unchanged.

        Examples
        --------
        >>> Pipeline(steps=[("upper", str.upper)]).run("abc")
        'ABC'
        >>> Pipeline(steps=[]).run("abc")
        'abc'
        """
        data = initial_data
        # Step names are labels only; execution depends solely on the callables.
        for _name, step_func in self.steps:
            data = step_func(data)
        return data

    def __call__(
        self,
        #? --- Input Data ---
        initial_data: t.Any = None,
    ) -> t.Any:
        """
        Allows the pipeline instance to be called directly.

        Equivalent to ``self.run(initial_data=initial_data)``.

        Parameters
        ----------
        initial_data : Any, optional
            The initial data to pass into the first pipeline step.
            Defaults to None.

        Returns
        -------
        Any
            The output data from the final pipeline step.

        Examples
        --------
        >>> Pipeline(steps=[("length", len)])("abcd")
        4
        """
        return self.run(initial_data=initial_data)
# =============================================================================
# INTERNAL HELPERS
# =============================================================================
def _resolve_callable(
    #? --- Target ---
    target: str,
) -> t.Callable:
    """
    Resolves a string module path to the actual function.

    The path is split at its last dot: everything before it is imported as
    a module, and the part after it is looked up as an attribute of that
    module.

    Parameters
    ----------
    target : str
        The full module path to the function (e.g., 'gunz_cm.loaders.load_cm_data').

    Returns
    -------
    Callable
        The resolved function.

    Raises
    ------
    ValueError
        If the target string does not contain a module path, or if the module
        part or the attribute part of the path is empty (e.g. 'module.').
    TypeError
        If the resolved attribute is not callable.
    ImportError
        Propagated from ``importlib.import_module`` when the module cannot
        be imported.
    AttributeError
        Propagated from ``getattr`` when the module has no such attribute.

    Examples
    --------
    >>> import math
    >>> _resolve_callable(target="math.sqrt") is math.sqrt
    True
    >>> _resolve_callable(target="sqrt")
    Traceback (most recent call last):
        ...
    ValueError: Target 'sqrt' must be a full module path, e.g., 'module.submodule.function_name'
    """
    if '.' not in target:
        raise ValueError(
            f"Target '{target}' must be a full module path, "
            "e.g., 'module.submodule.function_name'"
        )

    module_name, func_name = target.rsplit('.', 1)
    # A leading or trailing dot leaves one side empty; report it as a malformed
    # path instead of letting the import or attribute lookup fail obscurely.
    if not module_name or not func_name:
        raise ValueError(
            f"Target '{target}' must be a full module path, "
            "e.g., 'module.submodule.function_name'"
        )

    module = importlib.import_module(module_name)
    func = getattr(module, func_name)

    # Fail at resolution time rather than later, when the step is first run.
    if not callable(func):
        raise TypeError(
            f"Target '{target}' resolved to a non-callable object "
            f"of type '{type(func).__name__}'"
        )
    return func
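# =============================================================================
# FACTORY FUNCTIONS
# =============================================================================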
@validate_call
def create_pipeline(
    #? --- Configuration ---
    config: list[dict[str, t.Any]],
) -> Pipeline:
    """
    Factory function to create a Pipeline from a configuration structure.

    Every entry of ``config`` becomes one pipeline step, in the given order.
    The entry's 'target' path is resolved to a callable, any 'kwargs' are
    bound to it, and the result is wrapped so that the step is called without
    arguments whenever the incoming data is None.

    Parameters
    ----------
    config : list[dict[str, Any]]
        A list of configuration dictionaries, each specifying a pipeline step.
        Expected keys are 'name' (str), 'target' (str), and optionally
        'kwargs' (dict). A missing 'name' defaults to 'unnamed_step'.

    Returns
    -------
    Pipeline
        The constructed pipeline.

    Raises
    ------
    ValueError
        If a step is missing a 'target' key (or its value is empty).

    Notes
    -----
    Because a step is called without arguments when its input is None, a step
    that returns None causes the following step to be called with no
    arguments as well.

    Examples
    --------
    >>> pipeline = create_pipeline([
    ...     {"name": "parse", "target": "builtins.int", "kwargs": {"base": 2}},
    ...     {"name": "sqrt", "target": "math.sqrt"},
    ... ])
    >>> pipeline("10000")
    4.0
    """
    built_steps = []
    for entry in config:
        label = entry.get("name", "unnamed_step")
        dotted_path = entry.get("target")
        if not dotted_path:
            raise ValueError(f"Step '{label}' is missing a 'target' callable path.")

        resolved = _resolve_callable(target=dotted_path)
        bound_kwargs = entry.get("kwargs", {})

        # Only wrap in a partial when there is actually something to bind.
        step_func = partial(resolved, **bound_kwargs) if bound_kwargs else resolved

        # ``f`` is bound as a default argument so each wrapper keeps the
        # callable of its own loop iteration instead of the last one.
        def step_wrapper(data: t.Any, f: t.Callable = step_func) -> t.Any:
            """
            Calls the wrapped step, omitting the argument when there is no data.

            Parameters
            ----------
            data : Any
                The output of the previous step (or the initial data).
            f : Callable
                The step callable; pre-bound to this step's function.

            Returns
            -------
            Any
                ``f()`` when ``data`` is None, otherwise ``f(data)``.
            """
            return f() if data is None else f(data)

        built_steps.append((label, step_wrapper))

    return Pipeline(steps=built_steps)