Cheval Collection Workflow

The Cheval Collection workflow processes hotel data for Cheval Collection properties using the Opera PMS integration. This workflow demonstrates how specific Opera chain tasks (legacy merge and CMTP fixes) are inserted into the common processing pipeline.

Workflow Overview

This workflow follows the same general ETL pattern as other chains:

  1. Raw Data Ingestion & Crawling - Extract data from Opera source
  2. Data Cleaning - Transform and standardize raw Opera data
  3. Processing - Apply chain-specific logic (legacy data merge, CMTP fix); the standard guest matching step is skipped for this chain
  4. Report Generation - Produce final analytics tables

Pipeline Architecture

┌─────────────────────┐
│    OperaRawTask     │  ← Ingestion + Crawling
└──────────┬──────────┘
           │ RawReservationModel
           │ RawRateModel
           │ RawProfileModel
┌─────────────────────┐
│   CleanOperaTask    │  ← Data Cleaning
└──────────┬──────────┘
           │ CleanGuestModel
           │ CleanReservationModel
           │ CleanRoomModel
┌─────────────────────┐
│   ProcessingTask    │  ← Business Logic
│ (with LegacyMerge   │
│    and CMTPFix)     │
└──────────┬──────────┘
           │ ProcessedGuestModel
           │ ProcessedReservationModel
           │ ProcessedRoomModel
┌─────────────────────┐
│     ReportsTask     │  ← Analytics Reports
└─────────────────────┘

Stage 1: Raw Data Ingestion & Crawling

OperaRawTask

The OperaRawTask orchestrates the Opera ingester and crawler subtasks:

  • OperaIngesterTask (optional) - Downloads raw data from Opera API
  • OperaCrawlerTask - Parses downloaded files and creates raw data models

Models Produced

  • RawReservationModel - Raw reservations
  • RawRateModel - Raw rate/daily revenue details
  • RawProfileModel - Raw guest profiles

Stage 2: Data Cleaning

CleanOperaTask

The CleanOperaTask transforms raw Opera data to standard cleaned models used across the ETL pipeline. This includes:

  • Standardizing reservation statuses and IDs
  • Generating stable UUIDs for reservations and guests
  • Parsing check-in/out dates and per-stay daily rows
  • Matching rates to room stay dates
  • Creating normalized guest and room records
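The stable-UUID step above can be sketched in plain Python. This is an illustrative approach, not the library's actual API: it assumes UUIDv5 over a chain-scoped namespace, and the `make_stable_uuid` name and `OPERA_NAMESPACE` value are made up for the example.

```python
import uuid

# Illustrative namespace; the real cleaning task may use a different scheme.
OPERA_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "opera.etl")

def make_stable_uuid(chain_id: str, entity_type: str, source_id: str) -> str:
    """Derive a deterministic UUID so re-runs produce identical keys."""
    return str(uuid.uuid5(OPERA_NAMESPACE, f"{chain_id}:{entity_type}:{source_id}"))

# Same inputs always yield the same UUID, so a reservation keeps its
# identity across repeated pipeline runs.
res_uuid = make_stable_uuid("chevalcollection", "reservation", "12345")
```

Determinism is the point: re-crawling the same Opera export must not mint new guest or reservation keys.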

Models Produced

  • CleanGuestModel
  • CleanReservationModel
  • CleanRoomModel

Stage 3: Processing

ProcessingTask with OperaLegacyMerge & CMTPFix

The ProcessingTask runs common processing subtasks. For Cheval Collection we insert two Opera-specific subtasks into the processing pipeline:

  1. OperaLegacyMergeTask — Merges historical (legacy) data into current processed models and ensures consistent schema alignment.
  2. CMTPFixTask — Applies a targeted workaround for specific CMTP cancelled booking issues at Cheval Maison The Palm Dubai.
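The shape of such a targeted fix can be sketched as a patch table overlaid on matching reservations. This is a hedged illustration only: the reservation IDs, field names, and patch values below are invented, not the actual contents of CMTPFixTask.

```python
from datetime import date

# Hypothetical patch table: reservation ID → fields to overwrite.
# The real task targets known cancelled-booking issues at Cheval Maison
# The Palm Dubai; these IDs and fields are illustrative.
CMTP_PATCHES = {
    "RES-001": {"status": "cancelled"},
    "RES-002": {"checkout_date": date(2023, 5, 10)},  # truncate an overlong stay
}

def apply_cmtp_fixes(reservations: list[dict]) -> list[dict]:
    """Overlay the patch fields onto matching reservations, leaving others untouched."""
    return [{**r, **CMTP_PATCHES.get(r["reservation_id"], {})} for r in reservations]
```

Keeping the fixes in a declarative table makes each workaround auditable and easy to retire once the upstream data is corrected.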

In jobs/chevalcollection_task.py the job adds these subtasks and disables GuestMatchingTask:

processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=OperaLegacyMergeTask(job_context=job_context, write_to_catalog=False),
    before=GuestMatchingTask,
)
processing_task.insert_subtask(
    subtask=CMTPFixTask(job_context=job_context, write_to_catalog=False),
    after=OperaLegacyMergeTask,
)
processing_task.skip_task(GuestMatchingTask)

Why these tasks?

  • OperaLegacyMergeTask aligns historical Opera data with the current processed models, handling partial overlaps and adjusting booking dates according to a per-property cutoff_date.

  • CMTPFixTask applies property/reservation-specific marketplace fixes (status updates, date truncations, currency conversions) to correct known issues for production reports.
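The cutoff-based merge described above can be sketched with plain dicts. This is a minimal sketch under assumptions: the `CUTOFFS` table, property ID, and field names are hypothetical, and the real task reads cutoff dates from configuration rather than a module constant.

```python
from datetime import date

# Hypothetical per-property cutoff dates (illustrative values).
CUTOFFS = {"cheval-three-quays": date(2022, 1, 1)}

def merge_legacy(legacy: list[dict], current: list[dict]) -> list[dict]:
    """Keep legacy rows strictly before the property's cutoff_date and
    current rows on or after it, so the two sources never overlap."""
    merged = []
    for row in legacy:
        if row["booking_date"] < CUTOFFS[row["property_id"]]:
            merged.append(row)
    for row in current:
        if row["booking_date"] >= CUTOFFS[row["property_id"]]:
            merged.append(row)
    return merged
```

The cutoff acts as a hard boundary: each booking date is served by exactly one source, which avoids double-counting in downstream reports.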

Stage 4: Report Generation

ReportsTask

The ReportsTask writes final aggregates and reporting tables to BigQuery. Reports include daily rows, aggregate booking metrics, guest loyalty tables, and pickup/pace analysis.
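As a simple illustration of what one of these aggregates looks like, the sketch below sums per-stay daily rows into daily revenue totals. It is not the ReportsTask implementation (which writes to BigQuery and covers pickup/pace and loyalty as well); the row fields `stay_date` and `rate_amount` are assumed names.

```python
from collections import defaultdict
from datetime import date

def daily_revenue(daily_rows: list[dict]) -> dict[date, float]:
    """Sum rate amounts per stay date - the simplest of the report aggregates."""
    totals: dict[date, float] = defaultdict(float)
    for row in daily_rows:
        totals[row["stay_date"]] += row["rate_amount"]
    return dict(totals)
```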

Model Configuration

Example model_configs from the job:

model_configs={
    "RawReservationModel": {"database": "etl_gp_raw_chevalcollection"},
    "RawRateModel": {"database": "etl_gp_raw_chevalcollection"},
    "RawProfileModel": {"database": "etl_gp_raw_chevalcollection"},
}

Orchestrator Example

Here’s an example jobs/chevalcollection_task.py showing how to assemble the pipeline (raw → clean → processing → reports) and insert Opera-specific subtasks:

from etl_lib.Orchestrator import Orchestrator
from etl_lib.data.BigQuery import BigQuery
from etl_lib.job.JobContext import JobContext
from etl_lib.tasks.chains.opera import (
    OperaRawTask,
    CleanOperaTask,
    OperaLegacyMergeTask,
    CMTPFixTask,
)
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask
from etl_lib.tasks.reports.ReportsTask import ReportsTask

job_context = JobContext(chain_id="chevalcollection", partitions=16)
db_sink = BigQuery(job_context)

processing_task = ProcessingTask(job_context=job_context)
processing_task.insert_subtask(
    subtask=OperaLegacyMergeTask(job_context=job_context, write_to_catalog=False),
    before=GuestMatchingTask,
)
processing_task.insert_subtask(
    subtask=CMTPFixTask(job_context=job_context, write_to_catalog=False),
    after=OperaLegacyMergeTask,
)
processing_task.skip_task(GuestMatchingTask)

orchestrator = Orchestrator(
    job_context=job_context,
    tasks=[
        OperaRawTask(job_context=job_context, skip_ingestion=False),
        CleanOperaTask(job_context=job_context),
        processing_task,
        ReportsTask(job_context=job_context, database_sink=db_sink, write_to_catalog=True),
    ],
    model_configs={
        "RawReservationModel": {"database": "etl_gp_raw_chevalcollection"},
        "RawRateModel": {"database": "etl_gp_raw_chevalcollection"},
        "RawProfileModel": {"database": "etl_gp_raw_chevalcollection"},
    },
)

orchestrator.run()

Execution Flow

The orchestrator validates the dependency graph and executes the tasks in order. The Cheval Collection pipeline modifies the default processing order to include OperaLegacyMergeTask and CMTPFixTask.