
Opera (OHIP)

The Opera ingest process retrieves reservation, rate, and guest profile data from Oracle Hospitality Integration Platform (OHIP) via REST API and loads it into the raw data catalog.

Overview

The OperaIngester class handles the extraction of three main data types from the Opera Cloud Property Management System:

  • Reservations: Comprehensive booking information including guest details, room stays, status, and revenue breakdowns
  • Daily Rates: Detailed rate information for each day of a reservation stay
  • Guest Profiles: Complete guest profile data including contact information, addresses, preferences, and privacy settings

Data Source Configuration

The ingester connects to Oracle’s OHIP API gateway using OAuth 2.0 authentication configured in the job context:

OperaIngester(job_context).run()

Connection Details

  • Protocol: HTTPS REST API
  • Authentication: OAuth 2.0 with client credentials grant
  • Data Format: JSON with nested structures
  • Base URL: Configured via source["gateway_url"]
  • Required Headers:
    • x-app-key: Application key for API access
    • x-hotelid: Property identifier for hotel-specific requests
    • Authorization: Bearer token (auto-refreshed)
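The header set above can be assembled with a small helper. This is a sketch, not the ingester's code: `build_ohip_headers` and `build_url` are hypothetical names, and `https://gateway.example.com` is a placeholder for whatever `source["gateway_url"]` holds.

```python
def build_ohip_headers(app_key: str, hotel_id: str, access_token: str) -> dict:
    """Assemble the three headers every OHIP request carries (hypothetical helper)."""
    return {
        "x-app-key": app_key,                       # application key for API access
        "x-hotelid": hotel_id,                      # property the request targets
        "Authorization": f"Bearer {access_token}",  # OAuth 2.0 bearer token
    }


def build_url(gateway_url: str, path: str) -> str:
    """Join the configured gateway URL and an API path without doubling slashes."""
    return gateway_url.rstrip("/") + "/" + path.lstrip("/")
```

For example, `build_url("https://gateway.example.com/", "/rsv/v1/hotels/HOTEL1/reservations")` yields a single-slash URL regardless of how the gateway URL was configured.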

Authentication Flow

The ingester implements automatic token management:

  1. Initial Request: Obtains access token using client credentials
  2. Token Caching: Stores token and expiry timestamp in memory
  3. Auto-Refresh: Checks expiry before each API call and refreshes if needed
  4. Expiry Check: The token is treated as expired once datetime.now() is later than token_expiry
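The four steps above amount to a small in-memory cache. The sketch below assumes a hypothetical `fetch_token` callable that performs the client-credentials request and returns `(access_token, expires_in_seconds)`, following the standard OAuth 2.0 `expires_in` field; the clock is injectable only so the behavior can be tested.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional, Tuple


class TokenCache:
    """In-memory bearer-token cache following the flow above (sketch)."""

    def __init__(self, fetch_token: Callable[[], Tuple[str, int]],
                 now: Callable[[], datetime] = datetime.now):
        # fetch_token is hypothetical: it performs the client-credentials request
        # and returns (access_token, expires_in_seconds).
        self._fetch_token = fetch_token
        self._now = now
        self._access_token: Optional[str] = None
        self._token_expiry: Optional[datetime] = None

    def is_token_expired(self) -> bool:
        # No token obtained yet, or the current time is past the stored expiry.
        if self._token_expiry is None:
            return True
        return self._now() > self._token_expiry

    def access_token(self) -> str:
        # Refresh only when needed; otherwise hand back the cached token.
        if self.is_token_expired():
            token, expires_in = self._fetch_token()
            self._access_token = token
            self._token_expiry = self._now() + timedelta(seconds=expires_in)
        return self._access_token
```

Repeated calls to `access_token()` within the token lifetime reuse the cached value; only the first call after expiry triggers a new token request.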

Ingest Process Flow

Reservation Retrieval Strategy

The ingester uses a multi-dimensional fetch approach to capture all relevant reservations:

1. Arrival-Based Fetch

Retrieves all reservations with arrival date matching the sync date.

GET /rsv/v1/hotels/{hotelId}/reservations?arrivalStartDate={date}&arrivalEndDate={date}

2. Departure-Based Fetch

Retrieves all reservations with departure date matching the sync date.

GET /rsv/v1/hotels/{hotelId}/reservations?departureStartDate={date}&departureEndDate={date}

3. Creation-Based Fetch

Retrieves all reservations created on the sync date.

GET /rsv/v1/hotels/{hotelId}/reservations?createdOnStartDate={date}&createdOnEndDate={date}

4. Recently Modified Fetch

Retrieves reservations accessed recently and filters for modifications within the current week.

GET /rsv/v1/hotels/{hotelId}/reservations?recentlyAccessed=true

Post-Processing Filter: Applies is_modified_this_week() to retain only reservations modified since the start of the current week (Monday).
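The four fetches above differ only in their query parameters. As a sketch, they can be generated for one property and sync date like this; `reservation_queries` is a hypothetical helper, and the `limit` pagination parameter is left out for brevity.

```python
from urllib.parse import urlencode


def reservation_queries(hotel_id: str, sync_date: str) -> list:
    """Return the four reservation search paths (with query) for one property and date."""
    path = f"/rsv/v1/hotels/{hotel_id}/reservations"
    param_sets = [
        {"arrivalStartDate": sync_date, "arrivalEndDate": sync_date},      # 1. arrivals
        {"departureStartDate": sync_date, "departureEndDate": sync_date},  # 2. departures
        {"createdOnStartDate": sync_date, "createdOnEndDate": sync_date},  # 3. created
        {"recentlyAccessed": "true"},                                      # 4. recently modified
    ]
    return [f"{path}?{urlencode(params)}" for params in param_sets]
```

Only the fourth query needs the is_modified_this_week() post-filter; the first three are already bounded by the sync date.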

Deduplication

After all four queries return (they all hit the same reservations endpoint with different filters), the results are unioned and deduplicated by reservationIdList[0].id so that each reservation appears only once per sync date.
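The ingester performs this step on Spark DataFrames with dropDuplicates(). The plain-Python sketch below only illustrates the keying rule; `dedupe_reservations` is a hypothetical name, and keeping the *first* occurrence is a choice made here for clarity (Spark does not guarantee which duplicate survives).

```python
def dedupe_reservations(reservations: list) -> list:
    """Keep one reservation per reservationIdList[0].id (first occurrence wins here)."""
    seen = set()
    unique = []
    for res in reservations:
        ids = res.get("reservationIdList") or []
        # Rows with no id list all share the key None.
        key = ids[0].get("id") if ids else None
        if key in seen:
            continue
        seen.add(key)
        unique.append(res)
    return unique
```

A reservation that both arrives and was created on the sync date, for instance, comes back from two queries but survives this step once.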

Incremental Loading Strategy

The ingester uses an incremental sync approach:

  1. Check Last Sync Date: Query the raw catalog for the most recent synced_date for the reservations table per property
  2. Calculate Missing Dates: Determine all dates between last sync and today
  3. Fetch Daily Data: Process one date at a time per property
  4. Record Sync Date: Each batch is tagged with its synced_date for future incremental runs

If no dates are found for a property (for example, on a first run with no previous sync) and sync_dates were passed to the constructor, those specific dates are processed instead.
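Step 2 can be sketched as a date-range calculation. The boundaries are assumptions, since the text only says "between last sync and today": this version starts the day after the last synced date, includes today, and returns nothing when there is no previous sync. `missing_sync_dates` is a hypothetical name.

```python
from datetime import date, timedelta
from typing import List, Optional


def missing_sync_dates(last_synced: Optional[date], today: date) -> List[str]:
    """ISO dates after last_synced up to and including today (assumed boundaries)."""
    if last_synced is None:
        return []  # first run: nothing to compute incrementally
    days = (today - last_synced).days
    # range() is empty when last_synced is today or later.
    return [(last_synced + timedelta(days=i)).isoformat() for i in range(1, days + 1)]
```

For example, a property last synced on 2025-01-01 and processed on 2025-01-03 would get `["2025-01-02", "2025-01-03"]`, each handled as its own batch.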

Parallel Data Enrichment

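For each batch of reservations, the ingester fetches two kinds of supplementary data. The rate fetch and the profile fetch run one after the other, and each fans its individual requests out across a thread pool: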

Daily Rates Fetching

  • Endpoint: /rsv/v1/hotels/{hotelId}/reservations/rateInfo
  • Concurrency: 10 worker threads via ThreadPoolExecutor
  • Method: _get_raw_rates()
  • Purpose: Retrieves day-by-day rate breakdown for each reservation
  • Error Handling: Catches exceptions per request; failed requests result in empty rate data
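The rate fan-out described above follows a common ThreadPoolExecutor pattern. In this sketch `fetch_rate` is a hypothetical callable standing in for the HTTP call to the rateInfo endpoint; any exception it raises is logged and replaced by an empty list, so one bad reservation does not stop the batch.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable, List


def fetch_all_rates(reservation_ids: Iterable[str],
                    fetch_rate: Callable[[str], List[dict]],
                    max_workers: int = 10) -> Dict[str, List[dict]]:
    """Fetch rate info for every reservation concurrently; failures yield []."""

    def safe_fetch(res_id: str) -> List[dict]:
        try:
            return fetch_rate(res_id)
        except Exception as exc:  # isolate per-reservation failures
            logging.warning("rate fetch failed for %s: %s", res_id, exc)
            return []

    results: Dict[str, List[dict]] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(safe_fetch, rid): rid for rid in reservation_ids}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    return results
```

Collecting results into a dictionary keyed by reservation id matches the later transform step, which turns that mapping into `reservationId` and `dailyRates` columns.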

Guest Profiles Fetching

  • Endpoint: /crm/v1/guests/{guestId}
  • Concurrency: 10 worker threads via ThreadPoolExecutor
  • Method: _get_raw_profiles()
  • Fetch Instructions: Address, Membership, Profile, Preference, Correspondence
  • Purpose: Retrieves complete guest demographic and preference data
  • Error Handling: Catches exceptions per request; failed requests result in null profile data
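The five fetch instructions are attached to each profile request as query parameters. This sketch assumes they are sent as a repeated `fetchInstructions` parameter; check the OHIP CRM reference for the exact encoding. `guest_profile_path` is a hypothetical helper.

```python
from urllib.parse import quote, urlencode

# The five instructions listed above; sending them as repeated query
# parameters is an assumption about how OHIP expects them.
FETCH_INSTRUCTIONS = ["Address", "Membership", "Profile", "Preference", "Correspondence"]


def guest_profile_path(guest_id: str) -> str:
    """Build /crm/v1/guests/{guestId} with one fetchInstructions parameter per value."""
    query = urlencode({"fetchInstructions": FETCH_INSTRUCTIONS}, doseq=True)
    return f"/crm/v1/guests/{quote(guest_id, safe='')}?{query}"
```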

Data Schemas

Reservations Schema

The reservation data captures comprehensive booking information with nested structures:

| Field Category | Key Fields | Description |
|---|---|---|
| Reservation IDs | reservationIdList[].id, reservationIdList[].type | System identifiers for the reservation |
| Room Stay | roomStay.arrivalDate, roomStay.departureDate, roomStay.adultCount, roomStay.childCount | Stay dates and guest counts |
| Room Details | roomStay.roomClass, roomStay.roomType, roomStay.roomId, roomStay.numberOfRooms | Room type and assignment |
| Rate Information | roomStay.ratePlanCode, roomStay.rateAmount, roomStay.totalAmount, roomStay.fixedRate | Rate plan and amounts |
| Market & Source | roomStay.marketCode, roomStay.sourceCode, roomStay.sourceCodeDescription, roomStay.bookingChannelCode | Booking channel and market segment |
| Guest Identity | reservationGuest.givenName, reservationGuest.surname, reservationGuest.email, reservationGuest.phoneNumber, reservationGuest.birthDate | Primary guest information |
| Guest Location | reservationGuest.address.country, reservationGuest.address.cityName, reservationGuest.address.postalCode, reservationGuest.address.state | Guest address |
| Guest Demographics | reservationGuest.nationality, reservationGuest.language, reservationGuest.vip.vipCode | Nationality and VIP status |
| Attached Profiles | attachedProfiles[].name, attachedProfiles[].profileIdList, attachedProfiles[].reservationProfileType | Additional profiles (company, travel agent) |
| Payment Method | reservationPaymentMethod.paymentMethod, paymentMethod | Payment type |
| Revenue Breakdown | revenuesAndBalances.roomRevenue, revenuesAndBalances.foodAndBevRevenue, revenuesAndBalances.otherRevenue, revenuesAndBalances.totalRevenue | Revenue by category |
| Financial | revenuesAndBalances.totalPayment, revenuesAndBalances.balance, revenuesAndBalances.totalFixedCharge | Payments and balances |
| Status | reservationStatus, computedReservationStatus, roomStatus | Current reservation and room status |
| Display | displayColor, displayColorDetails.colorDefinition, displayColorDetails.colorDescription | UI display settings |
| Source of Sale | sourceOfSale.sourceType, sourceOfSale.sourceCode | Original booking source |
| Property | hotelId, hotelName | Property identification |
| Audit Trail | createDateTime, createBusinessDate, lastModifyDateTime | Creation and modification timestamps |
| Flags | walkInIndicator, preRegistered, openFolio, allowMobileCheckout, optedForCommunication | Boolean indicators |
| Commission | commissionPayoutTo | Commission recipient |

Schema Type: Complex nested StructType with arrays and nested objects (see RESERVATION_SUMMARY_SCHEMA)

Daily Rates Schema

The daily rate data provides granular rate information for each day of the stay:

| Field | Type | Description |
|---|---|---|
| reservationId | String | Link to parent reservation |
| dailyRates | Struct | Container for rate details |
| dailyRates.details[] | Array | Daily rate breakdown |
| dailyRates.details[].summaryDate | String | Date for this rate entry |
| dailyRates.details[].revenue | String | Room revenue amount |
| dailyRates.details[].package | String | Package charges |
| dailyRates.details[].tax | String | Tax amount |
| dailyRates.details[].gross | String | Gross amount before tax |
| dailyRates.details[].net | String | Net amount after discounts |
| dailyRates.details[].ratePlanCode | String | Rate plan identifier |
| dailyRates.details[].currencyCode | String | Currency code |
| dailyRates.details[].rateSuppressed | Boolean | Whether rate is hidden |
| dailyRates.gross | String | Total gross for entire stay |
| dailyRates.net | String | Total net for entire stay |
| dailyRates.fixedCharges | String | Total fixed charges |
| dailyRates.deposit | String | Deposit amount |
| dailyRates.totalCostOfStay | String | Total cost including all charges |
| dailyRates.outStandingCostOfStay | String | Outstanding balance |
| dailyRates.currencyCode | String | Currency for all amounts |
| dailyRates.start | String | Stay start date |
| dailyRates.end | String | Stay end date |
| dailyRates.hasSuppressedRate | Boolean | Indicates if any rate is suppressed |

Schema Type: StructType with nested array of daily rate details (see RATE_INFO_SCHEMA)

Guest Profiles Schema

The guest profile data contains comprehensive guest information:

| Field Category | Key Fields | Description |
|---|---|---|
| Profile IDs | guestIdList[].id, guestIdList[].type | Guest profile identifiers |
| Name Details | guestDetails.customer.personName[].givenName, surname, nameTitle, nameSuffix | Guest name variants |
| Greetings | guestDetails.customer.personName[].salutation, envelopeGreeting | Personalized greetings |
| Demographics | guestDetails.customer.nationality, nationalityDescription, citizenCountry, language | Nationality and language |
| VIP Status | guestDetails.customer.vipStatus, vipDescription | VIP classification |
| Addresses | guestDetails.addresses.addressInfo[].address.addressLine[], cityName, postalCode, country | Physical addresses |
| Address Metadata | guestDetails.addresses.addressInfo[].address.type, typeDescription, primaryInd, isValidated | Address classification |
| Memberships | guestDetails.profileMemberships | Loyalty program memberships (Map) |
| Preferences | guestDetails.preferenceCollection.totalResults | Count of guest preferences |
| Privacy Settings | guestDetails.privacyInfo.marketResearchParticipation, allowPhone, allowSMS, allowEmail | Communication preferences |
| Opt-in Flags | guestDetails.privacyInfo.optInMailingList, optInMarketResearch, optInThirdParty | Marketing opt-ins |
| Profile Access | guestDetails.profileAccessType.hotelId, sharedLevel | Profile sharing configuration |
| Restrictions | guestDetails.profileRestrictions.restricted | Whether profile is restricted |
| Mailing | guestDetails.mailingActions.active, totalResults | Mailing action settings |
| Tax Information | guestDetails.taxInfo | Tax-related data (Map) |
| Profile Metadata | guestDetails.profileType, statusCode, registeredProperty | Profile classification |
| Audit Trail | guestDetails.createDateTime, creatorId, lastModifyDateTime, lastModifierId | Creation and modification tracking |
| Privacy Flag | guestDetails.customer.privateProfile | Whether profile is marked private |
| History | guestDetails.markForHistory | Historical profile marker |
| API Links | links[].href, rel, method, operationId | HATEOAS links for related resources |

Schema Type: Complex nested StructType with arrays, maps, and deeply nested objects (see GUEST_PROFILE_SCHEMA)

Implementation Details

Core Methods

run()

Main execution method that orchestrates the ingest process:

  1. Iterates through each property configured in the job context
  2. Determines sync dates for the reservations table using get_next_sync_dates()
  3. Falls back to sync_dates constructor parameter if no dates found
  4. Calls _get_raw_reservations() for each property and date combination
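The orchestration in run() reduces to a nested loop with a fallback. In this sketch the two callables are hypothetical stand-ins for get_next_sync_dates() and _get_raw_reservations(), passed in so the loop can be exercised without a catalog or API.

```python
from typing import Callable, Iterable, List, Optional


def run_ingest(property_ids: Iterable[str],
               get_next_sync_dates: Callable[[str], List[str]],
               get_raw_reservations: Callable[[str, str], None],
               sync_dates: Optional[List[str]] = None) -> None:
    """Process every (property, date) pair, one date at a time per property."""
    for property_id in property_ids:
        # Prefer incrementally computed dates; fall back to constructor sync_dates.
        dates = get_next_sync_dates(property_id) or list(sync_dates or [])
        for sync_date in dates:
            get_raw_reservations(property_id, sync_date)
```

Note the consequence of the fallback: sync_dates only take effect for a property whose incremental calculation returns nothing.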

access_token()

OAuth 2.0 token management with automatic refresh:

  • Checks token expiry using is_token_expired()
  • Posts to /oauth/v1/tokens endpoint with client credentials
  • Caches _access_token and _token_expiry in instance variables
  • Returns cached token if still valid

is_token_expired()

Token expiry validation:

  • Returns True if _token_expiry is None (no token obtained yet)
  • Returns True if current datetime exceeds stored expiry
  • Returns False if token is still valid

is_modified_this_week(res)

Static method to filter recently modified reservations:

  • Calculates start of current week (Monday) using datetime.weekday()
  • Parses lastModifyDateTime from reservation
  • Returns True if modification date is on or after Monday of current week
  • Handles missing timestamps and parsing errors gracefully
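A sketch of the week filter described above. Two points are assumptions: lastModifyDateTime is parsed as ISO 8601 (a trailing "Z" is normalised for older Python versions), and an optional `today` parameter is added purely for testability; the documented method takes only `res`.

```python
from datetime import date, datetime, timedelta
from typing import Optional


def is_modified_this_week(res: dict, today: Optional[date] = None) -> bool:
    """True if res["lastModifyDateTime"] falls on or after Monday of the current week."""
    today = today or date.today()
    monday = today - timedelta(days=today.weekday())  # weekday(): Monday == 0
    raw = res.get("lastModifyDateTime")
    if not raw:
        return False  # missing timestamp: treat as not modified
    try:
        # Assumes ISO 8601; compares the timestamp's own calendar date (no tz conversion).
        modified = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except (AttributeError, TypeError, ValueError):
        return False  # unparseable timestamp: skip rather than fail the batch
    return modified.date() >= monday
```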

_get_raw_reservations(property_id: str, sync_date: str)

Comprehensive reservation ingestion for a single property and date:

  1. Authenticate: Obtains bearer token via access_token()
  2. Fetch: Runs the four reservation queries (arrivals, departures, created, and recently modified)
  3. Create DataFrames: Converts each list to Spark DataFrame with RESERVATION_SUMMARY_SCHEMA
  4. Union: Combines all four DataFrames using unionByName
  5. Deduplicate: Removes duplicates based on reservationIdList[0].id
  6. Enrich: Calls _get_raw_rates() and _get_raw_profiles() in sequence
  7. Write: Persists to etl_gp_raw_opera.reservations with property partition

_get_raw_rates(res_list, property_id: str, sync_date: str)

Parallel fetching of daily rate information:

  1. Initialize: Clears self.rates_raw dictionary
  2. Setup Client: Creates HttpClient with authentication headers
  3. Define Worker: fetch_rate() function retrieves rates for one reservation
  4. Parallel Execution: ThreadPoolExecutor with 10 workers processes all reservations
  5. Error Handling: Exceptions are caught per reservation; empty list returned on failure
  6. Transform: Converts dictionary to DataFrame with reservationId and dailyRates columns
  7. Write: Persists to etl_gp_raw_opera.daily_rates with property partition

_get_raw_profiles(res_list, property_id: str, sync_date: str)

Parallel fetching of guest profile information:

  1. Setup Client: Creates HttpClient with authentication headers
  2. Define Worker: fetch_profile() function retrieves profile for one guest
  3. Parallel Execution: ThreadPoolExecutor with 10 workers processes all guests
  4. Fetch Instructions: Includes Address, Membership, Profile, Preference, Correspondence
  5. Error Handling: Exceptions are caught per profile; null returned on failure
  6. Filter Nulls: Removes null profiles from the list
  7. Empty Handling: Creates empty DataFrame with schema if no valid profiles
  8. Write: Persists to etl_gp_raw_opera.guest_profiles (no property partition for profiles)
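Steps 2 to 7 can be sketched with `ThreadPoolExecutor.map`, which keeps results in input order. `fetch_profile` is a hypothetical stand-in for the CRM call; failures become `None` and are filtered out, so an all-failed batch returns an empty list, which is the case where the caller would build an empty DataFrame from the schema.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional


def fetch_profiles(guest_ids: Iterable[str],
                   fetch_profile: Callable[[str], Optional[dict]],
                   max_workers: int = 10) -> List[dict]:
    """Fetch guest profiles concurrently, dropping any that failed or came back null."""

    def safe_fetch(guest_id: str) -> Optional[dict]:
        try:
            return fetch_profile(guest_id)
        except Exception:
            logging.exception("profile fetch failed for %s", guest_id)
            return None  # failure becomes a null, filtered out below

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(safe_fetch, guest_ids))
    return [p for p in results if p is not None]
```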

Error Handling

  • Token Refresh Failures: If OAuth token request fails, exception propagates and stops ingestion
  • API Request Failures: HttpClient handles pagination errors and connection issues
  • Parallel Fetch Errors: Individual profile/rate fetch failures are logged and skipped; processing continues
  • Empty Response Handling: Empty reservation lists result in empty DataFrames (no failure)
  • Duplicate Reservations: Handled via dropDuplicates() without raising errors

Type Safety

All nested JSON structures are converted using explicit PySpark schemas (RESERVATION_SUMMARY_SCHEMA, RATE_INFO_SCHEMA, GUEST_PROFILE_SCHEMA) during DataFrame creation. This provides:

  • Type consistency for downstream processing
  • Early detection of type changes in the API (new API fields that are not declared in a schema are dropped silently rather than flagged)
  • Null-safe handling of optional fields
  • Preserved nested structure for complex objects

Fields are primarily StringType to preserve raw API values. Type casting and business logic occur in the cleaning stage of the workflow.

Catalog Output

After successful ingestion, data is available in the raw catalog:

Database: etl_gp_raw_opera

Tables:

  • reservations - Complete reservation details with partitioning by synced_date and property_id
  • daily_rates - Day-by-day rate breakdown linked to reservations, partitioned by synced_date and property_id
  • guest_profiles - Comprehensive guest profile data, partitioned by synced_date only (not by property)

API Rate Limiting Considerations

The Opera Cloud API (OHIP) enforces rate limits:

  • Token Endpoint: Limited requests per hour for OAuth token generation
  • Reservation Endpoints: Pagination with limit=200 to avoid large response payloads
  • Profile/Rate Endpoints: ThreadPool concurrency set to 10 to balance performance and API limits
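Pagination is handled inside HttpClient. To show what `limit=200` implies, here is a generic offset/limit loop; it assumes each page reports whether more results exist (OHIP list responses expose a `hasMore`-style flag, but treat the exact field as an assumption), and `fetch_page` is a hypothetical callable returning `(items, has_more)`.

```python
from typing import Callable, Iterator, List, Tuple


def paginate(fetch_page: Callable[[int, int], Tuple[List[dict], bool]],
             limit: int = 200) -> Iterator[dict]:
    """Yield items page by page until the server reports no more results."""
    offset = 0
    while True:
        items, has_more = fetch_page(offset, limit)
        yield from items
        if not has_more or not items:  # stop on the last page or an empty page
            break
        offset += limit
```

With 450 matching reservations this issues three requests (offsets 0, 200, 400), which is also how the number of calls against the rate limit scales with result size.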

If rate limits are encountered, the HttpClient will raise exceptions. Consider:

  • Reducing ThreadPoolExecutor max_workers from 10 to 5
  • Implementing exponential backoff in error handling
  • Distributing sync dates across multiple job runs
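For the exponential backoff suggestion, one possible shape is a retry wrapper around a single API call. This is a sketch, not part of the ingester: `with_backoff` is a hypothetical helper, the delays double per attempt (1 s, 2 s, 4 s, ... by default), and optional "full jitter" picks a random wait up to that bound so parallel workers do not retry in lockstep.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_backoff(call: Callable[[], T],
                 max_attempts: int = 5,
                 base_delay: float = 1.0,
                 retry_on: Tuple[Type[BaseException], ...] = (Exception,),
                 jitter: bool = True,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry call() with exponentially growing waits between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)
            if jitter:
                delay = random.uniform(0, delay)  # full jitter spreads retries out
            sleep(delay)
```

Inside the thread-pool workers, wrapping each rate or profile request as `with_backoff(lambda: fetch_rate(res_id))` would retry transient rate-limit errors before falling back to the empty result.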

Usage in Workflow

The Opera ingester is typically invoked as the first stage in the ETL workflow:

from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.opera.OperaIngester import OperaIngester

# Standard incremental ingest
job_context = JobContext(source="opera", property_ids=["PROP123"])
OperaIngester(job_context).run()

# Backfill specific dates
# (sync_dates are used only for properties where no incremental sync dates are found)
job_context = JobContext(source="opera", property_ids=["PROP123"])
OperaIngester(job_context, sync_dates=["2025-01-01", "2025-01-02"]).run()

The raw data is then processed by the cleaning pipeline to transform nested structures into flat tables suitable for analytics.
