OperaIngesterTask — Developer Reference
Developer reference for OperaIngesterTask (HTTP-based Opera ingestion). This page documents constructor parameters, HTTP behaviour and the major methods used to fetch reservations, rates, and guest profiles.
Constructor Parameters
| Parameter | Type | Description |
|---|---|---|
name | Optional[str] | Optional task name |
job_context | JobContext | Job context with Spark, catalog and config |
sync_dates | Optional[List[str]] | Specific dates to sync (YYYY-MM-DD) |
Provides / Requires
requires()— []provides()— []
Example Usage
from etl_lib.tasks.chains.opera.OperaIngesterTask import OperaIngesterTask
ingester = OperaIngesterTask(job_context=job_context, sync_dates=["2025-01-01"])
ingester.run()Implementation Details
The OperaIngesterTask inherits from BaseIngesterTask and utilizes HttpClient from etl_lib.pipeline.ingest.utils.HttpClient to perform authenticated calls to the Opera OHIP API:
Key methods:
access_token()— obtains and caches an OAuth token using credentials defined in thesourceconfiguration._get_raw_reservations(property_id, sync_date)— fetches arriving, departing, created and updated reservations for the specified property/date; de-duplicates and writes to ingest._get_raw_rates(res_list, property_id, sync_date)— fetches detailed rates via a dedicatedrateInfoendpoint and persists daily rate structures._get_raw_profiles(res_list, property_id, sync_date)— fetches guest profile data for the list of related reservations and persists asguest_profiles.
Concurrency
_get_raw_rates and _get_raw_profiles use ThreadPoolExecutor to parallelize external HTTP requests per reservation id.
Example: Fetching rates
for res in res_list:
response = client.fetch(endpoint=f"/rsv/v1/hotels/{property_id}/reservations/rateInfo", extra_params={"id": res_id})
self.rates_raw[res_id] = response[0] if response else []Back to process documentation: [/processes/tasks/chains/opera/opera-raw-task](/processes/tasks/chains/opera/opera-raw-task)
Last updated on