
Orchestrator

The Orchestrator is the central coordination engine of the ETL pipeline. It manages task execution, resolves dependencies between models, handles task ordering through topological sorting, and ensures data flows correctly from raw sources through processing to final reports.

Overview

The Orchestrator takes a collection of tasks and automatically determines a valid execution order based on the models they require and provide. It maintains a model registry to track all data models used throughout the pipeline and ensures each task receives the correct input models and writes its outputs to the right destination models.

Key Features

Automatic Dependency Resolution

The Orchestrator analyzes each task’s requires() and provides() methods to build a complete dependency graph. Tasks that provide models other tasks need are automatically scheduled to run first.
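
For illustration, a minimal task might declare its dependencies as in the sketch below. The model classes and the Task base-class details here are assumptions, not the actual pipeline API:

```python
class RawBookings: ...      # hypothetical source model
class CleanBookings: ...    # hypothetical output model

class CleanBookingsTask(Task):
    def requires(self):
        # Models this task consumes; any task providing them runs first.
        return [RawBookings]

    def provides(self):
        # Models this task produces for downstream tasks.
        return [CleanBookings]
```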

Topological Sorting

Tasks are executed in topologically sorted order, ensuring all of a task's dependencies are satisfied before it runs. Because a topological order exists only for an acyclic graph, circular dependencies are detected rather than silently mis-ordered, and data flows through the pipeline in a well-defined order.

Model Registry

A centralized registry manages all model instances throughout the pipeline lifecycle. Models are instantiated once and shared between tasks, enabling efficient memory usage and data persistence.

Flexible Configuration

Model configurations can be customized through the model_configs parameter, allowing per-model settings like database names, table names, or processing options.
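
For example, a config dictionary might look like the following; the model names and settings are hypothetical:

```python
model_configs = {
    # Keys are model class names; values are constructor kwargs.
    "RawBookings": {"database": "staging", "table": "raw_bookings"},
    "CleanBookings": {"table": "clean_bookings"},
}
```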

Initialization Parameters

job_context

Type: JobContext
Required: Yes

The job context containing configuration and runtime information for the entire pipeline.

tasks

Type: Iterable[Union[Task, TaskGroup]]
Required: Yes

A collection of tasks or task groups to execute. The Orchestrator will expand TaskGroups and automatically resolve the run order.

model_configs

Type: Optional[Dict[str, Dict]]
Required: No
Default: None

Optional configuration dictionary for individual models. Keys are model class names (as strings), and values are dictionaries of keyword arguments passed to the model constructor.
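
Putting the parameters together, a minimal construction might look like the sketch below. The task classes and the run() entry point are assumptions; check the Orchestrator class for the actual method names:

```python
orchestrator = Orchestrator(
    job_context=job_context,  # JobContext for the whole pipeline
    tasks=[
        ExtractTask(),                           # hypothetical task
        TaskGroup([CleanTask(), EnrichTask()]),  # groups are expanded automatically
        ReportTask(),
    ],
    model_configs={
        # Forwarded as kwargs to the RawBookings constructor.
        "RawBookings": {"database": "staging"},
    },
)
orchestrator.run()  # assumed entry point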

How It Works

1. Task Expansion

When initialized, the Orchestrator expands TaskGroup instances into their constituent tasks, creating a flat list of all tasks to execute.
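
A sketch of this step; it assumes TaskGroup exposes its members through a tasks attribute, which is an assumption about the real class:

```python
def expand(tasks):
    """Flatten TaskGroups into a single flat list of tasks."""
    flat = []
    for item in tasks:
        if isinstance(item, TaskGroup):
            flat.extend(item.tasks)  # assumed attribute of TaskGroup
        else:
            flat.append(item)
    return flat
```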

2. Model Registry Creation

The Orchestrator collects all model classes from the requires() and provides() methods of all tasks. Each unique model class is instantiated once with its configuration and stored in the model registry.
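
A minimal sketch of registry construction, keyed by class name to match the model_configs convention:

```python
def build_registry(tasks, model_configs=None):
    """Instantiate each unique model class once, applying any per-model config."""
    model_configs = model_configs or {}
    registry = {}
    for task in tasks:
        for model_cls in [*task.requires(), *task.provides()]:
            name = model_cls.__name__
            if name not in registry:
                registry[name] = model_cls(**model_configs.get(name, {}))
    return registry
```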

3. Dependency Graph Construction

The Orchestrator builds a dependency graph by recording which task provides each model and which tasks require it. The result is a directed acyclic graph (DAG) of task dependencies.
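
A sketch of graph construction; each task is mapped to the set of upstream tasks that provide its required models:

```python
def build_graph(tasks):
    """Map each task to the set of tasks it depends on."""
    provider = {}  # model class -> the single task that provides it
    for task in tasks:
        for model_cls in task.provides():
            provider[model_cls] = task

    graph = {}
    for task in tasks:
        # A required model with no provider is a missing dependency
        # (see Error Handling below).
        graph[task] = {provider[m] for m in task.requires() if m in provider}
    return graph
```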

4. Topological Sort

Tasks are sorted topologically based on their dependencies. This ensures that:

  • Tasks with no dependencies run first
  • Tasks only run after all their required models have been provided by upstream tasks
  • The original task order is preserved when there are no dependency constraints
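
A simple stable variant of this sort, sketched for illustration (the real implementation may differ): among tasks whose dependencies are already satisfied, the first one in the original order is picked, which preserves the input ordering where no constraint applies.

```python
def sort_tasks(tasks, graph):
    """Stable topological sort over the dependency graph from the previous step."""
    ordered, done, remaining = [], set(), list(tasks)
    while remaining:
        # First task (in original order) whose dependencies are all done.
        ready = next((t for t in remaining if graph[t] <= done), None)
        if ready is None:
            # No task can run: the remaining tasks form a cycle.
            raise ValueError("Circular dependency detected among remaining tasks")
        ordered.append(ready)
        done.add(ready)
        remaining.remove(ready)
    return ordered
```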

5. Execution

The Orchestrator iterates through the sorted tasks and invokes each one in order, supplying each task's required model instances from the registry so that its outputs are available to every downstream task.
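
A sketch of the execution loop; the task invocation signature here is an assumption, since tasks may instead resolve their models through the job context:

```python
def execute(sorted_tasks, registry, job_context):
    """Run tasks in dependency order, wiring models from the registry."""
    for task in sorted_tasks:
        inputs = [registry[m.__name__] for m in task.requires()]
        outputs = [registry[m.__name__] for m in task.provides()]
        task.run(job_context, inputs, outputs)  # assumed signature
```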

Error Handling

Circular dependencies, multiple providers for the same model, and missing dependencies are all detected during orchestration.
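
Cycle detection falls out of the topological sort sketched above; the other two checks can be sketched as a validation pass over the provider mapping:

```python
def validate(tasks):
    """Fail fast on duplicate providers and missing dependencies."""
    provider = {}
    for task in tasks:
        for model_cls in task.provides():
            if model_cls in provider:
                raise ValueError(f"{model_cls.__name__} has multiple providers")
            provider[model_cls] = task
    for task in tasks:
        for model_cls in task.requires():
            if model_cls not in provider:
                raise ValueError(
                    f"{type(task).__name__} requires {model_cls.__name__}, "
                    "but no task provides it"
                )
```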

Chain-Specific Orchestrated Workflows

Different hotel chains have their own orchestrated workflows that compose chain-specific tasks with common processing tasks.
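
As a purely hypothetical sketch (the chain, task, and model names below are invented for illustration), such a workflow might compose tasks like this:

```python
def build_chain_a_orchestrator(job_context):
    """Hypothetical workflow for one chain: chain-specific extraction,
    followed by shared processing and reporting tasks."""
    return Orchestrator(
        job_context=job_context,
        tasks=[
            ChainAExtractTask(),      # chain-specific raw extraction
            NormalizeBookingsTask(),  # shared processing
            BuildReportTask(),        # shared reporting
        ],
        model_configs={"RawBookings": {"database": "chain_a_staging"}},
    )
```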

Best Practices

Order Tasks Logically

While the Orchestrator will automatically sort tasks by dependencies, providing tasks in a logical order (raw → clean → process → report) makes the code more readable and maintainable.

Use Model Configs Sparingly

Only use model_configs for settings that vary between environments or chains. Default values should be defined in the model class itself.

Single Provider Per Model

Ensure each model is provided by exactly one task. If multiple tasks could provide the same model, consider creating separate model classes or combining the tasks.

Task Granularity

Break complex operations into separate tasks rather than creating monolithic tasks. This improves reusability, testability, and makes the dependency graph clearer.

Monitor Execution

Use logging within tasks to track progress through the pipeline. The Orchestrator itself logs task execution order and timing information.
