
Orchestrator — Development Reference

This page contains development-focused code examples, implementation details, and configuration notes for the ETL Orchestrator used in ETL Jobs.

For architecture and business-level process descriptions, see the Processes section: Orchestrator (Process Overview)

Using the Orchestrator

Basic Example

```python
from etl_lib.Orchestrator import Orchestrator
from etl_lib.job.JobContext import JobContext
from etl_lib.tasks.chains.athenaeum import (
    AthenaeumRawTask,
    CleanAthenaeumTask,
)
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask
from etl_lib.tasks.reports.ReportsTask import ReportsTask
from etl_lib.data.BigQuery import BigQuery

# Initialize job context
job_context = JobContext(chain_id="athenaeum", partitions=16)
db_sink = BigQuery(job_context)

# Create orchestrator with tasks
orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumRawTask(job_context=job_context),
        CleanAthenaeumTask(job_context=job_context),
        ProcessingTask(job_context=job_context),
        ReportsTask(job_context=job_context, database_sink=db_sink),
    ],
)

# Run the pipeline
orchestrator.run()
```

With Model Configuration

```python
orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumRawTask(job_context=job_context, skip_ingestion=False),
        CleanAthenaeumTask(job_context=job_context),
        ProcessingTask(job_context=job_context),
        ReportsTask(job_context=job_context, database_sink=db_sink),
    ],
    model_configs={
        "RawReservationModel": {"database": "etl_gp_raw_athenaeum"},
        "RawRevenueModel": {"database": "etl_gp_raw_athenaeum"},
    },
)
orchestrator.run()
```

Initialization Parameters (Development)

  • job_context — Job configuration and runtime environment. Required.
  • tasks — Iterable of Task or TaskGroup instances. Required.
  • model_configs — Optional dictionary to override model-level constructor arguments (e.g., database, table).

Example:

```python
model_configs={
    "RawReservationModel": {
        "database": "etl_gp_raw_athenaeum",
        "table": "raw_reservations_v2",
    },
    "CleanGuestModel": {
        "partition_by": ["property_id", "year"],
    },
}
```
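One plausible way such overrides could be applied internally is to merge each model's entry into the keyword arguments used to construct it. The sketch below illustrates the idea under that assumption; `build_model` and the merge order are illustrative, not the actual implementation:

```python
def build_model(model_cls, model_configs, **defaults):
    """Instantiate a model, letting model_configs override defaults.

    Sketch only: assumes models accept their settings as keyword
    arguments and that overrides are keyed by model class name.
    """
    overrides = model_configs.get(model_cls.__name__, {})
    kwargs = {**defaults, **overrides}  # overrides win over defaults
    return model_cls(**kwargs)
```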

Orchestrator Internals — How It Works

  • Expand TaskGroups into tasks
  • Create a single instance per model and register it in the model registry
  • Build a dependency graph from requires() and provides() across tasks
  • Topological sort tasks, detecting circular references
  • Iterate tasks: populate task.inputs and task.outputs, ensure models loaded, then call task.run()
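The graph-building and sorting steps above can be sketched as follows. This is a minimal illustration, not the actual `Orchestrator` source: it assumes each task exposes static `requires()` and `provides()` returning lists of model classes, as described above.

```python
from collections import defaultdict, deque

def topological_order(tasks):
    """Order tasks so every provider runs before its consumers (sketch)."""
    # Map each model class to the task that provides it
    provider_of = {}
    for task in tasks:
        for model in task.provides():
            provider_of[model] = task

    # Build edges: provider task -> consumer tasks
    consumers = defaultdict(list)
    indegree = {id(t): 0 for t in tasks}
    for task in tasks:
        for model in task.requires():
            provider = provider_of.get(model)
            if provider is not None and provider is not task:
                consumers[id(provider)].append(task)
                indegree[id(task)] += 1

    # Kahn's algorithm: repeatedly emit tasks with no pending inputs
    ready = deque(t for t in tasks if indegree[id(t)] == 0)
    ordered = []
    while ready:
        task = ready.popleft()
        ordered.append(task)
        for consumer in consumers[id(task)]:
            indegree[id(consumer)] -= 1
            if indegree[id(consumer)] == 0:
                ready.append(consumer)

    # Any leftover tasks are part of a cycle
    if len(ordered) != len(tasks):
        remaining = [type(t).__name__ for t in tasks if t not in ordered]
        raise ValueError(f"Circular dependency among tasks: {remaining}")
    return ordered
```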

Error Handling & Edge Cases

  • Circular dependencies raise ValueError with cycle details
  • Multiple tasks providing the same model raise ValueError with conflicting tasks
  • Missing providers for required models are not caught up front; they surface as read-time failures when a task tries to load the model
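The duplicate-provider check might look like the following. This is a sketch mirroring the ValueError behavior described above, not the actual source:

```python
def register_providers(tasks):
    """Map each model to its providing task, rejecting duplicates (sketch)."""
    provider_of = {}
    for task in tasks:
        for model in task.provides():
            if model in provider_of:
                raise ValueError(
                    f"Model {model} provided by both "
                    f"{type(provider_of[model]).__name__} "
                    f"and {type(task).__name__}"
                )
            provider_of[model] = task
    return provider_of
```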

Chain-Specific Examples (Athenaeum)

Inserting a chain-specific task into ProcessingTask

```python
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask
from etl_lib.tasks.chains.athenaeum.AthenaeumLegacyMergeTask import (
    AthenaeumLegacyMergeTask,
)

processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=AthenaeumLegacyMergeTask(
        job_context=job_context,
        write_to_catalog=False,
    ),
    before=GuestMatchingTask,
)
```

This ensures legacy data is merged before GuestMatchingTask performs guest deduplication.

Advanced API & Configuration Notes

  • Tasks have inputs and outputs dictionaries keyed by model class.
  • model_configs accept model class names (strings) as keys.
  • Subtasks can be inserted into composite tasks like ProcessingTask with insert_subtask().
  • Use write_to_catalog and sink configuration to control whether models are persisted to the catalog/database.
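A composite task's `insert_subtask()` could be implemented roughly like this. The sketch assumes the composite keeps an ordered subtask list; class and attribute names here are illustrative, not the real ProcessingTask internals:

```python
class CompositeTask:
    """Minimal sketch of a composite task supporting insert_subtask()."""

    def __init__(self):
        self.subtasks = []

    def insert_subtask(self, subtask, before=None):
        # Insert before the first subtask of the given class;
        # append at the end when no anchor class is given or found.
        if before is not None:
            for i, existing in enumerate(self.subtasks):
                if isinstance(existing, before):
                    self.subtasks.insert(i, subtask)
                    return
        self.subtasks.append(subtask)
```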

Examples from Codebase

  • See etl_lib/Orchestrator.py for the source implementation, and the unit tests under etl_lib/tests/ for practical usage examples.

Example Implementation (detailed task and model setup)

This sample shows how Models, Tasks, and Orchestrator might be implemented in the codebase. It comes from the high-level process docs but is kept here as a development reference.

```python
class RawReservationsModel(Model): ...
class RawRevenuesModel(Model): ...
class CleanReservationsModel(Model): ...
class CleanGuestsModel(Model): ...
class CleanRoomsModel(Model): ...
class ProcessedReservationsModel(Model): ...


class AthenaeumIngesterTask(Task):
    @staticmethod
    def requires():
        return []

    @staticmethod
    def provides():
        return [RawReservationsModel, RawRevenuesModel]

    ...


class AthenaeumCleanerTask(Task):
    @staticmethod
    def requires():
        return [RawReservationsModel, RawRevenuesModel]

    @staticmethod
    def provides():
        return [CleanGuestsModel, CleanReservationsModel, CleanRoomsModel]

    ...


class ReservationMetricsTask(Task):
    @staticmethod
    def requires():
        return [CleanReservationsModel]

    @staticmethod
    def provides():
        return [ProcessedReservationsModel]

    ...


job_context = JobContext(chain_id="athenaeum")
orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        AthenaeumIngesterTask(),
        AthenaeumCleanerTask(),
        ReservationMetricsTask(),
    ],
)
orchestrator.run()
```

Best Practices (Development)

  • Keep tasks small & focused for clearer dependency graphs.
  • Use model_configs sparingly; prefer defaults on models.
  • Always provide deterministic requires() and provides() for correct dependency graphs.
  • Add integration and unit tests to validate DAGs and model registry behavior.
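As a starting point for such tests, a helper like the following can assert that every required model has a provider before the job ever runs, instead of hitting a read-time failure mid-pipeline. The helper name and the Task API shape are assumptions; adapt to the real `requires()`/`provides()` contract:

```python
def assert_all_requirements_provided(tasks):
    """Test helper (sketch): fail fast when a required model
    has no providing task among the given tasks."""
    provided = {m for t in tasks for m in t.provides()}
    missing = sorted(
        (type(t).__name__, str(m))
        for t in tasks
        for m in t.requires()
        if m not in provided
    )
    assert not missing, f"Unprovided model requirements: {missing}"
```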