Athenaeum Workflow
The Athenaeum workflow is a complete end-to-end ETL pipeline that processes hotel data from the Athenaeum property management system. This workflow demonstrates how chain-specific tasks are orchestrated with common processing tasks to transform raw operational data into business intelligence reports.
Workflow Overview
The Athenaeum workflow follows a standard ETL pattern with four main stages:
- Raw Data Ingestion & Crawling - Extract data from source systems
- Data Cleaning - Transform and standardize raw data
- Processing - Apply business logic and enrich data
- Report Generation - Produce final analytics tables
Pipeline Architecture
┌─────────────────────┐
│ AthenaeumRawTask │ ← Ingestion + Crawling
└──────────┬──────────┘
│ RawReservationModel
│ RawRevenueModel
↓
┌─────────────────────┐
│ CleanAthenaeumTask │ ← Data Cleaning
└──────────┬──────────┘
│ CleanGuestModel
│ CleanReservationModel
│ CleanRoomModel
↓
┌─────────────────────┐
│ ProcessingTask │ ← Business Logic
│ (with LegacyMerge) │
└──────────┬──────────┘
│ ProcessedGuestModel
│ ProcessedReservationModel
│ ProcessedRoomModel
↓
┌─────────────────────┐
│ ReportsTask │ ← Analytics Reports
└─────────────────────┘
Implementation Example
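A minimal sketch of how the full workflow might be wired together. The task and model names come from the pipeline above; the import paths and constructor signatures are assumptions for illustration, not the exact library API.

```python
# Hypothetical wiring of the Athenaeum workflow; import paths and
# constructor signatures are assumptions, not the exact library API.
from etl.orchestrator import Orchestrator
from etl.tasks.athenaeum import AthenaeumRawTask, CleanAthenaeumTask
from etl.tasks.common import ProcessingTask, ReportsTask

orchestrator = Orchestrator(property_id="athenaeum")

orchestrator.add_tasks([
    AthenaeumRawTask(skip_ingestion=False),  # Stage 1: ingestion + crawling
    CleanAthenaeumTask(),                    # Stage 2: raw -> clean models
    ProcessingTask(),                        # Stage 3: business logic (legacy merge inserted in Stage 3 below)
    ReportsTask(),                           # Stage 4: analytics reports -> BigQuery
])

orchestrator.run()  # resolves dependencies and executes tasks in order
```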
Stage 1: Raw Data Ingestion & Crawling
AthenaeumRawTask
The AthenaeumRawTask orchestrates two subtasks to extract data from the Athenaeum PMS:
Subtasks
- AthenaeumIngesterTask (optional) - Downloads raw data files from the PMS API
- AthenaeumCrawlerTask - Reads downloaded files and loads them into Spark DataFrames
Configuration Options
- skip_ingestion - Skip the download step if data files already exist
- sync_dates - Specific dates to sync (optional)
Models Provided
- RawReservationModel - Raw reservation/booking data
- RawRevenueModel - Raw revenue/charge data
Example
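A hedged sketch of configuring this stage, using the skip_ingestion and sync_dates options listed above; the exact constructor signature is an assumption.

```python
from datetime import date

# Constructor arguments mirror the configuration options above;
# the exact signature is an assumption for illustration.
raw_task = AthenaeumRawTask(
    skip_ingestion=False,                              # run AthenaeumIngesterTask to download files
    sync_dates=[date(2025, 6, 1), date(2025, 6, 2)],   # only sync these dates
)

# After the task runs, its outputs are available to downstream tasks:
#   RawReservationModel - raw reservation/booking data
#   RawRevenueModel     - raw revenue/charge data
```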
Stage 2: Data Cleaning
CleanAthenaeumTask
The CleanAthenaeumTask transforms raw Athenaeum data into standardized, cleaned models that conform to the common schema used across all hotel chains.
Data Transformations
Guest Data
- Extract and standardize guest names, emails, phone numbers
- Normalize address information
- Handle duplicate guest records
- Generate unique guest identifiers
Reservation Data
- Standardize booking status codes
- Parse and validate date ranges (check-in, check-out, booking date)
- Extract booking source and channel information
- Calculate booking type (room, event, etc.)
- Handle duplicate confirmations with tie-breaking logic
Room Data
- Parse daily room stay records
- Align revenue data with room stays by date
- Calculate room charges and taxes
- Determine stay day/night flags
- Assign room type codes
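To give a rough feel for the kind of cleaning applied here, the PySpark snippet below standardizes guest names and emails and drops duplicate guest records. The column names and rules are assumptions for illustration, not the task's actual implementation.

```python
from pyspark.sql import DataFrame, functions as F

def clean_guests(raw_reservations: DataFrame) -> DataFrame:
    """Illustrative guest cleaning; column names and rules are assumptions."""
    return (
        raw_reservations
        .withColumn("guest_name", F.initcap(F.trim("guest_name")))
        .withColumn("guest_email", F.lower(F.trim("guest_email")))
        # Stable surrogate key for each guest (illustrative choice of inputs)
        .withColumn("guest_id", F.sha2(F.concat_ws("|", "guest_email", "guest_name"), 256))
        # Keep a single record per guest identifier
        .dropDuplicates(["guest_id"])
    )
```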
Models Required
- RawReservationModel
- RawRevenueModel
Models Provided
- CleanGuestModel
- CleanReservationModel
- CleanRoomModel
Stage 3: Processing
ProcessingTask with LegacyMerge
The ProcessingTask is a composite task that runs several processing subtasks in sequence. For Athenaeum, we insert a custom AthenaeumLegacyMergeTask to handle historical data migration.
Standard Processing Subtasks
- ReplaceValuesTask - Apply value replacements from configuration
- GuestMatchingCheckTask - Ensure all guests have cluster IDs
- GuestMatchingTask - Deduplicate guests using Splink record linkage
- ReservationMetricsTask - Calculate booking windows, cancellation windows
- RoomMetricsTask - Calculate length of stay, daily metrics
- GuestLoyaltyTask - Compute lifetime value, stay counts, loyalty tiers
AthenaeumLegacyMergeTask
This chain-specific task merges legacy Athenaeum data (pre-April 2025) from an older system into the processed models. It’s inserted before GuestMatchingTask so that legacy guests are included in the deduplication process.
What It Does:
- Loads legacy data from S3 historical tables
- Filters data to only include dates before April 1, 2025
- Aligns legacy schema to current processed schema
- Renames columns to match current conventions
- Handles legacy stay flags and booking types
- Deduplicates legacy records
- Unions legacy data with current processed data
Column Mappings:
- athenaeum_room_stay_date_occupied → room_stay_date_is_stay_day
- room_type_code → room_type
- athenaeum_agent_name → travel_agent
- athenaeum_company_name → company
- channel → source
- source → secondary_source
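A minimal sketch of applying these mappings in PySpark; the function and surrounding code are hypothetical, only the mapping itself comes from the list above.

```python
from pyspark.sql import DataFrame

# Legacy -> current column names, per the mappings above.
LEGACY_COLUMN_MAP = {
    "athenaeum_room_stay_date_occupied": "room_stay_date_is_stay_day",
    "room_type_code": "room_type",
    "athenaeum_agent_name": "travel_agent",
    "athenaeum_company_name": "company",
    "channel": "source",
    "source": "secondary_source",
}

def align_legacy_columns(legacy_df: DataFrame) -> DataFrame:
    # Order matters: rename "source" away first so that renaming
    # "channel" -> "source" does not collide with the existing column.
    df = legacy_df.withColumnRenamed("source", "secondary_source")
    df = df.withColumnRenamed("channel", "source")
    for old, new in LEGACY_COLUMN_MAP.items():
        if old not in ("source", "channel"):
            df = df.withColumnRenamed(old, new)
    return df
```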
With the legacy merge task inserted, the processing pipeline becomes:
- ReplaceValuesTask
- GuestMatchingCheckTask
- AthenaeumLegacyMergeTask ← Custom insertion
- GuestMatchingTask
- ReservationMetricsTask
- RoomMetricsTask
- GuestLoyaltyTask
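A hedged sketch of how the custom task might be inserted into the standard sequence; the subtasks attribute and its mutability are assumptions about the ProcessingTask API.

```python
# Hypothetical ProcessingTask API: a mutable list of subtask instances.
processing = ProcessingTask()

# Find GuestMatchingTask and insert the legacy merge just before it, so that
# legacy guests are included in the deduplication step.
idx = next(i for i, t in enumerate(processing.subtasks)
           if isinstance(t, GuestMatchingTask))
processing.subtasks.insert(idx, AthenaeumLegacyMergeTask())
```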
Models Required
- CleanGuestModel
- CleanReservationModel
- CleanRoomModel
Models Provided
- ProcessedGuestModel
- ProcessedReservationModel
- ProcessedRoomModel
Stage 4: Report Generation
ReportsTask
The ReportsTask generates final analytics tables and writes them to BigQuery for consumption by business intelligence tools.
Report Types
DailyReportTask - One row per booking per stay date
- Daily room revenue and charges
- Stay day/night flags
- Guest loyalty metrics
- Booking details
BookingRoomsReportTask - One row per booking (aggregated)
- Total revenue and charges per booking
- Total nights stayed
- Average daily rate (ADR)
- Booking summary metrics
GuestLoyaltyReportTask - One row per guest
- Lifetime value
- Total stays and nights
- Loyalty tier
- First/last stay dates
- Booking patterns
PickupReportTask - Pace and pickup analysis
- Compares current bookings to historical snapshots
- Shows booking velocity by date range
- Supports revenue management forecasting
Database Sink
Reports are written to BigQuery using the BigQuery sink.
The sink handles:
- Table creation and schema management
- Partition management by property and date
- Incremental vs. full refresh strategies
- Error handling and retries
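For context, a minimal sketch of writing a report DataFrame to BigQuery using the Spark BigQuery connector. The table name, staging bucket, and refresh strategy are placeholders, not the sink's actual implementation.

```python
from pyspark.sql import DataFrame

def write_report(report_df: DataFrame, table: str) -> None:
    """Illustrative BigQuery write; table and bucket names are placeholders."""
    (
        report_df.write
        .format("bigquery")
        .option("table", table)                       # e.g. "analytics.daily_report"
        .option("temporaryGcsBucket", "etl-staging")  # staging bucket used by the connector
        .mode("overwrite")                            # full refresh; use "append" for incremental loads
        .save()
    )
```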
Models Required
- ProcessedGuestModel
- ProcessedReservationModel
- ProcessedRoomModel
Model Configuration
The Athenaeum workflow uses custom model configurations to specify database names for raw data models.
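A sketch of what such a configuration might look like; the ModelConfig name and the way configurations are passed to the Orchestrator are assumptions.

```python
# Hypothetical model configuration overriding the database for raw models.
model_configs = {
    RawReservationModel: ModelConfig(database="etl_gp_raw_athenaeum"),
    RawRevenueModel: ModelConfig(database="etl_gp_raw_athenaeum"),
}

orchestrator = Orchestrator(model_configs=model_configs)
```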
This tells the Orchestrator to read raw data from the etl_gp_raw_athenaeum database instead of the default location.
Execution Flow
When orchestrator.run() is called, the following sequence occurs:
Dependency Resolution
- Orchestrator analyzes all task dependencies
- Builds a topological sort of tasks
- Validates no circular dependencies exist
Task Execution (in order)

AthenaeumRawTask
  ├─→ AthenaeumIngesterTask (if not skipped)
  └─→ AthenaeumCrawlerTask
  │ Outputs: RawReservationModel, RawRevenueModel
  ↓
CleanAthenaeumTask
  │ Reads: RawReservationModel, RawRevenueModel
  │ Outputs: CleanGuestModel, CleanReservationModel, CleanRoomModel
  ↓
ProcessingTask
  ├─→ ReplaceValuesTask
  ├─→ GuestMatchingCheckTask
  ├─→ AthenaeumLegacyMergeTask (custom)
  ├─→ GuestMatchingTask
  ├─→ ReservationMetricsTask
  ├─→ RoomMetricsTask
  └─→ GuestLoyaltyTask
  │ Reads: CleanGuestModel, CleanReservationModel, CleanRoomModel
  │ Outputs: ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel
  ↓
ReportsTask
  ├─→ DailyReportTask
  ├─→ BookingRoomsReportTask
  ├─→ GuestLoyaltyReportTask
  └─→ PickupReportTask
  │ Reads: ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel
  │ Outputs: Reports written to BigQuery
Model Lifecycle
- Models are instantiated once in the model registry
- Each task reads from task.inputs[ModelClass]
- Each task writes to task.outputs[ModelClass]
- Models persist in memory between tasks
- Final models are written to the catalog/database
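As a sketch of how a task interacts with the registry, the skeleton below reads from inputs and writes to outputs; the run() signature and the dataframe attribute are assumptions.

```python
class ExampleTask:
    """Illustrative task body showing the model access pattern above."""

    def run(self) -> None:
        # Read an upstream model's DataFrame from the registry (attribute name assumed)
        reservations = self.inputs[CleanReservationModel].dataframe

        # ... apply business logic ...
        processed = reservations

        # Publish the result for downstream tasks
        self.outputs[ProcessedReservationModel].dataframe = processed
```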
Customization Options
Skip Ingestion
If raw data files already exist, skip the download step by setting the skip_ingestion flag during configuration.
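For example, assuming the constructor sketched earlier:

```python
# Reuse previously downloaded files; skip the PMS API download step.
AthenaeumRawTask(skip_ingestion=True)
```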
Sync Specific Dates
Only process specific dates by setting the sync_dates configuration option.
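For example (same assumed constructor; the accepted date format is an assumption):

```python
from datetime import date

# Only ingest/crawl data for the listed dates.
AthenaeumRawTask(sync_dates=[date(2025, 7, 1), date(2025, 7, 2)])
```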
Disable Legacy Merge
Remove the legacy merge task if not needed.
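Following the earlier sketch of the subtask list (a hypothetical API), this means either never inserting the task or filtering it out:

```python
# Leave the chain-specific legacy merge out of the processing sequence,
# or remove it if it was inserted as in the sketch above.
processing.subtasks = [t for t in processing.subtasks
                       if not isinstance(t, AthenaeumLegacyMergeTask)]
```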
Custom Report Selection
Run only specific reports using the TaskGroup and only_run_reports/skip_reports flags.
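The flag names come from the section above; whether they take report task names or classes is an assumption in this sketch.

```python
# Run only the daily and pickup reports.
reports = ReportsTask(only_run_reports=["DailyReportTask", "PickupReportTask"])

# Or run everything except the guest loyalty report.
reports = ReportsTask(skip_reports=["GuestLoyaltyReportTask"])
```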