Orchestrator — Development Reference
This page contains development-focused code examples, implementation details, and configuration notes for the ETL Orchestrator used in ETL Jobs.
For architecture and business-level process descriptions, see the Processes section: Orchestrator (Process Overview)
Using the Orchestrator
Basic Example
```python
from etl_lib.Orchestrator import Orchestrator
from etl_lib.job.JobContext import JobContext
from etl_lib.tasks.chains.athenaeum import (
    AthenaeumRawTask,
    CleanAthenaeumTask,
)
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask
from etl_lib.tasks.reports.ReportsTask import ReportsTask
from etl_lib.data.BigQuery import BigQuery

# Initialize job context
job_context = JobContext(chain_id="athenaeum", partitions=16)
db_sink = BigQuery(job_context)

# Create orchestrator with tasks
orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumRawTask(job_context=job_context),
        CleanAthenaeumTask(job_context=job_context),
        ProcessingTask(job_context=job_context),
        ReportsTask(job_context=job_context, database_sink=db_sink),
    ],
)

# Run the pipeline
orchestrator.run()
```
With Model Configuration
```python
orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumRawTask(job_context=job_context, skip_ingestion=False),
        CleanAthenaeumTask(job_context=job_context),
        ProcessingTask(job_context=job_context),
        ReportsTask(job_context=job_context, database_sink=db_sink),
    ],
    model_configs={
        "RawReservationModel": {"database": "etl_gp_raw_athenaeum"},
        "RawRevenueModel": {"database": "etl_gp_raw_athenaeum"},
    },
)
orchestrator.run()
```
Initialization Parameters (Development)
- `job_context` — Job configuration and runtime environment. Required.
- `tasks` — Iterable of `Task` or `TaskGroup` instances. Required.
- `model_configs` — Optional dictionary to override model-level constructor arguments (e.g., database, table).
Example:
```python
model_configs={
    "RawReservationModel": {
        "database": "etl_gp_raw_athenaeum",
        "table": "raw_reservations_v2",
    },
    "CleanGuestModel": {
        "partition_by": ["property_id", "year"],
    },
}
```
Orchestrator Internals — How It Works
- Expand `TaskGroup`s into individual tasks
- Create a single instance per model and register it in the model registry
- Build a dependency graph from `requires()` and `provides()` across tasks
- Topologically sort the tasks, detecting circular references
- Iterate over the sorted tasks: populate `task.inputs` and `task.outputs`, ensure models are loaded, then call `task.run()`
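The graph-building and sorting steps above can be sketched as a standalone routine. This is a simplified illustration using Kahn's algorithm; only `requires()` and `provides()` come from the source, everything else is an assumption rather than the actual `etl_lib` implementation:

```python
from collections import defaultdict, deque

def topo_sort(tasks):
    """Order tasks so every model's provider runs before its consumers."""
    # Map each model to the single task that provides it
    provider = {}
    for task in tasks:
        for model in task.provides():
            provider[model] = task

    # Build edges: provider task -> consumer task
    consumers = defaultdict(list)
    indegree = {task: 0 for task in tasks}
    for task in tasks:
        for model in task.requires():
            dep = provider.get(model)
            if dep is not None:
                consumers[dep].append(task)
                indegree[task] += 1

    # Kahn's algorithm: repeatedly emit tasks with no unmet dependencies
    ready = deque(t for t in tasks if indegree[t] == 0)
    ordered = []
    while ready:
        task = ready.popleft()
        ordered.append(task)
        for nxt in consumers[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any leftover tasks are stuck in a cycle
    if len(ordered) != len(tasks):
        raise ValueError("Circular dependency detected among tasks")
    return ordered
```

Note that a required model with no provider simply produces no edge here, which matches the behavior described below where missing providers surface later, at read time.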
Error Handling & Edge Cases
- Circular dependencies raise `ValueError` with cycle details
- Multiple tasks providing the same model raise `ValueError` naming the conflicting tasks
- Missing providers for required models cause read-time failures
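The duplicate-provider check could plausibly be implemented as below. This is an illustrative sketch of the rule described above, not the actual `etl_lib` code; the function name and error message are assumptions:

```python
def build_provider_map(tasks):
    """Map each model to its providing task; reject duplicate providers."""
    provider = {}
    for task in tasks:
        for model in task.provides():
            if model in provider:
                # Two tasks claim to produce the same model: ambiguous DAG
                raise ValueError(
                    f"Model {model!r} provided by both "
                    f"{type(provider[model]).__name__} and {type(task).__name__}"
                )
            provider[model] = task
    return provider
```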
Chain-Specific Examples (Athenaeum)
Inserting a chain-specific task into ProcessingTask
```python
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask
from etl_lib.tasks.chains.athenaeum.AthenaeumLegacyMergeTask import AthenaeumLegacyMergeTask

processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=AthenaeumLegacyMergeTask(
        job_context=job_context,
        write_to_catalog=False,
    ),
    before=GuestMatchingTask,
)
```
This ensures legacy data is merged before deduplication by the GuestMatchingTask.
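A composite task's `insert_subtask()` could work roughly as follows. This is a hypothetical sketch of the insertion semantics, not the actual `ProcessingTask` implementation:

```python
class CompositeTask:
    """Illustrative composite task: runs subtasks in order and supports
    inserting a new subtask before the first subtask of a given class."""

    def __init__(self, subtasks=None):
        self.subtasks = list(subtasks or [])

    def insert_subtask(self, subtask, before=None):
        if before is not None:
            for i, existing in enumerate(self.subtasks):
                if isinstance(existing, before):
                    self.subtasks.insert(i, subtask)
                    return
        # Fall back to appending when `before` is absent or not found
        self.subtasks.append(subtask)

    def run(self):
        for subtask in self.subtasks:
            subtask.run()
```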
Advanced API & Configuration Notes
- Tasks have `inputs` and `outputs` dictionaries keyed by model class.
- `model_configs` accepts model class names (strings) as keys.
- Subtasks can be inserted into composite tasks such as `ProcessingTask` with `insert_subtask()`.
- Use `write_to_catalog` and sink configuration to control whether models are persisted to the catalog/database.
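Inside a task, the `inputs`/`outputs` dictionaries keyed by model class might be used as shown below. The stand-in model classes and the per-row metric are assumptions for illustration; in `etl_lib` the orchestrator populates these dictionaries before calling `run()`:

```python
class CleanReservationsModel:
    """Stand-in model holding rows; the real Model base class lives in etl_lib."""
    def __init__(self):
        self.rows = []

class ProcessedReservationsModel:
    def __init__(self):
        self.rows = []

class EnrichReservationsTask:
    @staticmethod
    def requires():
        return [CleanReservationsModel]

    @staticmethod
    def provides():
        return [ProcessedReservationsModel]

    def __init__(self):
        # Populated by the orchestrator before run() is called
        self.inputs = {}
        self.outputs = {}

    def run(self):
        clean = self.inputs[CleanReservationsModel]
        processed = self.outputs[ProcessedReservationsModel]
        # Derive a simple per-row metric as a stand-in for real processing
        processed.rows = [
            {**row, "nights": row["checkout"] - row["checkin"]}
            for row in clean.rows
        ]
```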
Examples from Codebase
- See `etl_lib/Orchestrator.py` for the source implementation, and the unit tests under `etl_lib/tests/` for practical cases.
Example Implementation (detailed task and model setup)
This sample shows how Models, Tasks, and Orchestrator might be implemented in the codebase. It comes from the high-level process docs but is kept here as a development reference.
```python
class RawReservationsModel(Model):
    ...

class RawRevenuesModel(Model):
    ...

class CleanReservationsModel(Model):
    ...

class CleanGuestsModel(Model):
    ...

class CleanRoomsModel(Model):
    ...

class ProcessedReservationsModel(Model):
    ...


class AthenaeumIngesterTask(Task):
    ...
    @staticmethod
    def requires():
        return []

    @staticmethod
    def provides():
        return [RawReservationsModel, RawRevenuesModel]
    ...


class AthenaeumCleanerTask(Task):
    ...
    @staticmethod
    def requires():
        return [RawReservationsModel, RawRevenuesModel]

    @staticmethod
    def provides():
        return [CleanGuestsModel, CleanReservationsModel, CleanRoomsModel]
    ...


class ReservationMetricsTask(Task):
    ...
    @staticmethod
    def requires():
        return [CleanReservationsModel]

    @staticmethod
    def provides():
        # Provides a distinct model: CleanReservationsModel is already
        # provided by AthenaeumCleanerTask, and duplicate providers raise
        return [ProcessedReservationsModel]
    ...


job_context = JobContext(chain_id="athenaeum")

orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumIngesterTask(job_context=job_context),
        AthenaeumCleanerTask(job_context=job_context),
        ReservationMetricsTask(job_context=job_context),
    ],
)

orchestrator.run()
```
Best Practices (Development)
- Keep tasks small & focused for clearer dependency graphs.
- Use `model_configs` sparingly; prefer defaults on models.
- Always make `requires()` and `provides()` deterministic for correct dependency graphs.
- Add integration and unit tests to validate DAGs and model registry behavior.
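A minimal DAG-validity check along the lines suggested above might look like this. The dummy task classes and the `assert_valid_dag` helper are illustrative assumptions, kept self-contained rather than importing the real `etl_lib` tasks:

```python
class IngestTask:
    @staticmethod
    def requires(): return []
    @staticmethod
    def provides(): return ["raw"]

class CleanTask:
    @staticmethod
    def requires(): return ["raw"]
    @staticmethod
    def provides(): return ["clean"]

def assert_valid_dag(tasks):
    """Every required model must have exactly one providing task."""
    provided = [m for t in tasks for m in t.provides()]
    assert len(provided) == len(set(provided)), "a model has multiple providers"
    missing = {m for t in tasks for m in t.requires()} - set(provided)
    assert not missing, f"models with no provider: {missing}"

# A well-formed pipeline passes the check
assert_valid_dag([IngestTask(), CleanTask()])
```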
Related Files & Links
- Processes Overview: /processes/orchestrator
- Athenaeum Workflow (Process Overview): /processes/orchestrator/workflows/athenaeum
- Orchestrator Implementation: `etl_lib/Orchestrator.py`
- Jobs & Runtime: `jobs/` and `etl_lib/job/`