Opera (OHIP)
The Opera ingest process retrieves reservation, rate, and guest profile data from Oracle Hospitality Integration Platform (OHIP) via REST API and loads it into the raw data catalog.
Overview
The OperaIngester class handles the extraction of three main data types from the Opera Cloud Property Management System:
- Reservations: Comprehensive booking information including guest details, room stays, status, and revenue breakdowns
- Daily Rates: Detailed rate information for each day of a reservation stay
- Guest Profiles: Complete guest profile data including contact information, addresses, preferences, and privacy settings
Data Source Configuration
The ingester connects to Oracle’s OHIP API gateway using OAuth 2.0 authentication configured in the job context:
OperaIngester(job_context).run()
Connection Details
- Protocol: HTTPS REST API
- Authentication: OAuth 2.0 with client credentials grant
- Data Format: JSON with nested structures
- Base URL: Configured via source["gateway_url"]
- Required Headers:
  - x-app-key: Application key for API access
  - x-hotelid: Property identifier for hotel-specific requests
  - Authorization: Bearer token (auto-refreshed)
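As a minimal sketch of the header set listed above, the function below assembles the three headers for one request. The function name build_headers and the sample values are hypothetical and are not taken from OperaIngester.

```python
def build_headers(app_key: str, hotel_id: str, access_token: str) -> dict[str, str]:
    """Assemble the three required headers for one OHIP request."""
    return {
        "x-app-key": app_key,                       # application key for API access
        "x-hotelid": hotel_id,                      # property the request is scoped to
        "Authorization": f"Bearer {access_token}",  # OAuth 2.0 bearer token
    }


# Placeholder values for illustration only
headers = build_headers("my-app-key", "PROP123", "example-token")
```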
Authentication Flow
The ingester implements automatic token management:
- Initial Request: Obtains access token using client credentials
- Token Caching: Stores token and expiry timestamp in memory
- Auto-Refresh: Checks expiry before each API call and refreshes if needed
- Expiry Buffer: Token is considered expired when datetime.now() > token_expiry
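The caching and refresh steps above can be sketched roughly as follows. This is an illustration under assumptions, not the actual OperaIngester code: the TokenCache class and the request_token callable (returning a token and its lifetime in seconds) are hypothetical stand-ins for the real token request.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional, Tuple


class TokenCache:
    """Caches a bearer token in memory and refreshes it once it has expired."""

    def __init__(self, request_token: Callable[[], Tuple[str, int]]):
        # request_token is a hypothetical callable returning (token, lifetime_seconds)
        self._request_token = request_token
        self._access_token: Optional[str] = None
        self._token_expiry: Optional[datetime] = None

    def is_token_expired(self) -> bool:
        # Expired if no token was obtained yet or the stored expiry has passed
        return self._token_expiry is None or datetime.now() > self._token_expiry

    def access_token(self) -> str:
        # Checked before each API call; only requests a new token when needed
        if self.is_token_expired():
            token, lifetime_seconds = self._request_token()
            self._access_token = token
            self._token_expiry = datetime.now() + timedelta(seconds=lifetime_seconds)
        return self._access_token


calls = []


def fake_request_token() -> Tuple[str, int]:
    # Stand-in for the real OAuth request; counts how often it is invoked
    calls.append(1)
    return f"token-{len(calls)}", 3600


cache = TokenCache(fake_request_token)
first = cache.access_token()
second = cache.access_token()  # served from the cache, no second request
```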
Ingest Process Flow
Reservation Retrieval Strategy
The ingester uses a multi-dimensional fetch approach to capture all relevant reservations:
1. Arrival-Based Fetch
Retrieves all reservations with arrival date matching the sync date.
GET /rsv/v1/hotels/{hotelId}/reservations?arrivalStartDate={date}&arrivalEndDate={date}
2. Departure-Based Fetch
Retrieves all reservations with departure date matching the sync date.
GET /rsv/v1/hotels/{hotelId}/reservations?departureStartDate={date}&departureEndDate={date}
3. Creation-Based Fetch
Retrieves all reservations created on the sync date.
GET /rsv/v1/hotels/{hotelId}/reservations?createdOnStartDate={date}&createdOnEndDate={date}
4. Recently Modified Fetch
Retrieves reservations accessed recently and filters for modifications within the current week.
GET /rsv/v1/hotels/{hotelId}/reservations?recentlyAccessed=true
Post-Processing Filter: Applies is_modified_this_week() to retain only reservations modified since the start of the current week (Monday).
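To make the four fetches concrete, the sketch below builds the four request paths for one property and sync date. The helper names reservation_queries and reservation_urls are hypothetical; only the path and query parameter names come from the requests listed above.

```python
from urllib.parse import urlencode


def reservation_queries(sync_date: str) -> list[dict[str, str]]:
    """Return the query parameters for the four reservation fetches."""
    return [
        {"arrivalStartDate": sync_date, "arrivalEndDate": sync_date},      # arrivals
        {"departureStartDate": sync_date, "departureEndDate": sync_date},  # departures
        {"createdOnStartDate": sync_date, "createdOnEndDate": sync_date},  # created
        {"recentlyAccessed": "true"},                                      # recently accessed
    ]


def reservation_urls(hotel_id: str, sync_date: str) -> list[str]:
    """Combine the reservations path with each set of query parameters."""
    base = f"/rsv/v1/hotels/{hotel_id}/reservations"
    return [f"{base}?{urlencode(query)}" for query in reservation_queries(sync_date)]


urls = reservation_urls("PROP123", "2025-01-01")
```

Only the last query needs the is_modified_this_week() post-filter, because the first three are already bounded by the sync date.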
Deduplication
After all four fetches complete, the results are unioned and deduplicated by reservationIdList[0].id so that each reservation appears only once per sync date.
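The ingester performs this step on Spark DataFrames. The plain-Python sketch below only illustrates the key used for deduplication; the function name dedupe_reservations and the sample records are hypothetical.

```python
def dedupe_reservations(reservations: list[dict]) -> list[dict]:
    """Keep the first occurrence of each reservation, keyed by reservationIdList[0].id."""
    seen = set()
    unique = []
    for res in reservations:
        res_id = res["reservationIdList"][0]["id"]
        if res_id not in seen:
            seen.add(res_id)
            unique.append(res)
    return unique


# The same reservation can be returned by more than one fetch,
# e.g. when it was created on its own arrival date
arrivals = [{"reservationIdList": [{"id": "1001", "type": "Reservation"}]}]
created = [
    {"reservationIdList": [{"id": "1001", "type": "Reservation"}]},
    {"reservationIdList": [{"id": "1002", "type": "Reservation"}]},
]
unique = dedupe_reservations(arrivals + created)
```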
Incremental Loading Strategy
The ingester uses an incremental sync approach:
- Check Last Sync Date: Query the raw catalog for the most recent synced_date for the reservations table per property
- Calculate Missing Dates: Determine all dates between last sync and today
- Fetch Daily Data: Process one date at a time per property
- Record Sync Date: Each batch is tagged with its synced_date for future incremental runs
If no previous sync exists (first run) and sync_dates are provided to the constructor, those specific dates will be processed.
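A minimal sketch of the "Calculate Missing Dates" step is shown below. It assumes the range starts the day after the last sync and includes today; the text above does not state the exact boundaries, and the function name missing_sync_dates is hypothetical.

```python
from datetime import date, timedelta


def missing_sync_dates(last_synced: date, today: date) -> list[str]:
    """List the ISO dates after last_synced up to and including today (assumed bounds)."""
    days = (today - last_synced).days
    return [(last_synced + timedelta(days=n)).isoformat() for n in range(1, days + 1)]


pending = missing_sync_dates(date(2025, 1, 1), date(2025, 1, 4))
```

Each returned date would then be processed on its own, one date at a time per property.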
Parallel Data Enrichment
For each batch of reservations, the ingester concurrently fetches supplementary data:
Daily Rates Fetching
- Endpoint: /rsv/v1/hotels/{hotelId}/reservations/rateInfo
- Concurrency: 10 worker threads via ThreadPoolExecutor
- Method: _get_raw_rates()
- Purpose: Retrieves day-by-day rate breakdown for each reservation
- Error Handling: Catches exceptions per request; failed requests result in empty rate data
Guest Profiles Fetching
- Endpoint: /crm/v1/guests/{guestId}
- Concurrency: 10 worker threads via ThreadPoolExecutor
- Method: _get_raw_profiles()
- Fetch Instructions: Address, Membership, Profile, Preference, Correspondence
- Purpose: Retrieves complete guest demographic and preference data
- Error Handling: Catches exceptions per request; failed requests result in null profile data
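The per-request error handling described for both fetchers can be sketched with a generic helper. This is an illustration, not the ingester's code: fetch_all, safe_fetch, and fake_fetch are hypothetical names, and fake_fetch stands in for the real HTTP call.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable, Optional

logger = logging.getLogger(__name__)


def fetch_all(
    ids: Iterable[str],
    fetch_one: Callable[[str], Any],
    max_workers: int = 10,
) -> Dict[str, Optional[Any]]:
    """Fetch one item per id in parallel; a failed fetch yields None for that id."""

    def safe_fetch(item_id: str) -> Optional[Any]:
        try:
            return fetch_one(item_id)
        except Exception:
            # One failing request must not abort the whole batch
            logger.exception("Fetch failed for %s", item_id)
            return None

    id_list = list(ids)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results line up with id_list
        results = list(pool.map(safe_fetch, id_list))
    return dict(zip(id_list, results))


def fake_fetch(guest_id: str) -> dict:
    # Stand-in for the real profile request; fails for one id on purpose
    if guest_id == "bad":
        raise RuntimeError("simulated API error")
    return {"id": guest_id}


profiles = fetch_all(["a", "bad", "b"], fake_fetch)
```

Filtering out the None entries afterwards matches the "Filter Nulls" behavior described for profiles.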
Data Schemas
Reservations Schema
The reservation data captures comprehensive booking information with nested structures:
| Field Category | Key Fields | Description |
|---|---|---|
| Reservation IDs | reservationIdList[].id, reservationIdList[].type | System identifiers for the reservation |
| Room Stay | roomStay.arrivalDate, roomStay.departureDate, roomStay.adultCount, roomStay.childCount | Stay dates and guest counts |
| Room Details | roomStay.roomClass, roomStay.roomType, roomStay.roomId, roomStay.numberOfRooms | Room type and assignment |
| Rate Information | roomStay.ratePlanCode, roomStay.rateAmount, roomStay.totalAmount, roomStay.fixedRate | Rate plan and amounts |
| Market & Source | roomStay.marketCode, roomStay.sourceCode, roomStay.sourceCodeDescription, roomStay.bookingChannelCode | Booking channel and market segment |
| Guest Identity | reservationGuest.givenName, reservationGuest.surname, reservationGuest.email, reservationGuest.phoneNumber, reservationGuest.birthDate | Primary guest information |
| Guest Location | reservationGuest.address.country, reservationGuest.address.cityName, reservationGuest.address.postalCode, reservationGuest.address.state | Guest address |
| Guest Demographics | reservationGuest.nationality, reservationGuest.language, reservationGuest.vip.vipCode | Nationality and VIP status |
| Attached Profiles | attachedProfiles[].name, attachedProfiles[].profileIdList, attachedProfiles[].reservationProfileType | Additional profiles (company, travel agent) |
| Payment Method | reservationPaymentMethod.paymentMethod, paymentMethod | Payment type |
| Revenue Breakdown | revenuesAndBalances.roomRevenue, revenuesAndBalances.foodAndBevRevenue, revenuesAndBalances.otherRevenue, revenuesAndBalances.totalRevenue | Revenue by category |
| Financial | revenuesAndBalances.totalPayment, revenuesAndBalances.balance, revenuesAndBalances.totalFixedCharge | Payments and balances |
| Status | reservationStatus, computedReservationStatus, roomStatus | Current reservation and room status |
| Display | displayColor, displayColorDetails.colorDefinition, displayColorDetails.colorDescription | UI display settings |
| Source of Sale | sourceOfSale.sourceType, sourceOfSale.sourceCode | Original booking source |
| Property | hotelId, hotelName | Property identification |
| Audit Trail | createDateTime, createBusinessDate, lastModifyDateTime | Creation and modification timestamps |
| Flags | walkInIndicator, preRegistered, openFolio, allowMobileCheckout, optedForCommunication | Boolean indicators |
| Commission | commissionPayoutTo | Commission recipient |
Schema Type: Complex nested StructType with arrays and nested objects (see RESERVATION_SUMMARY_SCHEMA)
Daily Rates Schema
The daily rate data provides granular rate information for each day of the stay:
| Field | Type | Description |
|---|---|---|
| reservationId | String | Link to parent reservation |
| dailyRates | Struct | Container for rate details |
| dailyRates.details[] | Array | Daily rate breakdown |
| dailyRates.details[].summaryDate | String | Date for this rate entry |
| dailyRates.details[].revenue | String | Room revenue amount |
| dailyRates.details[].package | String | Package charges |
| dailyRates.details[].tax | String | Tax amount |
| dailyRates.details[].gross | String | Gross amount before tax |
| dailyRates.details[].net | String | Net amount after discounts |
| dailyRates.details[].ratePlanCode | String | Rate plan identifier |
| dailyRates.details[].currencyCode | String | Currency code |
| dailyRates.details[].rateSuppressed | Boolean | Whether rate is hidden |
| dailyRates.gross | String | Total gross for entire stay |
| dailyRates.net | String | Total net for entire stay |
| dailyRates.fixedCharges | String | Total fixed charges |
| dailyRates.deposit | String | Deposit amount |
| dailyRates.totalCostOfStay | String | Total cost including all charges |
| dailyRates.outStandingCostOfStay | String | Outstanding balance |
| dailyRates.currencyCode | String | Currency for all amounts |
| dailyRates.start | String | Stay start date |
| dailyRates.end | String | Stay end date |
| dailyRates.hasSuppressedRate | Boolean | Indicates if any rate is suppressed |
Schema Type: StructType with nested array of daily rate details (see RATE_INFO_SCHEMA)
Guest Profiles Schema
The guest profile data contains comprehensive guest information:
| Field Category | Key Fields | Description |
|---|---|---|
| Profile IDs | guestIdList[].id, guestIdList[].type | Guest profile identifiers |
| Name Details | guestDetails.customer.personName[].givenName, surname, nameTitle, nameSuffix | Guest name variants |
| Greetings | guestDetails.customer.personName[].salutation, envelopeGreeting | Personalized greetings |
| Demographics | guestDetails.customer.nationality, nationalityDescription, citizenCountry, language | Nationality and language |
| VIP Status | guestDetails.customer.vipStatus, vipDescription | VIP classification |
| Addresses | guestDetails.addresses.addressInfo[].address.addressLine[], cityName, postalCode, country | Physical addresses |
| Address Metadata | guestDetails.addresses.addressInfo[].address.type, typeDescription, primaryInd, isValidated | Address classification |
| Memberships | guestDetails.profileMemberships | Loyalty program memberships (Map) |
| Preferences | guestDetails.preferenceCollection.totalResults | Count of guest preferences |
| Privacy Settings | guestDetails.privacyInfo.marketResearchParticipation, allowPhone, allowSMS, allowEmail | Communication preferences |
| Opt-in Flags | guestDetails.privacyInfo.optInMailingList, optInMarketResearch, optInThirdParty | Marketing opt-ins |
| Profile Access | guestDetails.profileAccessType.hotelId, sharedLevel | Profile sharing configuration |
| Restrictions | guestDetails.profileRestrictions.restricted | Whether profile is restricted |
| Mailing | guestDetails.mailingActions.active, totalResults | Mailing action settings |
| Tax Information | guestDetails.taxInfo | Tax-related data (Map) |
| Profile Metadata | guestDetails.profileType, statusCode, registeredProperty | Profile classification |
| Audit Trail | guestDetails.createDateTime, creatorId, lastModifyDateTime, lastModifierId | Creation and modification tracking |
| Privacy Flag | guestDetails.customer.privateProfile | Whether profile is marked private |
| History | guestDetails.markForHistory | Historical profile marker |
| API Links | links[].href, rel, method, operationId | HATEOAS links for related resources |
Schema Type: Complex nested StructType with arrays, maps, and deeply nested objects (see GUEST_PROFILE_SCHEMA)
Implementation Details
Core Methods
run()
Main execution method that orchestrates the ingest process:
- Iterates through each property configured in the job context
- Determines sync dates for the reservations table using get_next_sync_dates()
- Falls back to the sync_dates constructor parameter if no dates are found
- Calls _get_raw_reservations() for each property and date combination
access_token()
OAuth 2.0 token management with automatic refresh:
- Checks token expiry using is_token_expired()
- Posts to /oauth/v1/tokens endpoint with client credentials
- Caches _access_token and _token_expiry in instance variables
- Returns cached token if still valid
is_token_expired()
Token expiry validation:
- Returns True if _token_expiry is None (no token obtained yet)
- Returns True if current datetime exceeds stored expiry
- Returns False if token is still valid
is_modified_this_week(res)
Static method to filter recently modified reservations:
- Calculates start of current week (Monday) using datetime.weekday()
- Parses lastModifyDateTime from reservation
- Returns True if modification date is on or after Monday of current week
- Handles missing timestamps and parsing errors gracefully
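One way such a filter could look is sketched below. Several details are assumptions rather than documented behavior: the timestamp is taken to be ISO 8601, any UTC offset is ignored, missing or unparseable values return False, and the optional now parameter exists only to make the example deterministic.

```python
from datetime import datetime, timedelta
from typing import Optional


def is_modified_this_week(res: dict, now: Optional[datetime] = None) -> bool:
    """Return True if lastModifyDateTime falls on or after Monday 00:00 of the current week."""
    now = now or datetime.now()
    # weekday() is 0 for Monday, so subtracting it lands on this week's Monday
    week_start = (now - timedelta(days=now.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    raw = res.get("lastModifyDateTime")
    if not raw:
        return False  # assumed behavior for a missing timestamp
    try:
        # Drop any offset so the comparison stays between naive datetimes
        modified = datetime.fromisoformat(raw).replace(tzinfo=None)
    except (TypeError, ValueError):
        return False  # assumed behavior for an unparseable timestamp
    return modified >= week_start


# Wednesday 2025-01-08, so the current week starts on Monday 2025-01-06
reference = datetime(2025, 1, 8, 12, 0)
kept = is_modified_this_week({"lastModifyDateTime": "2025-01-06T00:00:00"}, reference)
dropped = is_modified_this_week({"lastModifyDateTime": "2025-01-05T23:59:59"}, reference)
```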
_get_raw_reservations(property_id: str, sync_date: str)
Comprehensive reservation ingestion for a single property and date:
- Authenticate: Obtains bearer token via access_token()
- Fetch with 4 queries: Arrivals, departures, created, and recently modified reservations
- Create DataFrames: Converts each list to Spark DataFrame with RESERVATION_SUMMARY_SCHEMA
- Union: Combines all four DataFrames using unionByName
- Deduplicate: Removes duplicates based on reservationIdList[0].id
- Enrich: Calls _get_raw_rates() and _get_raw_profiles() in sequence
- Write: Persists to etl_gp_raw_opera.reservations with property partition
_get_raw_rates(res_list, property_id: str, sync_date: str)
Parallel fetching of daily rate information:
- Initialize: Clears self.rates_raw dictionary
- Setup Client: Creates HttpClient with authentication headers
- Define Worker: fetch_rate() function retrieves rates for one reservation
- Parallel Execution: ThreadPoolExecutor with 10 workers processes all reservations
- Error Handling: Exceptions are caught per reservation; empty list returned on failure
- Transform: Converts dictionary to DataFrame with reservationId and dailyRates columns
- Write: Persists to etl_gp_raw_opera.daily_rates with property partition
_get_raw_profiles(res_list, property_id: str, sync_date: str)
Parallel fetching of guest profile information:
- Setup Client: Creates HttpClient with authentication headers
- Define Worker: fetch_profile() function retrieves profile for one guest
- Parallel Execution: ThreadPoolExecutor with 10 workers processes all guests
- Fetch Instructions: Includes Address, Membership, Profile, Preference, Correspondence
- Error Handling: Exceptions are caught per profile; null returned on failure
- Filter Nulls: Removes null profiles from the list
- Empty Handling: Creates empty DataFrame with schema if no valid profiles
- Write: Persists to etl_gp_raw_opera.guest_profiles (no property partition for profiles)
Error Handling
- Token Refresh Failures: If OAuth token request fails, exception propagates and stops ingestion
- API Request Failures: HttpClient handles pagination errors and connection issues
- Parallel Fetch Errors: Individual profile/rate fetch failures are logged and skipped; processing continues
- Empty Response Handling: Empty reservation lists result in empty DataFrames (no failure)
- Duplicate Reservations: Handled via dropDuplicates() without raising errors
Type Safety
All nested JSON structures are validated against PySpark schemas (RESERVATION_SUMMARY_SCHEMA, RATE_INFO_SCHEMA, GUEST_PROFILE_SCHEMA) during DataFrame creation. This ensures:
- Type consistency for downstream processing
- Early detection of schema changes in the API
- Null-safe handling of optional fields
- Preserved nested structure for complex objects
Fields are primarily StringType to preserve raw API values. Type casting and business logic occur in the cleaning stage of the workflow.
Catalog Output
After successful ingestion, data is available in the raw catalog:
Database: etl_gp_raw_opera
Tables:
- reservations: Complete reservation details, partitioned by synced_date and property_id
- daily_rates: Day-by-day rate breakdown linked to reservations, partitioned by synced_date and property_id
- guest_profiles: Comprehensive guest profile data, partitioned by synced_date only (not by property)
API Rate Limiting Considerations
The Opera Cloud API (OHIP) enforces rate limits:
- Token Endpoint: Limited requests per hour for OAuth token generation
- Reservation Endpoints: Pagination with limit=200 to avoid large response payloads
- Profile/Rate Endpoints: ThreadPool concurrency set to 10 to balance performance and API limits
If rate limits are encountered, the HttpClient will raise exceptions. Consider:
- Reducing ThreadPoolExecutor max_workers from 10 to 5
- Implementing exponential backoff in error handling
- Distributing sync dates across multiple job runs
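The exponential backoff suggestion could be sketched as a small retry wrapper. This is not part of the ingester: call_with_backoff and its parameters are hypothetical, and the broad Exception catch is a placeholder because the exception type raised by HttpClient is not documented here.

```python
import random
import time
from typing import Any, Callable


def call_with_backoff(
    func: Callable[[], Any],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], Any] = time.sleep,
) -> Any:
    """Call func, retrying with exponentially growing, jittered delays on failure."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles per attempt; jitter spreads out concurrent retries
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)


attempts = []
delays = []


def flaky_request() -> str:
    # Simulated request that fails twice before succeeding
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated rate limit")
    return "ok"


# Delays are recorded instead of slept so the example finishes instantly
result = call_with_backoff(flaky_request, base_delay=0.01, sleep=delays.append)
```

Wrapping the per-reservation fetch in such a helper keeps the retry policy separate from the thread pool size, so the two can be tuned independently.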
Usage in Workflow
The Opera ingester is typically invoked as the first stage in the ETL workflow:
from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.opera.OperaIngester import OperaIngester
# Standard incremental ingest
job_context = JobContext(source="opera", property_ids=["PROP123"])
OperaIngester(job_context).run()
# Backfill specific dates
job_context = JobContext(source="opera", property_ids=["PROP123"])
OperaIngester(job_context, sync_dates=["2025-01-01", "2025-01-02"]).run()
The raw data is then processed by the cleaning pipeline to transform nested structures into flat tables suitable for analytics.