hoops_ai.flowmanager.flow_builder
HOOPS CAD Flow Builder with ETL-Based Task Decorators
This module provides a flow builder interface for customizing CAD processing workflows using the ETL (Extract, Transform, Load) pattern with type-safe Python decorators.
KEY FEATURES:
ETL-Based Task Decorators: - @flowtask.extract: Data extraction phase (e.g., gathering CAD files) - @flowtask.transform: Data transformation phase (e.g., encoding CAD data) - @flowtask.custom: Flexible custom tasks for any processing
Simplified Flow Creation: - hoops_ai.create_flow(): Create flows with direct parameters - Simple max_workers parameter for parallel execution - Automatic dataset export integration
Automatic Function Injection: - Decorators inject user functions into task execution framework - No need to implement new ParallelTask classes - Task names are user-defined, not hard-coded
Execution Mode Control: - parallel_execution parameter on decorators for explicit control - Automatic sequential mode when max_workers <= 1 - Per-task execution mode override
USAGE EXAMPLE:
from hoops_ai.flowmanager import flowtask import hoops_ai
- @flowtask.extract(
name=”gather_cad_files”, inputs=[“cad_datasources”], outputs=[“cad_dataset”], parallel_execution=True
) def my_gather_function(source: str) -> List[str]:
return glob.glob(f”{source}/*.step”)
- @flowtask.transform(
name=”encode_cad”, inputs=[“cad_file”, “cad_loader”, “storage”], outputs=[“face_count”, “edge_count”], parallel_execution=True
) def my_encode_function(cad_file, cad_loader, storage):
# transformation logic return face_count, edge_count
# Create flow with simplified parameters my_flow = hoops_ai.create_flow(
name=”my_flow”, tasks=[my_gather_function, my_encode_function], flows_outputdir=”./output”, max_workers=4, debug=False
)
# Execute flow flow_output, summary, flow_file = my_flow.process(inputs={“cad_datasources”: [“/path/to/cad”]})
Functions
|
Module-level flow creation function with simplified parameters. |
Returns a dictionary of all registered tasks. |
|
Prints information about all registered tasks. |
|
|
Validates that a function has the expected signature for the task type. |
Validates all registered tasks and returns validation results. |
Classes
|
Simplified parallel executor configuration |
|
Storage type enumeration for data storage selection |
ETL-based task decorators for flow orchestration. |
- class hoops_ai.flowmanager.flow_builder.ParallelExecutor(max_workers)
Bases:
objectSimplified parallel executor configuration
- Parameters:
max_workers (int)
- class hoops_ai.flowmanager.flow_builder.StorageType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Bases:
EnumStorage type enumeration for data storage selection
- JSON = 'json'
- OPTIMIZED = 'optimized'
- class hoops_ai.flowmanager.flow_builder.TaskDecorators
Bases:
objectETL-based task decorators for flow orchestration.
Supports ETL (Extract, Transform, Load) pattern: - @flowtask.extract: Data extraction phase (e.g., file gathering) - @flowtask.transform: Data transformation phase (e.g., encoding) - @flowtask.custom: Flexible custom tasks for any processing
- custom(name=None, inputs=None, outputs=None, parallel_execution=True)
Decorator for custom generic tasks with flexible signatures.
- extract(name=None, inputs=None, outputs=None, parallel_execution=True)
ETL Extract phase decorator - for data extraction tasks.
- transform(name=None, inputs=None, outputs=None, parallel_execution=True)
ETL Transform phase decorator - for data transformation tasks.
- hoops_ai.flowmanager.flow_builder.create_flow(name, tasks, flows_outputdir, max_workers=None, ml_task=None, debug=False, auto_dataset_export=True)
Module-level flow creation function with simplified parameters.
- Parameters:
name (str) – Name of the flow
flows_outputdir (str) – Output directory for flow results
max_workers (int) – Number of parallel workers (None = auto-detect CPU count)
ml_task (str) – Optional ML task description
debug (bool) – Enable debug logging
auto_dataset_export (bool) – Automatically inject dataset export task
- Returns:
Configured Flow instance
Example
>>> import hoops_ai >>> from hoops_ai.flowmanager import flowtask >>> >>> @flowtask.transform(name="encode", parallel_execution=True) >>> def my_encoder(cad_file, cad_loader, storage): >>> # transformation logic >>> return face_count, edge_count >>> >>> flow = hoops_ai.create_flow( >>> name="my_flow", >>> tasks=[my_encoder], >>> flows_outputdir="./output", >>> max_workers=4 >>> )
- hoops_ai.flowmanager.flow_builder.list_registered_tasks()
Returns a dictionary of all registered tasks.
- hoops_ai.flowmanager.flow_builder.print_task_registry()
Prints information about all registered tasks.
- hoops_ai.flowmanager.flow_builder.validate_function_signature(func, expected_params, task_type)
Validates that a function has the expected signature for the task type.