
OperaIngesterTask — Developer Reference

Developer reference for OperaIngesterTask (HTTP-based Opera ingestion). This page documents constructor parameters, HTTP behaviour and the major methods used to fetch reservations, rates, and guest profiles.

Constructor Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | Optional[str] | Optional task name |
| job_context | JobContext | Job context with Spark, catalog and config |
| sync_dates | Optional[List[str]] | Specific dates to sync (YYYY-MM-DD) |

Provides / Requires

  • requires() — []
  • provides() — []

Example Usage

    from etl_lib.tasks.chains.opera.OperaIngesterTask import OperaIngesterTask

    ingester = OperaIngesterTask(job_context=job_context, sync_dates=["2025-01-01"])
    ingester.run()

Implementation Details

OperaIngesterTask inherits from BaseIngesterTask and uses HttpClient from etl_lib.pipeline.ingest.utils.HttpClient to make authenticated calls to the Opera OHIP API.
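Authenticated calls need a bearer token that is reused until it expires. The following is a minimal sketch of that caching pattern, not the task's actual code: the TokenCache class, the fetch_token callable, and the refresh_margin parameter are hypothetical names introduced for illustration, and the real OAuth request is replaced by a callable that returns a token and its lifetime in seconds.

```python
import time
from typing import Callable, Optional, Tuple


class TokenCache:
    """Hypothetical helper: caches a bearer token until shortly before expiry."""

    def __init__(
        self,
        fetch_token: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.monotonic,
        refresh_margin: float = 60.0,
    ) -> None:
        # fetch_token stands in for the real OAuth request (not shown on this
        # page); it is assumed to return (token, lifetime_in_seconds).
        self._fetch_token = fetch_token
        self._clock = clock
        self._refresh_margin = refresh_margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def access_token(self) -> str:
        now = self._clock()
        # Refresh when no token is cached or the cached one is about to expire.
        if self._token is None or now >= self._expires_at - self._refresh_margin:
            token, lifetime = self._fetch_token()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Refreshing a little before the stated expiry (the margin) avoids sending a request with a token that expires in flight.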

Key methods:

  • access_token() — obtains and caches an OAuth token using credentials defined in the source configuration.
  • _get_raw_reservations(property_id, sync_date) — fetches arriving, departing, created and updated reservations for the specified property/date; de-duplicates and writes to ingest.
  • _get_raw_rates(res_list, property_id, sync_date) — fetches detailed rates via a dedicated rateInfo endpoint and persists daily rate structures.
  • _get_raw_profiles(res_list, property_id, sync_date) — fetches guest profile data for the list of related reservations and persists as guest_profiles.
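The de-duplication step in _get_raw_reservations can be pictured as merging the arriving, departing, created and updated result sets by reservation identifier. The sketch below assumes each reservation is a dict and that the identifier lives under a key named "reservation_id"; both the helper name and the key are hypothetical, and the real payload structure may differ.

```python
from typing import Dict, Iterable, List


def dedupe_reservations(
    *batches: Iterable[dict], key: str = "reservation_id"
) -> List[dict]:
    """Merge several reservation batches, keeping the first record per id."""
    seen: Dict[str, dict] = {}
    for batch in batches:
        for reservation in batch:
            # setdefault keeps the first occurrence and ignores later duplicates.
            seen.setdefault(reservation[key], reservation)
    # Dicts preserve insertion order, so output follows first-seen order.
    return list(seen.values())
```

Keeping the first occurrence is an arbitrary choice made for this sketch; a real implementation might prefer the most recently updated record instead.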

Concurrency

_get_raw_rates and _get_raw_profiles use ThreadPoolExecutor to parallelize external HTTP requests per reservation id.
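A rough sketch of this per-reservation fan-out using the standard library's ThreadPoolExecutor is shown here. The function name fetch_per_reservation, the fetch_one callable, and the max_workers default are assumptions made for illustration; the page does not state the pool size or the error handling the task actually uses.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, Iterable


def fetch_per_reservation(
    res_ids: Iterable[str],
    fetch_one: Callable[[str], list],
    max_workers: int = 8,
) -> Dict[str, list]:
    """Run fetch_one for every reservation id in parallel threads."""
    results: Dict[str, list] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the reservation id it was submitted for.
        futures = {pool.submit(fetch_one, res_id): res_id for res_id in res_ids}
        for future in as_completed(futures):
            res_id = futures[future]
            try:
                results[res_id] = future.result()
            except Exception:
                # Assumption for this sketch: a failed request is stored as
                # an empty list rather than aborting the whole batch.
                results[res_id] = []
    return results
```

Threads suit this workload because each call spends most of its time waiting on the network, so the requests overlap even under the GIL.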

Example: Fetching rates

    for res_id in res_list:
        response = client.fetch(
            endpoint=f"/rsv/v1/hotels/{property_id}/reservations/rateInfo",
            extra_params={"id": res_id},
        )
        # Store the first element of the response, or an empty list if nothing was returned
        self.rates_raw[res_id] = response[0] if response else []

Back to process documentation: [/processes/tasks/chains/opera/opera-raw-task](/processes/tasks/chains/opera/opera-raw-task)
