Orchestrator
The Orchestrator is the central coordination engine of the ETL pipeline. It manages task execution, resolves dependencies between models, handles task ordering through topological sorting, and ensures data flows correctly from raw sources through processing to final reports.
Overview
The Orchestrator takes a collection of tasks and automatically determines a valid execution order based on the models they require and provide. It maintains a model registry to track all data models used throughout the pipeline and ensures each task receives the correct input models and writes its outputs to the correct destination models.
Key Features
Automatic Dependency Resolution
The Orchestrator analyzes each task’s requires() and provides() methods to build a complete dependency graph. Tasks that provide models other tasks need are automatically scheduled to run first.
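As a sketch, a task might declare its inputs and outputs like this; the model classes RawBookings and CleanBookings are hypothetical, and the exact Task interface may differ:

```python
# Hypothetical model classes, for illustration only.
class RawBookings:
    """Raw booking records as loaded from the source system."""

class CleanBookings:
    """Booking records after validation and de-duplication."""

class CleanBookingsTask(Task):  # Task is the pipeline's base class
    def requires(self):
        # Models consumed by this task; their providers run first.
        return [RawBookings]

    def provides(self):
        # Models produced by this task for downstream consumers.
        return [CleanBookings]
```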
Topological Sorting
Tasks are executed in topologically sorted order, ensuring all dependencies are satisfied before a task runs. Because a topological order exists only for an acyclic graph, circular dependencies are detected rather than silently executed, and data flows through the pipeline in a single forward pass.
Model Registry
A centralized registry manages all model instances throughout the pipeline lifecycle. Models are instantiated once and shared between tasks, enabling efficient memory usage and data persistence.
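Conceptually the registry behaves like a mapping from model class to a single shared instance, so upstream writes are visible downstream. A minimal sketch, not the actual implementation:

```python
registry = {}

def get_model(model_cls, **config):
    # Instantiate each model class exactly once, then reuse it.
    if model_cls not in registry:
        registry[model_cls] = model_cls(**config)
    return registry[model_cls]

# Both lookups return the identical instance, so data written by an
# upstream task is visible to every downstream task that requires it.
assert get_model(CleanBookings) is get_model(CleanBookings)
```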
Flexible Configuration
Model configurations can be customized through the model_configs parameter, allowing per-model settings like database names, table names, or processing options.
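For example, a model_configs mapping might look like the following; the model names and keyword arguments are illustrative, not a fixed schema:

```python
model_configs = {
    # Keys are model class names as strings; values are keyword
    # arguments forwarded to that model's constructor.
    "RawBookings": {"database": "staging", "table": "bookings_raw"},
    "CleanBookings": {"database": "analytics"},
}
```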
Initialization Parameters
job_context
Type: JobContext
Required: Yes
The job context containing configuration and runtime information for the entire pipeline.
tasks
Type: Iterable[Union[Task, TaskGroup]]
Required: Yes
A collection of tasks or task groups to execute. The Orchestrator will expand TaskGroups and automatically resolve the run order.
model_configs
Type: Optional[Dict[str, Dict]]
Required: No
Default: None
Optional configuration dictionary for individual models. Keys are model class names (as strings), and values are dictionaries of keyword arguments passed to the model constructor.
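Putting the three parameters together, construction might look like this sketch; the task names, and the run() call at the end, are assumptions:

```python
orchestrator = Orchestrator(
    job_context=job_context,       # configuration and runtime info
    tasks=[
        LoadRawBookingsTask(),
        CleanBookingsTask(),
        ReportTaskGroup(),         # expanded into its member tasks
    ],
    model_configs={
        "RawBookings": {"database": "staging"},
    },
)
orchestrator.run()                 # entry-point name assumed
```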
How It Works
1. Task Expansion
When initialized, the Orchestrator expands TaskGroup instances into their constituent tasks, creating a flat list of all tasks to execute.
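A sketch of the expansion step, assuming a TaskGroup exposes its members through a tasks attribute (the attribute name is an assumption):

```python
def expand(items):
    """Flatten Tasks and TaskGroups into a single flat task list."""
    flat = []
    for item in items:
        if isinstance(item, TaskGroup):
            # Groups may nest, so expand them recursively.
            flat.extend(expand(item.tasks))
        else:
            flat.append(item)
    return flat
```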
2. Model Registry Creation
The Orchestrator collects all model classes from the requires() and provides() methods of all tasks. Each unique model class is instantiated once with its configuration and stored in the model registry.
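In outline, registry creation could look like this, combining each task's declared models with the model_configs mapping shown earlier:

```python
def build_registry(tasks, model_configs=None):
    model_configs = model_configs or {}
    registry = {}
    for task in tasks:
        for model_cls in [*task.requires(), *task.provides()]:
            if model_cls not in registry:
                # Per-model kwargs are looked up by class name and
                # the model is instantiated exactly once.
                kwargs = model_configs.get(model_cls.__name__, {})
                registry[model_cls] = model_cls(**kwargs)
    return registry
```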
3. Dependency Graph Construction
For each task, the Orchestrator builds a dependency graph by mapping which tasks provide which models and which tasks require which models. This creates a directed acyclic graph (DAG) of task dependencies.
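A sketch of graph construction; note that a duplicate or missing provider is detectable at this stage (see Error Handling below):

```python
def build_graph(tasks):
    # Map each model class to the single task that provides it.
    provider = {}
    for task in tasks:
        for model_cls in task.provides():
            if model_cls in provider:
                raise ValueError(f"Multiple providers for {model_cls.__name__}")
            provider[model_cls] = task

    # Add an edge provider -> consumer for every required model.
    edges = {task: set() for task in tasks}
    for task in tasks:
        for model_cls in task.requires():
            if model_cls not in provider:
                raise ValueError(f"No task provides {model_cls.__name__}")
            edges[provider[model_cls]].add(task)
    return edges
```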
4. Topological Sort
Tasks are sorted topologically based on their dependencies; a sketch follows the list below. This ensures that:
- Tasks with no dependencies run first
- Tasks only run after all their required models have been provided by upstream tasks
- The original task order is preserved when there are no dependency constraints
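A sketch of an order-preserving topological sort over the graph built in step 3; scanning the remaining tasks in their given order keeps the original ordering whenever dependencies allow:

```python
def topo_sort(tasks, edges):
    # Count incoming edges (unmet dependencies) per task.
    indegree = {task: 0 for task in tasks}
    for downstream in edges.values():
        for task in downstream:
            indegree[task] += 1

    remaining = list(tasks)
    ordered = []
    while remaining:
        # Pick the first ready task in original order, so ties
        # preserve the order in which tasks were supplied.
        task = next((t for t in remaining if indegree[t] == 0), None)
        if task is None:
            # Every remaining task waits on another remaining task.
            raise ValueError("Circular dependency detected among tasks")
        remaining.remove(task)
        ordered.append(task)
        for downstream in edges[task]:
            indegree[downstream] -= 1
    return ordered
```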
5. Execution
The Orchestrator iterates through the sorted tasks and invokes each one in order, supplying the required model instances from the registry so that outputs written to the provided models persist for downstream tasks.
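The execution loop, in outline; the run(inputs, outputs) signature is an assumption about the Task interface, not the documented API:

```python
for task in ordered_tasks:
    # Shared instances from the registry, as declared by the task.
    inputs = [registry[m] for m in task.requires()]
    outputs = [registry[m] for m in task.provides()]
    # The task reads from its inputs and writes into its output
    # models; because the instances live in the registry, results
    # persist for downstream tasks.
    task.run(inputs, outputs)
```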
Error Handling
Circular dependencies, multiple providers for the same model, and missing dependencies are all detected during orchestration; the sketches in steps 3 and 4 above show where each condition can surface as an error.
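From the caller's side these defects can be caught and logged; a sketch assuming they surface as ValueError (the actual exception type, and whether detection happens at construction or at run time, may differ):

```python
import logging

logger = logging.getLogger(__name__)

try:
    orchestrator = Orchestrator(job_context=job_context, tasks=tasks)
    orchestrator.run()  # entry-point name assumed
except ValueError as err:
    # A wiring defect (cycle, duplicate provider, missing provider)
    # rather than a data error: fail fast and surface it.
    logger.error("Pipeline wiring error: %s", err)
    raise
```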
Chain-Specific Orchestrated Workflows
Different hotel chains have their own orchestrated workflows that compose chain-specific tasks with common processing tasks:
- Athenaeum Workflow - Complete pipeline for Athenaeum hotel data
Best Practices
Order Tasks Logically
While the Orchestrator will automatically sort tasks by dependencies, providing tasks in a logical order (raw → clean → process → report) makes the code more readable and maintainable.
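For example, listing tasks in pipeline order reads naturally even though the Orchestrator would derive the same order itself (task names are illustrative):

```python
tasks = [
    LoadRawBookingsTask(),     # raw
    CleanBookingsTask(),       # clean
    AggregateBookingsTask(),   # process
    OccupancyReportTask(),     # report
]
```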
Use Model Configs Sparingly
Only use model_configs for settings that vary between environments or chains. Default values should be defined in the model class itself.
Single Provider Per Model
Ensure each model is provided by exactly one task. If multiple tasks could provide the same model, consider creating separate model classes or combining the tasks.
Task Granularity
Break complex operations into separate tasks rather than creating monolithic tasks. This improves reusability, testability, and makes the dependency graph clearer.
Monitor Execution
Use logging within tasks to track progress through the pipeline. The Orchestrator itself logs task execution order and timing information.
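A minimal in-task logging pattern, reusing the assumed run() signature from the Execution sketch:

```python
import logging

logger = logging.getLogger(__name__)

class CleanBookingsTask(Task):
    def run(self, inputs, outputs):
        logger.info("cleaning raw bookings")
        # ... transformation work ...
        logger.info("clean bookings written")
```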