[1]:
from IPython.display import Markdown, display
import inspect
import pathlib
import ast
def display_task_source(task_func, title):
    """Display task source code from the original file, including decorators but excluding docstrings.

    Only shows code from the @flowtask decorator to the end of the function,
    excluding the function's docstring.
    """
    # Get the module where the function is defined
    module = inspect.getmodule(task_func)
    func_name = task_func.__name__
    if not module or not hasattr(module, '__file__'):
        display(Markdown(f"❌ Could not find source file for `{func_name}`"))
        return

    # Read the entire source file
    source_file = pathlib.Path(module.__file__)
    with open(source_file, 'r', encoding='utf-8') as f:
        file_content = f.read()

    # Parse the AST and find the function definition
    tree = ast.parse(file_content)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name == func_name:
            # Line numbers from the AST are 1-indexed
            start_line = node.lineno
            end_line = node.end_lineno
            lines = file_content.splitlines()

            # Look backwards from the function def to find the @flowtask decorator
            decorator_start = start_line - 1  # 1-indexed line just above the def
            while decorator_start > 0:
                line = lines[decorator_start - 1].strip()
                if line.startswith('@flowtask'):
                    break
                decorator_start -= 1
            if decorator_start == 0:
                decorator_start = start_line  # no decorator found; start at the def line

            # Extract from the decorator to the end of the function
            source_lines = lines[decorator_start - 1:end_line]

            # Remove the docstring if present (the first statement in the function body)
            if node.body and isinstance(node.body[0], ast.Expr) and isinstance(node.body[0].value, ast.Constant):
                if isinstance(node.body[0].value.value, str):
                    docstring_start = node.body[0].lineno - 1    # convert to 0-indexed
                    docstring_end = node.body[0].end_lineno - 1  # convert to 0-indexed
                    # Positions relative to the start of source_lines
                    relative_doc_start = docstring_start - (decorator_start - 1)
                    relative_doc_end = docstring_end - (decorator_start - 1)
                    source_lines = source_lines[:relative_doc_start] + source_lines[relative_doc_end + 1:]

            source_code = '\n'.join(source_lines)

            # Display as markdown
            markdown_content = f"""
### 📄 Task Source Code: `{func_name}`

```python
{source_code}
```
"""
            display(Markdown(markdown_content))
            return

    # Fallback if the function was not found in the source file
    display(Markdown(f"❌ Could not parse function `{func_name}` from source file"))
HOOPS AI - Minimal ETL Demo
This notebook demonstrates the core features of the HOOPS AI data engineering workflows:
Key Components
Schema-Based Dataset Organization: Define structured data schemas for consistent data merging
Parallel Task Decorators: Simplify CAD processing with type-safe task definitions
Generic Flow Orchestration: Automatically handle task dependencies and data flow
Automatic Dataset Merging: Process multiple files into a unified dataset structure
Integrated Exploration Tools: Analyze and prepare data for ML workflows
The framework automatically generates visualization assets and stream cache data to support downstream applications.
[2]:
import hoops_ai
import os
# Note: License is also set in cad_tasks.py for worker processes
# This is only for the parent process (optional but good practice)
hoops_ai.set_license(hoops_ai.use_test_license(), validate=False)
ℹ️ Using TEST LICENSE (expires December 8, 2025 - 37 days remaining)
For production use, obtain your own license from Tech Soft 3D
Import Dependencies
The HOOPS AI framework provides several key modules:
flowmanager: Core orchestration engine with task decorators
cadaccess: CAD file loading and model access utilities
storage: Data persistence and retrieval components
dataset: Tools for exploring and preparing merged datasets
[3]:
import os
import pathlib
from typing import Tuple, List
# Import the flow builder framework from the library
import hoops_ai
from hoops_ai.flowmanager import flowtask
from hoops_ai.cadaccess import HOOPSLoader, HOOPSTools
from hoops_ai.cadencoder import BrepEncoder
from hoops_ai.dataset import DatasetExplorer
from hoops_ai.storage import DataStorage, CADFileRetriever, LocalStorageProvider
from hoops_ai.storage.datasetstorage.schema_builder import SchemaBuilder
Configuration Setup
Define input and output paths for CAD processing:
Input directory containing source CAD files
Output directory for processed results
Source directory with specific CAD file formats
The framework will automatically organize outputs into structured directories.
[4]:
# Configuration - Using simpler paths
nb_dir = pathlib.Path.cwd()
datasources_dir = nb_dir.parent.joinpath("packages","cadfiles","cadsynth100","step")
if not datasources_dir.exists():
    print("Data source directory does not exist. Please check the path.")
    exit(-1)
flows_outputdir = nb_dir.joinpath("out")
Schema Definition - The Foundation of Dataset Organization
The SchemaBuilder defines a structured blueprint for how CAD data should be organized:
Domain & Version: Namespace and versioning for schema tracking
Groups: Logical data categories (e.g., "machining", "faces", "edges")
Arrays: Typed data containers with defined dimensions
Metadata Routing: Rules for routing metadata to appropriate storage
Schemas ensure consistent data organization across all processed files, enabling automatic merging and exploration.
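Under the hood, the schema printed by the next cell is a plain nested dictionary. As a sketch only (abridged to a single array; cad_tasks.py presumably builds the real one with SchemaBuilder), the structure looks like:

```python
# Hedged sketch of the schema structure, abridged to one group and one array.
# cad_schema_sketch is an illustrative name, not the object used by the flow.
cad_schema_sketch = {
    "version": "1.0",
    "domain": "Manufacturing_Analysis",
    "groups": {
        "machining": {
            "primary_dimension": "part",  # one value per processed part
            "arrays": {
                "machining_category": {
                    "dims": ["part"],
                    "dtype": "int32",
                    "description": "Machining complexity category (1-5)",
                },
            },
            "description": "Manufacturing and machining classification data",
        },
    },
    "description": "Minimal schema for manufacturing classification",
}
```

Because every per-file store is written against the same dictionary, the merge step can concatenate arrays along the declared primary dimension without guessing shapes or dtypes.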
[5]:
# Schema is now defined in cad_tasks.py for ProcessPoolExecutor compatibility
# Import it from there to view or customize
from cad_tasks import cad_schema
print(cad_schema)
{'version': '1.0', 'domain': 'Manufacturing_Analysis', 'groups': {'machining': {'primary_dimension': 'part', 'arrays': {'machining_category': {'dims': ['part'], 'dtype': 'int32', 'description': 'Machining complexity category (1-5)'}, 'material_type': {'dims': ['part'], 'dtype': 'int32', 'description': 'Material type (1-5)'}, 'estimated_machining_time': {'dims': ['part'], 'dtype': 'float32', 'description': 'Estimated machining time in hours'}}, 'description': 'Manufacturing and machining classification data'}}, 'description': 'Minimal schema for manufacturing classification', 'metadata': {'metadata': {'file_level': {}, 'categorical': {'material_type_description': {'dtype': 'str', 'required': False, 'description': 'Material classification'}}, 'routing_rules': {'file_level_patterns': [], 'categorical_patterns': ['material_type_description', 'category', 'type'], 'default_numeric': 'file_level', 'default_categorical': 'categorical', 'default_string': 'categorical'}}}}
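The routing_rules section of this schema decides where each save_metadata() call lands. As an illustration of the idea only (route_metadata and ROUTING are hypothetical names, not the hoops_ai API), a substring-based router over those rules might look like:

```python
# Hypothetical router: keys matching a categorical pattern go to the
# .attribset store; numeric values default to the file-level .infoset store;
# remaining strings default to categorical. Not the library's implementation.
ROUTING = {
    "categorical_patterns": ["material_type_description", "category", "type"],
    "default_numeric": "file_level",
    "default_string": "categorical",
}

def route_metadata(key: str, value) -> str:
    if any(pattern in key for pattern in ROUTING["categorical_patterns"]):
        return "categorical"
    if isinstance(value, (int, float)):
        return ROUTING["default_numeric"]
    return ROUTING["default_string"]

print(route_metadata("material_type_description", "Steel"))  # categorical
print(route_metadata("estimated_cost", 3.5))                 # file_level
```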
[6]:
# Import task functions from external module for ProcessPoolExecutor compatibility
from cad_tasks import gather_files, encode_manufacturing_data
[7]:
display_task_source(gather_files, "gather_files")
📄 Task Source Code: gather_files
@flowtask.extract(
    name="gather cad files",
    inputs=["cad_datasources"],
    outputs=["cad_dataset"],
    parallel_execution=True
)
def gather_files(source: str) -> List[str]:
    # Use simple glob pattern matching for ProcessPoolExecutor compatibility
    patterns = ["*.stp", "*.step", "*.iges", "*.igs"]
    source_files = []
    for pattern in patterns:
        search_path = os.path.join(source, pattern)
        files = glob.glob(search_path)
        source_files.extend(files)
    print(f"Found {len(source_files)} CAD files in {source}")
    return source_files
[8]:
display_task_source(encode_manufacturing_data, "encode_manufacturing_data")
📄 Task Source Code: encode_manufacturing_data
@flowtask.transform(
    name="Manufacturing data encoding",
    inputs=["cad_dataset"],
    outputs=["cad_files_encoded"],
    parallel_execution=True
)
def encode_manufacturing_data(cad_file: str, cad_loader: HOOPSLoader, storage: DataStorage) -> str:
    # Load CAD model using the process-local HOOPSLoader
    cad_model = cad_loader.create_from_file(cad_file)

    # Set the schema for structured data organization
    # Schema is defined at module level, so it's available in all worker processes
    storage.set_schema(cad_schema)

    # Prepare BREP for feature extraction
    hoopstools = HOOPSTools()
    hoopstools.adapt_brep(cad_model, None)

    # Extract geometric features using BrepEncoder
    brep_encoder = BrepEncoder(cad_model.get_brep(), storage)
    brep_encoder.push_face_indices()
    brep_encoder.push_face_attributes()

    # Generate manufacturing classification data
    file_basename = os.path.basename(cad_file)
    file_name = os.path.splitext(file_basename)[0]

    # Set seed for reproducible results based on filename
    random.seed(hash(file_basename) % 1000)

    # Generate classification values
    machining_category = random.randint(1, 5)
    material_type = random.randint(1, 5)
    estimated_time = random.uniform(0.5, 10.0)

    # Material type descriptions
    material_descriptions = ["Steel", "Aluminum", "Titanium", "Plastic", "Composite"]

    # Save data using the OptStorage API (data_key format: "group/array_name")
    storage.save_data("machining/machining_category", np.array([machining_category], dtype=np.int32))
    storage.save_data("machining/material_type", np.array([material_type], dtype=np.int32))
    storage.save_data("machining/estimated_machining_time", np.array([estimated_time], dtype=np.float32))

    # Save categorical metadata (will be routed to .attribset)
    storage.save_metadata("material_type_description", material_descriptions[material_type - 1])

    # Save file-level metadata (will be routed to .infoset)
    storage.save_metadata("Item", str(cad_file))
    storage.save_metadata("Flow name", "minimal_manufacturing_flow")

    # Compress the storage into a .data file
    storage.compress_store()
    return storage.get_file_path("")
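One caveat about the seeding line above: Python salts string hashing per interpreter process (see PYTHONHASHSEED), so hash(file_basename) % 1000 is not actually stable across runs or across worker processes. A deterministic alternative (a sketch; deterministic_seed is a hypothetical helper, not part of hoops_ai) uses a stable checksum such as zlib.crc32:

```python
import random
import zlib

def deterministic_seed(file_basename: str) -> int:
    # zlib.crc32 is stable across processes and Python versions,
    # unlike hash(str), which is salted per interpreter session.
    return zlib.crc32(file_basename.encode("utf-8")) % 1000

random.seed(deterministic_seed("part_001.step"))
machining_category = random.randint(1, 5)  # same value in every run and worker
```

Setting the PYTHONHASHSEED environment variable before launching the workers would also make the original hash()-based seeding repeatable.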
Flow Orchestration and Automatic Dataset Generation
The hoops_ai.create_flow() function orchestrates the data flow execution. The tasks parameter accepts any functions defined by the user, so the pipeline is fully editable: you can write your own encoding logic.
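The execution summary in the log below shows the flow wiring tasks by matching declared output names to input names (cad_dataset links task 1 to task 2). That inference can be sketched as a topological ordering over the declared inputs and outputs (an illustration of the concept, not hoops_ai's implementation; order_tasks is a made-up name):

```python
def order_tasks(tasks):
    """Order tasks so each runs after the tasks that produce its inputs.

    `tasks` maps task name -> {"inputs": [...], "outputs": [...]}.
    """
    # Map each declared output name to the task that produces it
    produced_by = {out: name for name, t in tasks.items() for out in t["outputs"]}
    ordered, seen = [], set()

    def visit(name):
        if name in seen:
            return
        seen.add(name)
        for inp in tasks[name]["inputs"]:
            if inp in produced_by:      # input produced by another task
                visit(produced_by[inp])
            # otherwise it is an external flow input (e.g. 'cad_datasources')
        ordered.append(name)

    for name in tasks:
        visit(name)
    return ordered

flow = {
    "gather cad files": {"inputs": ["cad_datasources"], "outputs": ["cad_dataset"]},
    "Manufacturing data encoding": {"inputs": ["cad_dataset"], "outputs": ["cad_files_encoded"]},
    "AutoDatasetExportTask": {"inputs": ["cad_files_encoded"], "outputs": ["encoded_dataset"]},
}
print(order_tasks(flow))
```

With this kind of wiring, adding a new task only requires naming its inputs after an existing output; no explicit dependency declarations are needed.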
[9]:
# Create and run the Data Flow
flow_name = "minimal_manufacturing_flow"
cad_flow = hoops_ai.create_flow(
    name=flow_name,
    tasks=[gather_files, encode_manufacturing_data],  # Imported from cad_tasks.py
    max_workers=12,               # Process-based parallel execution
    flows_outputdir=str(flows_outputdir),
    ml_task="Manufacturing Classification Demo",
    auto_dataset_export=True,     # Enable automatic dataset merging
    debug=False                   # Keep False to enable parallel execution
)
# Run the flow to process all files
print("Starting flow execution with ProcessPoolExecutor...")
print("✓ Schema is defined in cad_tasks.py, available to all worker processes")
flow_output, output_dict, flow_file = cad_flow.process(inputs={'cad_datasources': [str(datasources_dir)]})
# Display results
print("\n" + "="*70)
print("FLOW EXECUTION COMPLETED SUCCESSFULLY")
print("="*70)
print(f"\nDataset files created:")
print(f" Main dataset: {output_dict.get('flow_data', 'N/A')}")
print(f" Info dataset: {output_dict.get('flow_info', 'N/A')}")
print(f" Attributes: {output_dict.get('flow_attributes', 'N/A')}")
print(f" Flow file: {flow_file}")
print(f"\nTotal processing time: {output_dict.get('Duration [seconds]', {}).get('total', 0):.2f} seconds")
print(f"Files processed: {output_dict.get('file_count', 0)}")
Starting flow execution with ProcessPoolExecutor...
✓ Schema is defined in cad_tasks.py, available to all worker processes
|INFO| FLOW | ######### Flow 'minimal_manufacturing_flow' start #######
|WARNING| FLOW | Cleaning up existing flow directory: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out\flows\minimal_manufacturing_flow
|WARNING| FLOW | Removing all previous outputs for flow 'minimal_manufacturing_flow' to avoid build conflicts.
|INFO| FLOW | Flow directory successfully cleaned and recreated: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out\flows\minimal_manufacturing_flow
|INFO| FLOW |
Flow Execution Summary
|INFO| FLOW | ==================================================
|INFO| FLOW | Task 1: gather cad files
|INFO| FLOW | Inputs : cad_datasources
|INFO| FLOW | Outputs: cad_dataset
|INFO| FLOW | Task 2: Manufacturing data encoding
|INFO| FLOW | Inputs : cad_dataset
|INFO| FLOW | Outputs: cad_files_encoded
|INFO| FLOW | Task 3: AutoDatasetExportTask
|INFO| FLOW | Inputs : cad_files_encoded
|INFO| FLOW | Outputs: encoded_dataset, encoded_dataset_info, encoded_dataset_attribs
|INFO| FLOW |
Task Dependencies:
|INFO| FLOW | gather cad files has no dependencies.
|INFO| FLOW | gather cad files --> Manufacturing data encoding
|INFO| FLOW | Manufacturing data encoding --> AutoDatasetExportTask
|INFO| FLOW | ==================================================
|INFO| FLOW | Executing ParallelTask 'gather cad files' with 1 items.
|INFO| FLOW | Executing ParallelTask 'Manufacturing data encoding' with 101 items.
|INFO| FLOW | Executing SequentialTask 'AutoDatasetExportTask'.
[DatasetMerger] Saved schema with 2 groups to metadata.json
|INFO| FLOW | Auto dataset export completed in 35.81 seconds
Sequential Task end=====================
|INFO| FLOW | Time taken: 217.68 seconds
|INFO| FLOW | ######### Flow 'minimal_manufacturing_flow' end ######
======================================================================
FLOW EXECUTION COMPLETED SUCCESSFULLY
======================================================================
Dataset files created:
Main dataset: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out\flows\minimal_manufacturing_flow\minimal_manufacturing_flow.dataset
Info dataset: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out\flows\minimal_manufacturing_flow\minimal_manufacturing_flow.infoset
Attributes: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out\flows\minimal_manufacturing_flow\minimal_manufacturing_flow.attribset
Flow file: C:\Users\LuisSalazar\Documents\MAIN\MLProject\repo\HOOPS-AI-tutorials\notebooks\out/flows/minimal_manufacturing_flow/minimal_manufacturing_flow.flow
Total processing time: 217.68 seconds
Files processed: 101