hoops_ai.flowmanager.flow_builder

Quick Overview

Classes

ParallelExecutor(max_workers)

Simplified parallel executor configuration.

Functions

create_flow(name, tasks, flows_outputdir[, ...])

Module-level flow creation function with simplified parameters.

list_registered_tasks()

Returns a dictionary of all registered tasks.

print_task_registry()

Prints information about all registered tasks.

validate_function_signature(func, ...)

Validates that a function has the expected signature for the task type.

validate_tasks()

Validates all registered tasks and returns validation results.

HOOPS CAD Flow Builder with ETL-Based Task Decorators

This module provides a flow builder interface for customizing CAD processing workflows using the ETL (Extract, Transform, Load) pattern with type-safe Python decorators.

KEY FEATURES:

  1. ETL-Based Task Decorators:

    • @flowtask.extract: Data extraction phase (e.g., gathering CAD files)

    • @flowtask.transform: Data transformation phase (e.g., encoding CAD data)

    • @flowtask.custom: Flexible custom tasks for any processing

  2. Simplified Flow Creation:

    • hoops_ai.create_flow(): Create flows with direct parameters

    • Simple max_workers parameter for parallel execution

    • Automatic dataset export integration

  3. Automatic Function Injection:

    • Decorators inject user functions into the task execution framework

    • No need to implement new ParallelTask classes

    • Task names are user-defined, not hard-coded

  4. Execution Mode Control:

    • parallel_execution parameter on decorators for explicit control

    • Automatic sequential mode when max_workers <= 1

    • Per-task execution mode override

USAGE EXAMPLE:

import glob
from typing import List

import hoops_ai
from hoops_ai.flowmanager import flowtask

@flowtask.extract(
    name="gather_cad_files",
    inputs=["cad_datasources"],
    outputs=["cad_dataset"],
    parallel_execution=True
)
def my_gather_function(source: str) -> List[str]:
    return glob.glob(f"{source}/*.step")

@flowtask.transform(
    name="encode_cad",
    inputs=["cad_file", "cad_loader", "storage"],
    outputs=["face_count", "edge_count"],
    parallel_execution=True
)
def my_encode_function(cad_file, cad_loader, storage):
    # Transformation logic goes here; placeholder values keep the example runnable
    face_count, edge_count = 0, 0
    return face_count, edge_count

# Create flow with simplified parameters
my_flow = hoops_ai.create_flow(
    name="my_flow",
    tasks=[my_gather_function, my_encode_function],
    flows_outputdir="./output",
    max_workers=4,
    debug=False
)

# Execute flow
flow_output, summary, flow_file = my_flow.process(inputs={"cad_datasources": ["/path/to/cad"]})

class hoops_ai.flowmanager.flow_builder.ParallelExecutor(max_workers)

Bases: object

Simplified parallel executor configuration.

Parameters:

max_workers (int) – Number of parallel workers

hoops_ai.flowmanager.flow_builder.create_flow(name, tasks, flows_outputdir, max_workers=None, ml_task=None, debug=False, auto_dataset_export=True, export_visualization=True)

Module-level flow creation function with simplified parameters.

Parameters:
  • name (str) – Name of the flow

  • tasks (List[Any]) – List of decorated functions to execute

  • flows_outputdir (str) – Output directory for flow results

  • max_workers (Optional[int]) – Number of parallel workers (None = auto-detect CPU count)

  • ml_task (Optional[str]) – Optional ML task description

  • debug (bool) – Enable debug logging

  • auto_dataset_export (bool) – Automatically inject dataset export task

  • export_visualization (bool) – Export visualization resources (stream cache, PNG) for each processed file

Returns:

Configured Flow instance
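The `max_workers` behavior described in this module (None auto-detects the CPU count; execution is sequential when `max_workers <= 1`) could be resolved roughly as in the sketch below. `resolve_workers` and `runs_in_parallel` are hypothetical helpers, not part of hoops_ai, and the library's actual logic may differ.

```python
import os
from typing import Optional


def resolve_workers(max_workers: Optional[int] = None) -> int:
    """Hypothetical helper: turn the max_workers argument into a worker count."""
    if max_workers is None:
        # os.cpu_count() may return None on some platforms, so fall back to 1.
        return os.cpu_count() or 1
    # Treat zero or negative values as a single worker.
    return max(1, max_workers)


def runs_in_parallel(max_workers: Optional[int], parallel_execution: bool = True) -> bool:
    """Hypothetical helper: parallel only if the task allows it and there is more than one worker."""
    return parallel_execution and resolve_workers(max_workers) > 1
```

Under these assumptions, `runs_in_parallel(1)` is False regardless of the per-task flag, and a task declared with `parallel_execution=False` stays sequential even when several workers are available.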

Example:

import hoops_ai
from hoops_ai.flowmanager import flowtask

@flowtask.transform(name="encode", parallel_execution=True)
def my_encoder(cad_file, cad_loader, storage):
    # Transformation logic goes here; placeholder values keep the example runnable
    face_count, edge_count = 0, 0
    return face_count, edge_count

flow = hoops_ai.create_flow(
    name="my_flow",
    tasks=[my_encoder],
    flows_outputdir="./output",
    max_workers=4
)

hoops_ai.flowmanager.flow_builder.list_registered_tasks()

Returns a dictionary of all registered tasks.

Returns:

Dictionary with task types as keys and lists of task names as values

Return type:

Dict[str, Any]
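As a rough illustration of working with a dictionary of that shape, the sketch below formats one line per task type. The sample data is hypothetical (the key names "extract", "transform", and "custom" are assumed from the decorator names), and `summarize` is not a hoops_ai function.

```python
from typing import Any, Dict, List

# Hypothetical sample shaped like the documented return value:
# task types as keys, lists of task names as values.
sample_registry: Dict[str, Any] = {
    "extract": ["gather_cad_files"],
    "transform": ["encode_cad"],
    "custom": [],
}


def summarize(registry: Dict[str, Any]) -> List[str]:
    """Return one human-readable line per task type, sorted by type name."""
    lines = []
    for task_type, names in sorted(registry.items()):
        listed = ", ".join(names) if names else "(none)"
        lines.append(f"{task_type}: {listed}")
    return lines


for line in summarize(sample_registry):
    print(line)
```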

hoops_ai.flowmanager.flow_builder.print_task_registry()

Prints information about all registered tasks.

hoops_ai.flowmanager.flow_builder.validate_function_signature(func, expected_params, task_type)

Validates that a function has the expected signature for the task type.

Parameters:
  • func (Callable) – The function to validate

  • expected_params (List[str]) – List of expected parameter names

  • task_type (str) – Type of task for error messages

Returns:

True if signature is valid

Raises:

ValueError – If signature is invalid

Return type:

bool
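A check of this kind can be sketched with the standard `inspect` module. The version below is only an assumption about the general approach: `check_signature` is a hypothetical stand-in that verifies every expected parameter name is present, and the library's own rules may be stricter or different.

```python
import inspect
from typing import Callable, List


def check_signature(func: Callable, expected_params: List[str], task_type: str) -> bool:
    """Hypothetical check: every expected parameter name must appear in func's signature."""
    actual = list(inspect.signature(func).parameters)
    missing = [param for param in expected_params if param not in actual]
    if missing:
        raise ValueError(
            f"{task_type} task '{func.__name__}' is missing parameters: {missing}"
        )
    return True


def encode(cad_file, cad_loader, storage):
    return 0, 0


# Passes: all three expected names are present.
check_signature(encode, ["cad_file", "cad_loader", "storage"], "transform")
```

Raising `ValueError` on a mismatch mirrors the documented contract (True on success, an exception otherwise), so callers do not need to test the return value.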

hoops_ai.flowmanager.flow_builder.validate_tasks()

Validates all registered tasks and returns validation results.

Returns:

Dictionary with validation results including valid_tasks, invalid_tasks, and warnings

Return type:

Dict[str, Any]
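A caller might use the result to stop before running a flow. The sketch below works on a hypothetical sample dictionary: only the three key names come from the description above, while the value types (lists of strings) and the helper `report_is_clean` are assumptions.

```python
from typing import Any, Dict

# Hypothetical report using the documented keys; the value types are assumed.
sample_report: Dict[str, Any] = {
    "valid_tasks": ["gather_cad_files"],
    "invalid_tasks": ["encode_cad"],
    "warnings": ["encode_cad: unexpected parameter order"],
}


def report_is_clean(report: Dict[str, Any]) -> bool:
    """Return True only when there are no invalid tasks and no warnings."""
    return not report.get("invalid_tasks") and not report.get("warnings")


if not report_is_clean(sample_report):
    print("Invalid tasks:", sample_report["invalid_tasks"])
```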