hoops_ai.flowmanager.flow_builder
Quick Overview
Classes
ParallelExecutor(max_workers)Simplified parallel executor configuration
Functions
create_flow(name, tasks, flows_outputdir[, ...])Module-level flow creation function with simplified parameters.
Returns a dictionary of all registered tasks.
Prints information about all registered tasks.
validate_function_signature(func, ...)Validates that a function has the expected signature for the task type.
Validates all registered tasks and returns validation results.
HOOPS CAD Flow Builder with ETL-Based Task Decorators
This module provides a flow builder interface for customizing CAD processing workflows using the ETL (Extract, Transform, Load) pattern with type-safe Python decorators.
KEY FEATURES:
ETL-Based Task Decorators:
@flowtask.extract: Data extraction phase (e.g., gathering CAD files)
@flowtask.transform: Data transformation phase (e.g., encoding CAD data)
@flowtask.custom: Flexible custom tasks for any processing
Simplified Flow Creation:
hoops_ai.create_flow(): Create flows with direct parameters
Simple max_workers parameter for parallel execution
Automatic dataset export integration
Automatic Function Injection:
Decorators inject user functions into task execution framework
No need to implement new ParallelTask classes
Task names are user-defined, not hard-coded
Execution Mode Control:
parallel_execution parameter on decorators for explicit control
Automatic sequential mode when max_workers <= 1
Per-task execution mode override
USAGE EXAMPLE:
from hoops_ai.flowmanager import flowtask
import hoops_ai
@flowtask.extract(
name="gather_cad_files",
inputs=["cad_datasources"],
outputs=["cad_dataset"],
parallel_execution=True
)
def my_gather_function(source: str) -> List[str]:
return glob.glob(f"{source}/*.step")
@flowtask.transform(
name="encode_cad",
inputs=["cad_file", "cad_loader", "storage"],
outputs=["face_count", "edge_count"],
parallel_execution=True
)
def my_encode_function(cad_file, cad_loader, storage):
# transformation logic
return face_count, edge_count
# Create flow with simplified parameters
my_flow = hoops_ai.create_flow(
name="my_flow",
tasks=[my_gather_function, my_encode_function],
flows_outputdir="./output",
max_workers=4,
debug=False
)
# Execute flow
flow_output, summary, flow_file = my_flow.process(inputs={"cad_datasources": ["/path/to/cad"]})
- class hoops_ai.flowmanager.flow_builder.ParallelExecutor(max_workers)
Bases:
objectSimplified parallel executor configuration
- Parameters:
max_workers (int)
- hoops_ai.flowmanager.flow_builder.create_flow(name, tasks, flows_outputdir, max_workers=None, ml_task=None, debug=False, auto_dataset_export=True, export_visualization=True)
Module-level flow creation function with simplified parameters.
- Parameters:
name (str) – Name of the flow
flows_outputdir (str) – Output directory for flow results
max_workers (int) – Number of parallel workers (None = auto-detect CPU count)
ml_task (str) – Optional ML task description
debug (bool) – Enable debug logging
auto_dataset_export (bool) – Automatically inject dataset export task
export_visualization (bool) – Export visualization resources (stream cache, PNG) for each processed file
- Returns:
Configured Flow instance
Example:
import hoops_ai from hoops_ai.flowmanager import flowtask @flowtask.transform(name="encode", parallel_execution=True) def my_encoder(cad_file, cad_loader, storage): # transformation logic return face_count, edge_count flow = hoops_ai.create_flow( name="my_flow", tasks=[my_encoder], flows_outputdir="./output", max_workers=4 )
- hoops_ai.flowmanager.flow_builder.list_registered_tasks()
Returns a dictionary of all registered tasks.
- hoops_ai.flowmanager.flow_builder.print_task_registry()
Prints information about all registered tasks.
- hoops_ai.flowmanager.flow_builder.validate_function_signature(func, expected_params, task_type)
Validates that a function has the expected signature for the task type.