hoops_ai.flowmanager
Quick Overview
Classes
Flow(name, specifications[, tasks]): Class to manage and orchestrate the execution of multiple Tasks.
FlowOutput(flow_spec, task_instances, ...): Class to encapsulate the results, errors, and logs from the Flow execution.
ParallelExecutor(max_workers): Simplified parallel executor configuration.
ETL-based task decorators for flow orchestration.
Functions
create_flow(name, tasks, flows_outputdir[, ...]): Module-level flow creation function with simplified parameters.
Returns a dictionary of all registered tasks.
Prints information about all registered tasks.
Validates all registered tasks and returns validation results.
Data Flow Management Module
The FlowManager module provides a powerful framework for orchestrating complex CAD data processing workflows. It enables the creation of scalable, parallelizable pipelines that can handle everything from single CAD file processing to massive batch operations across entire datasets.
This module implements a task-based architecture where complex workflows are decomposed into manageable, reusable components. It supports both sequential and parallel execution patterns, with built-in monitoring, error handling, and progress tracking capabilities.
KEY FEATURES:
ETL-Based Task Decorators:
@flowtask.extract: Data extraction phase (e.g., gathering CAD files)
@flowtask.transform: Data transformation phase (e.g., encoding CAD data)
@flowtask.compute_embeddings: Compute shape embeddings from CAD files
@flowtask.custom: Flexible custom tasks for any processing
Simplified Flow Creation:
create_flow(): Create flows with direct parameters
Simple max_workers parameter for parallel execution
Automatic dataset export integration
Automatic Function Injection:
Decorators inject user functions into the task execution framework
No need to implement new ParallelTask classes
Task names are user-defined, not hard-coded
Execution Mode Control:
parallel_execution parameter on decorators for explicit control
Automatic sequential mode when max_workers <= 1
Per-task execution mode override
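The execution mode rules listed above can be sketched with the standard library. This is an assumption-laden illustration, not the FlowManager internals: `run_items` is a hypothetical helper, and a thread pool is used purely for simplicity; the real module may schedule work differently.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def run_items(func: Callable, items: Iterable, max_workers: int,
              parallel_execution: bool = True) -> List:
    """Apply func to every item, sequentially or in parallel.

    Hypothetical helper: runs sequentially when the task opts out of
    parallelism or when max_workers <= 1, otherwise uses a thread pool.
    """
    use_parallel = parallel_execution and max_workers > 1
    if not use_parallel:
        return [func(item) for item in items]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the order of the inputs.
        return list(pool.map(func, items))
```

Both paths return results in input order, so a caller cannot tell from the output which mode ran; only the per-task flag and the worker count decide it.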
USAGE EXAMPLE:
import glob
from typing import List

import hoops_ai
from hoops_ai.flowmanager import flowtask

# Extract phase: collect the CAD files found in each data source
@flowtask.extract(
    name="gather_cad_files",
    inputs=["cad_datasources"],
    outputs=["cad_dataset"],
    parallel_execution=True
)
def my_gather_function(source: str) -> List[str]:
    return glob.glob(f"{source}/*.step")

# Transform phase: encode a single CAD file
@flowtask.transform(
    name="encode_cad",
    inputs=["cad_file", "cad_loader", "storage"],
    outputs=["face_count", "edge_count"],
    parallel_execution=True
)
def my_encode_function(cad_file, cad_loader, storage):
    # Transformation logic goes here; it must define face_count and edge_count
    return face_count, edge_count

# Create flow with simplified parameters
my_flow = hoops_ai.create_flow(
    name="my_flow",
    tasks=[my_gather_function, my_encode_function],
    flows_outputdir="./output",
    max_workers=4,
    debug=False
)

# Execute flow
flow_output = my_flow.process(inputs={"cad_datasources": ["/path/to/cad"]})
print(flow_output.summary())
For workflow design patterns and best practices, see the Data Flow Management Programming Guide.