Flow module - Quick Reference
Architecture Overview
HOOPS AI Flow uses a decorator-based task orchestration architecture where data flows through a pipeline of parallelizable tasks. The framework automatically manages:
Task Dependencies: Data flows from task outputs to task inputs
Parallel Execution: Process multiple files concurrently using ProcessPoolExecutor
Storage Management: Automatic creation and cleanup of data stores
Schema Routing: Organize encoded data into structured groups for ML consumption
Core Concepts
Tasks: Python functions decorated with @flowtask that process data
Flow: Orchestrator that chains tasks and manages execution
Schema: Blueprint defining how encoded data should be organized
Storage: Zarr-based persistence layer for encoded features
Explorer: Query interface for merged datasets
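These concepts map directly onto the imports used throughout this reference; they are collected here for orientation (all names appear verbatim in the sections below):

import hoops_ai                                                            # Flow: hoops_ai.create_flow(...)
from hoops_ai.flowmanager import flowtask                                  # Tasks: @flowtask.* decorators
from hoops_ai.storage.datasetstorage.schema_builder import SchemaBuilder   # Schema: SchemaBuilder(...).build()
from hoops_ai.dataset import DatasetExplorer, DatasetLoader                # Explorer and dataset access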
Quick Start (3 Steps)
1. Define Schema and Tasks (cad_tasks.py)
import os
import glob
import hoops_ai
from hoops_ai.flowmanager import flowtask
from hoops_ai.storage.datasetstorage.schema_builder import SchemaBuilder
# Set license at module level for worker processes
hoops_ai.set_license(os.getenv("HOOPS_AI_LICENSE"), validate=False)
# Define schema at module level
builder = SchemaBuilder(domain="CAD_analysis", version="1.0")
group = builder.create_group("faces", "face", "Face data")
group.create_array("areas", ["face"], "float32", "Face areas")
cad_schema = builder.build()
@flowtask.extract(name="gather", inputs=["sources"], outputs=["files"])
def gather_files(sources):
    all_files = []
    for source in sources:
        all_files.extend(glob.glob(f"{source}/**/*.step", recursive=True))
    return all_files
@flowtask.transform(name="encode", inputs=["cad_file", "cad_loader", "storage"],
                    outputs=["encoded"])
def encode_data(cad_file, cad_loader, storage):
    cad_model = cad_loader.create_from_file(cad_file)
    storage.set_schema(cad_schema)
    # Extract features...
    storage.compress_store()
    return storage.get_file_path("")
2. Create and Execute Flow (Notebook)
from cad_tasks import gather_files, encode_data
import hoops_ai
flow = hoops_ai.create_flow(
    name="cad_pipeline",
    tasks=[gather_files, encode_data],
    flows_outputdir="./output",
    max_workers=8,
    auto_dataset_export=True
)
output, summary, flow_file = flow.process(inputs={'sources': ["/path/to/cad"]})
3. Query and Use Dataset
from hoops_ai.dataset import DatasetExplorer
explorer = DatasetExplorer(flow_output_file=flow_file)
explorer.print_table_of_contents()
# Query files by condition
file_list = explorer.get_file_list(
    group="faces",
    where=lambda ds: ds['face_count'] > 100
)
API Reference
Task Decorators
@flowtask.extract
Purpose: Gather input data (files, database queries, etc.)
@flowtask.extract(
    name="task_name",         # Optional: defaults to function name
    inputs=["sources"],       # Keys from flow.process(inputs={...})
    outputs=["files"],        # Keys passed to next task
    parallel_execution=True   # Default: True
)
def gather_files(sources: List[str]) -> List[str]:
    """
    Args:
        sources: Input data from flow.process()

    Returns:
        List of items to process (e.g., file paths)
    """
    return [...]
@flowtask.transform
Purpose: Process individual items (CAD encoding, feature extraction)
@flowtask.transform(
    name="encode",
    inputs=["cad_file", "cad_loader", "storage"],  # Framework injects loader & storage
    outputs=["encoded_path"],
    parallel_execution=True
)
def encode_cad(cad_file: str, cad_loader, storage) -> str:
    """
    Args:
        cad_file: Single file from previous task output
        cad_loader: HOOPSLoader instance (auto-injected)
        storage: DataStorage instance (auto-injected)

    Returns:
        Path to encoded data file
    """
    return "path/to/encoded.data"
@flowtask.custom
Purpose: Aggregation, validation, or custom logic
@flowtask.custom(
    name="aggregate",
    inputs=["encoded_files"],
    outputs=["summary"],
    parallel_execution=False  # Typically sequential
)
def aggregate_results(encoded_files: List[str]) -> dict:
    """Custom processing logic"""
    return {"summary": "..."}
Flow Configuration
hoops_ai.create_flow(
    name: str,                         # Flow identifier
    tasks: List[callable],             # Decorated task functions
    flows_outputdir: str,              # Output directory
    max_workers: int = None,           # Parallel workers (None = auto-detect)
    debug: bool = False,               # True = sequential execution
    auto_dataset_export: bool = True,  # Auto-merge encoded data
    ml_task: str = ""                  # Description for documentation
)
Returns: (FlowOutput, dict, str)
FlowOutput: Detailed execution results
dict: Summary with keys: file_count, flow_data, flow_info, Duration [seconds]
str: Path to the .flow file
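A minimal sketch of consuming the returned tuple, using the summary keys listed above:

output, summary, flow_file = flow.process(inputs={'sources': ["/path/to/cad"]})
print(summary["file_count"])           # number of files processed
print(summary["Duration [seconds]"])   # wall-clock execution time
print(flow_file)                       # path to the generated .flow file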
Schema Builder API
from hoops_ai.storage.datasetstorage.schema_builder import SchemaBuilder
# Initialize schema
builder = SchemaBuilder(domain="MyDomain", version="1.0")
# Create data group
group = builder.create_group(
    name="faces",             # Group name
    base_dimension="face",    # Base dimension for arrays
    description="Face data"   # Documentation
)
# Add arrays to group
group.create_array(
    name="areas",             # Array name
    dims=["face"],            # Dimensions
    dtype="float32",          # Data type
    description="Face areas"  # Documentation
)
# Define metadata routing
builder.define_file_metadata("processing_time", "float32", "Processing time")
builder.define_categorical_metadata("category", "int32", "Part category")
# Build schema
schema = builder.build()
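Putting the builder calls together, a fuller schema might look like the sketch below; the edges group and its lengths array are hypothetical names used for illustration, not part of the API:

from hoops_ai.storage.datasetstorage.schema_builder import SchemaBuilder

builder = SchemaBuilder(domain="CAD_analysis", version="1.0")

faces = builder.create_group("faces", "face", "Per-face data")
faces.create_array("areas", ["face"], "float32", "Face areas")

edges = builder.create_group("edges", "edge", "Per-edge data")      # hypothetical group
edges.create_array("lengths", ["edge"], "float32", "Edge lengths")  # hypothetical array

builder.define_file_metadata("processing_time", "float32", "Processing time")
builder.define_categorical_metadata("category", "int32", "Part category")

cad_schema = builder.build()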
DatasetExplorer API
from hoops_ai.dataset import DatasetExplorer
explorer = DatasetExplorer(flow_output_file="path/to/flow.flow")
# View dataset structure
explorer.print_table_of_contents()
# Get available groups
groups = explorer.available_groups() # Returns: ['faces', 'edges', ...]
# Query files
file_list = explorer.get_file_list(
    group="faces",
    where=lambda ds: ds['face_count'] > 100
)
# Create distribution
dist = explorer.create_distribution(
    key="category",
    group="labels",
    bins=10  # None = auto-bin
)
DatasetLoader API
from hoops_ai.dataset import DatasetLoader
loader = DatasetLoader(
    merged_store_path="path/to/flow.dataset",
    parquet_file_path="path/to/flow.infoset"
)
# Split dataset
train_size, val_size, test_size = loader.split(
    key="category",    # Column to stratify by
    group="labels",    # Group containing the key
    train=0.7,
    validation=0.15,
    test=0.15,
    random_state=42
)
# Get PyTorch dataset
train_dataset = loader.get_dataset("train")
loader.close_resources()
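Between get_dataset() and close_resources(), the train split can feed a standard PyTorch data pipeline; a minimal sketch, assuming get_dataset() returns a torch.utils.data.Dataset-compatible object:

from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
for batch in train_loader:
    # ... one training step per batch ...
    break  # remove in a real training loop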
Common Usage Patterns
Pattern 1: CAD Encoding Pipeline
@flowtask.transform(name="encode", inputs=["cad_file", "cad_loader", "storage"])
def encode_cad(cad_file, cad_loader, storage):
    # Load CAD file
    cad_model = cad_loader.create_from_file(cad_file)
    storage.set_schema(cad_schema)

    # Extract features
    from hoops_ai.cadencoder import BrepEncoder
    brep_encoder = BrepEncoder(cad_model.get_brep(), storage)
    brep_encoder.push_face_attributes()

    # Save custom metadata
    storage.save_metadata("face_count", cad_model.get_face_count())

    # Finalize
    storage.compress_store()
    return storage.get_file_path("")
Pattern 2: Multi-Source Gathering
@flowtask.extract(name="gather", inputs=["sources"], outputs=["files"])
def gather_files(sources):
    all_files = []
    for source in sources:
        all_files.extend(glob.glob(f"{source}/**/*.step", recursive=True))
    return all_files
Pattern 3: Filtered Dataset Querying
# Complex query with lambda
high_complexity = lambda ds: (ds['face_count'] > 100) & (ds['category'] == 5)
file_list = explorer.get_file_list(group="labels", where=high_complexity)
# Visualize results
from hoops_ai.insights import DatasetViewer
viewer = DatasetViewer.from_explorer(explorer)
viewer.show_preview_as_image(file_list, grid_cols=5)
Output File Structure
flows_outputdir/flows/{flow_name}/
├── {flow_name}.flow # Main output: JSON with all metadata
├── {flow_name}.dataset # Merged Zarr dataset
├── {flow_name}.infoset # File-level metadata (Parquet)
├── {flow_name}.attribset # Categorical metadata (Parquet)
├── error_summary.json # Errors encountered during processing
├── flow_log.log # Detailed execution log
├── encoded/ # Individual .data files (Zarr format)
└── stream_cache/ # PNG previews for visualization
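The .flow file is plain JSON (see the table above), so it can be inspected directly; its exact keys are not documented here, so this sketch simply lists them:

import json

with open("./output/flows/cad_pipeline/cad_pipeline.flow") as f:
    flow_metadata = json.load(f)

print(sorted(flow_metadata.keys()))  # discover the available top-level fields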
Windows ProcessPoolExecutor Requirements
Critical: On Windows, parallel execution uses separate processes (not threads). This requires:
Required Setup
1. Define tasks in .py files (e.g., cad_tasks.py)
2. Set the license at module level:
   hoops_ai.set_license(os.getenv("HOOPS_AI_LICENSE"), validate=False)
3. Define the schema at module level:
   cad_schema = builder.build()  # Global variable
4. Import tasks in the notebook:
   from cad_tasks import gather_files, encode_data
Will Fail
Defining tasks in notebook cells
Setting the license only in the notebook
Defining the schema only in the notebook
These fail because spawned worker processes re-import the task module; code that exists only in the notebook is not importable (or picklable) from a worker.
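The same constraint applies to standalone scripts: on Windows, a script that launches a flow needs the standard multiprocessing entry-point guard, since spawned workers re-import the entry module. A minimal sketch (run_flow.py is a hypothetical file name):

# run_flow.py -- hypothetical script entry point (not needed when driving flows from a notebook)
import hoops_ai
from cad_tasks import gather_files, encode_data

if __name__ == "__main__":  # guard: spawned workers re-import this module
    flow = hoops_ai.create_flow(
        name="cad_pipeline",
        tasks=[gather_files, encode_data],
        flows_outputdir="./output",
        max_workers=8,
    )
    output, summary, flow_file = flow.process(inputs={'sources': ["/path/to/cad"]})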
Debugging
Sequential Execution Mode
flow = hoops_ai.create_flow(..., debug=True) # Sequential, easier to debug
Check Execution Logs
import json
# View errors
with open("output/flows/my_flow/error_summary.json") as f:
    errors = json.load(f)
for err in errors:
    print(f"File: {err['file']}, Error: {err['message']}")

# View execution log
with open("output/flows/my_flow/flow_log.log") as f:
    print(f.read())
Best Practices
Start Small: Test with 10-100 files before scaling (see the sketch after this list)
Use Schemas: Always define schemas for predictable data organization
Handle Errors Gracefully: Framework collects errors; inspect after execution
Monitor Resources: Check memory usage during large dataset processing
Version Control: Track schemas and task definitions in git
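For instance, "Start Small" can be as simple as capping the extract task's output during a smoke test; a sketch (the cap of 100 is illustrative):

import glob
from hoops_ai.flowmanager import flowtask

@flowtask.extract(name="gather", inputs=["sources"], outputs=["files"])
def gather_files(sources):
    all_files = []
    for source in sources:
        all_files.extend(glob.glob(f"{source}/**/*.step", recursive=True))
    return all_files[:100]  # smoke-test cap; remove once the pipeline is validated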
See Also
Data Flow Customisation - Complete Flow Management documentation