Cheval Collection Workflow
The Cheval Collection workflow processes hotel data for Cheval Collection properties using the Opera PMS integration. This workflow demonstrates how specific Opera chain tasks (legacy merge and CMTP fixes) are inserted into the common processing pipeline.
Workflow Overview
This workflow follows the same general ETL pattern as other chains:
- Raw Data Ingestion & Crawling - Extract data from Opera source
- Data Cleaning - Transform and standardize raw Opera data
- Processing - Apply chain-specific logic (legacy data merge, CMTP fix, guest matching)
- Report Generation - Produce final analytics tables
Pipeline Architecture
```
┌─────────────────────┐
│    OperaRawTask     │  ← Ingestion + Crawling
└──────────┬──────────┘
           │ RawReservationModel
           │ RawRateModel
           │ RawProfileModel
           ↓
┌─────────────────────┐
│   CleanOperaTask    │  ← Data Cleaning
└──────────┬──────────┘
           │ CleanGuestModel
           │ CleanReservationModel
           │ CleanRoomModel
           ↓
┌─────────────────────┐
│   ProcessingTask    │  ← Business Logic
│ (with LegacyMerge   │
│  and CMTPFix)       │
└──────────┬──────────┘
           │ ProcessedGuestModel
           │ ProcessedReservationModel
           │ ProcessedRoomModel
           ↓
┌─────────────────────┐
│     ReportsTask     │  ← Analytics Reports
└─────────────────────┘
```

Stage 1: Raw Data Ingestion & Crawling
OperaRawTask
The OperaRawTask orchestrates the Opera ingester and crawler subtasks:
- OperaIngesterTask (optional) - Downloads raw data from the Opera API
- OperaCrawlerTask - Parses downloaded files and creates raw data models
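The composite pattern above can be sketched as a wrapper that runs its subtasks in a fixed order. This is an illustrative sketch only; the real OperaRawTask class in etl_lib and its subtask API may differ:

```python
# Illustrative composite-task sketch: run the optional ingester, then the
# crawler. Class and method names here are assumptions, not the real API.
class RawTaskSketch:
    def __init__(self, skip_ingestion: bool = False):
        self.subtasks = []
        if not skip_ingestion:
            # Downloads raw files from the Opera API.
            self.subtasks.append("OperaIngesterTask")
        # Parses the downloaded files into raw data models.
        self.subtasks.append("OperaCrawlerTask")

    def run(self) -> list[str]:
        # In the real pipeline each subtask would execute; here we just
        # report the execution order.
        return list(self.subtasks)
```

With `skip_ingestion=True`, previously downloaded files are reused and only the crawler runs.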
Models Produced
- RawReservationModel - Raw reservations
- RawRateModel - Raw rate/daily revenue details
- RawProfileModel - Raw guest profiles
Stage 2: Data Cleaning
CleanOperaTask
The CleanOperaTask transforms raw Opera data into the standard cleaned models used across the ETL pipeline. This includes:
- Standardizing reservation statuses and IDs
- Generating stable UUIDs for reservations and guests
- Parsing check-in/out dates and per-stay daily rows
- Matching rates to room stay dates
- Creating normalized guest and room records
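A common way to generate the stable UUIDs mentioned above is `uuid5` over a fixed namespace plus the source system's natural key. This is a sketch of the idea, not necessarily how CleanOperaTask implements it; the namespace constant and key format are assumptions:

```python
import uuid

# Hypothetical fixed namespace for this chain; any constant UUID works,
# as long as it never changes between runs.
CHAIN_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "chevalcollection")

def stable_reservation_uuid(property_id: str, confirmation_no: str) -> uuid.UUID:
    """Same natural key in, same UUID out, so re-runs stay idempotent."""
    return uuid.uuid5(CHAIN_NAMESPACE, f"{property_id}:{confirmation_no}")
```

Because the output is deterministic, re-crawling the same reservation on a later run produces the same ID, which keeps downstream joins stable.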
Models Produced
- CleanGuestModel
- CleanReservationModel
- CleanRoomModel
Stage 3: Processing
ProcessingTask with OperaLegacyMerge & CMTPFix
The ProcessingTask runs common processing subtasks. For Cheval Collection we insert two Opera-specific subtasks into the processing pipeline:
- OperaLegacyMergeTask — Merges historical (legacy) data into current processed models and ensures consistent schema alignment.
- CMTPFixTask — Applies a targeted workaround for specific CMTP cancelled-booking issues at Cheval Maison The Palm Dubai.
In jobs/chevalcollection_task.py the job adds these subtasks and disables GuestMatchingTask:
```python
processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=OperaLegacyMergeTask(job_context=job_context, write_to_catalog=False),
    before=GuestMatchingTask,
)
processing_task.insert_subtask(
    subtask=CMTPFixTask(job_context=job_context, write_to_catalog=False),
    after=OperaLegacyMergeTask,
)
processing_task.skip_task(GuestMatchingTask)
```

Why these tasks?
- OperaLegacyMergeTask — aligns historical Opera data with the current processed models, handling partial overlaps and adjusting booking dates to account for a cutoff_date per property.
- CMTPFixTask — applies property/reservation-specific marketplace fixes (status updates, date truncations, currency conversions) to correct known issues in production reports.
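A targeted fix task of this kind typically keys corrections by reservation ID and overrides only the affected fields. The sketch below shows the pattern; the correction table, field names, and IDs are all hypothetical, not the actual CMTPFixTask contents:

```python
# Hypothetical correction table: reservation ID -> field overrides.
CMTP_FIXES = {
    "RES-001": {"status": "cancelled"},              # status update example
    "RES-002": {"checkout_date": "2023-05-10"},      # date truncation example
}

def apply_cmtp_fixes(reservations: list[dict]) -> list[dict]:
    """Apply per-reservation overrides, leaving all other rows unchanged."""
    fixed = []
    for row in reservations:
        overrides = CMTP_FIXES.get(row["reservation_id"], {})
        # Dict merge: override values win over the original row's values.
        fixed.append({**row, **overrides})
    return fixed
```

Keeping the fixes in a small lookup table makes the workaround easy to audit and to delete once the upstream data issue is resolved.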
Stage 4: Report Generation
ReportsTask
The ReportsTask writes final aggregates and reporting tables to BigQuery. Reports include daily rows, aggregate booking metrics, guest loyalty tables, and pickup/pace analysis.
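As a rough illustration of the kind of aggregate ReportsTask produces, here is a daily booking-count rollup over processed reservations. The column name `checkin_date` is an assumption; the real report logic writes to BigQuery rather than returning a dict:

```python
from collections import defaultdict

def daily_booking_counts(reservations: list[dict]) -> dict[str, int]:
    """Count bookings per check-in date: the shape of a simple daily report row."""
    counts: dict[str, int] = defaultdict(int)
    for row in reservations:
        counts[row["checkin_date"]] += 1
    return dict(counts)
```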
Model Configuration
Example model_configs from the job:
```python
model_configs={
    "RawReservationModel": {"database": "etl_gp_raw_chevalcollection"},
    "RawRateModel": {"database": "etl_gp_raw_chevalcollection"},
    "RawProfileModel": {"database": "etl_gp_raw_chevalcollection"},
}
```

Orchestrator Example
Here’s an example jobs/chevalcollection_task.py showing how to assemble the pipeline (raw → clean → processing → reports) and insert Opera-specific subtasks:
```python
from etl_lib.Orchestrator import Orchestrator
from etl_lib.data.BigQuery import BigQuery
from etl_lib.job.JobContext import JobContext
from etl_lib.tasks.chains.opera import (
    OperaRawTask,
    CleanOperaTask,
    OperaLegacyMergeTask,
    CMTPFixTask,
)
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask
from etl_lib.tasks.reports.ReportsTask import ReportsTask

job_context = JobContext(chain_id="chevalcollection", partitions=16)
db_sink = BigQuery(job_context)

processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=OperaLegacyMergeTask(job_context=job_context, write_to_catalog=False),
    before=GuestMatchingTask,
)
processing_task.insert_subtask(
    subtask=CMTPFixTask(job_context=job_context, write_to_catalog=False),
    after=OperaLegacyMergeTask,
)
processing_task.skip_task(GuestMatchingTask)

orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        OperaRawTask(job_context=job_context, skip_ingestion=False),
        CleanOperaTask(job_context=job_context),
        processing_task,
        ReportsTask(job_context=job_context, database_sink=db_sink, write_to_catalog=True),
    ],
    model_configs={
        "RawReservationModel": {"database": "etl_gp_raw_chevalcollection"},
        "RawRateModel": {"database": "etl_gp_raw_chevalcollection"},
        "RawProfileModel": {"database": "etl_gp_raw_chevalcollection"},
    },
)
orchestrator.run()
```

Execution Flow
The orchestrator validates the dependency graph and executes the tasks in order. The Cheval Collection pipeline modifies the default processing order to include OperaLegacyMergeTask and CMTPFixTask.
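The validation-then-execute step can be pictured as a topological sort over task dependencies. The stdlib `graphlib` sketch below uses this workflow's task names; the dependency map and ordering mechanism are assumptions about how the orchestrator works internally:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
deps = {
    "CleanOperaTask": {"OperaRawTask"},
    "ProcessingTask": {"CleanOperaTask"},
    "ReportsTask": {"ProcessingTask"},
}

# static_order() raises CycleError if the graph is invalid, which is the
# essence of "validates the dependency graph" before running anything.
order = list(TopologicalSorter(deps).static_order())
```

For this linear chain the result is raw, clean, processing, reports, matching the stage order described above.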