hoops_ai.flowmanager

Quick Overview

Classes

Flow(name, specifications[, tasks])

Class to manage and orchestrate the execution of multiple Tasks.

FlowOutput(flow_spec, task_instances, ...)

Class to encapsulate the results, errors, and logs from the Flow execution.

ParallelExecutor(max_workers)

Simplified parallel executor configuration.

TaskDecorators()

ETL-based task decorators for flow orchestration.

Functions

create_flow(name, tasks, flows_outputdir[, ...])

Module-level flow creation function with simplified parameters.

list_registered_tasks()

Returns a dictionary of all registered tasks.

print_task_registry()

Prints information about all registered tasks.

validate_tasks()

Validates all registered tasks and returns validation results.

Data Flow Management Module

The FlowManager module provides a framework for orchestrating CAD data processing workflows. It supports scalable, parallelizable pipelines, from processing a single CAD file to batch operations across entire datasets.

This module implements a task-based architecture where complex workflows are decomposed into manageable, reusable components. It supports both sequential and parallel execution patterns, with built-in monitoring, error handling, and progress tracking capabilities.
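The sequential and parallel execution patterns with error handling described above can be illustrated with a small standard-library sketch. It is not the hoops_ai implementation: `TaskResult`, `run_task`, and `count_chars` are hypothetical names introduced only for this illustration, and a thread pool stands in for whatever executor the module actually uses.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterable


@dataclass
class TaskResult:
    """Hypothetical container for per-item results and error messages."""
    results: Dict[Any, Any] = field(default_factory=dict)
    errors: Dict[Any, str] = field(default_factory=dict)


def run_task(func: Callable[[Any], Any], items: Iterable[Any],
             max_workers: int = 1) -> TaskResult:
    """Apply func to every item, sequentially or in a thread pool.

    A failure on one item is recorded and does not stop the other items.
    """
    def safe_call(item):
        try:
            return item, func(item), None
        except Exception as exc:
            return item, None, f"{type(exc).__name__}: {exc}"

    items = list(items)
    if max_workers <= 1:
        # Sequential pattern: plain loop in the calling thread.
        outcomes = [safe_call(item) for item in items]
    else:
        # Parallel pattern: same work distributed over worker threads.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            outcomes = list(pool.map(safe_call, items))

    out = TaskResult()
    for item, value, error in outcomes:
        if error is None:
            out.results[item] = value
        else:
            out.errors[item] = error
    return out


def count_chars(name: str) -> int:
    """Toy stand-in for a CAD processing step."""
    if not name.endswith(".step"):
        raise ValueError("unsupported file type")
    return len(name)


sequential = run_task(count_chars, ["a.step", "b.txt"], max_workers=1)
parallel = run_task(count_chars, ["a.step", "b.txt"], max_workers=4)
```

Both calls yield the same results and errors; only the scheduling differs, which is the property that lets a workflow switch between modes without changing task code.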

KEY FEATURES:

  1. ETL-Based Task Decorators:

    • @flowtask.extract: Data extraction phase (e.g., gathering CAD files)

    • @flowtask.transform: Data transformation phase (e.g., encoding CAD data)

    • @flowtask.compute_embeddings: Compute shape embeddings from CAD files

    • @flowtask.custom: Flexible custom tasks for any processing

  2. Simplified Flow Creation:

    • create_flow(): Create flows with direct parameters

    • Simple max_workers parameter for parallel execution

    • Automatic dataset export integration

  3. Automatic Function Injection:

    • Decorators inject user functions into the task execution framework

    • No need to implement new ParallelTask classes

    • Task names are user-defined, not hard-coded

  4. Execution Mode Control:

    • parallel_execution parameter on decorators for explicit control

    • Automatic sequential mode when max_workers <= 1

    • Per-task execution mode override

USAGE EXAMPLE:

import glob
from typing import List

import hoops_ai
from hoops_ai.flowmanager import flowtask

@flowtask.extract(
    name="gather_cad_files",
    inputs=["cad_datasources"],
    outputs=["cad_dataset"],
    parallel_execution=True
)
def my_gather_function(source: str) -> List[str]:
    return glob.glob(f"{source}/*.step")

@flowtask.transform(
    name="encode_cad",
    inputs=["cad_file", "cad_loader", "storage"],
    outputs=["face_count", "edge_count"],
    parallel_execution=True
)
def my_encode_function(cad_file, cad_loader, storage):
    # Transformation logic goes here; it must assign face_count and edge_count
    return face_count, edge_count

# Create flow with simplified parameters
my_flow = hoops_ai.create_flow(
    name="my_flow",
    tasks=[my_gather_function, my_encode_function],
    flows_outputdir="./output",
    max_workers=4,
    debug=False
)

# Execute flow
flow_output = my_flow.process(inputs={"cad_datasources": ["/path/to/cad"]})
print(flow_output.summary())

For workflow design patterns and best practices, see the Data Flow Management Programming Guide.