CleanOperaTask — Developer Reference
Developer reference for CleanOperaTask, the Opera chain cleaner that transforms raw Opera data into the standard Clean models. Covers constructor parameters, required and provided models, example usage, and transformation notes.
Constructor Parameters
| Parameter | Type | Description |
|---|---|---|
| name | Optional[str] | Optional task name |
| job_context | JobContext | Job context with Spark, catalog and config |
Provides / Requires
- requires(): [RawReservationModel, RawRateModel, RawProfileModel]
- provides(): [CleanGuestModel, CleanReservationModel, CleanRoomModel]
Example Usage
from etl_lib.tasks.chains.opera.CleanOperaTask import CleanOperaTask
cleaner = CleanOperaTask(job_context=job_context)
cleaner.run()
Implementation Details
CleanOperaTask inherits from BaseCleanerTask and provides three primary outputs: CleanGuestModel, CleanReservationModel, CleanRoomModel. Key transformation points:
- Reservation ID mapping: Extract res_id_og from list payloads and create a stable UUID using uuid5 for res_id to ensure consistent downstream joins.
- De-duplication: Use window functions over (res_id_og) with a priority on lastmodifydatetime and status to select the canonical reservation row.
- Rates joining: Explode dailyRates.details into room_stay_date rows and join with the reservation rows on (res_id_og, room_stay_date).
- Guest UUID generation: Generate stable guest_id UUIDs based on guest_id_og and map to reservations.
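The ID mapping and de-duplication steps above can be sketched in plain Python to show the intended logic without a Spark session. This is a minimal illustration, not the task's actual code: the namespace value, the STATUS_PRIORITY ranking, the status names, and the helper names stable_res_id and pick_canonical are all hypothetical, and the ordering (latest lastmodifydatetime first, status as tie-breaker) is an assumption about how the priority is applied. In Spark, the same selection would typically be expressed with row_number() over a window partitioned by res_id_og.

```python
import uuid
from datetime import datetime

# Hypothetical namespace; the namespace used by the real task is not documented here.
RES_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "example.invalid/opera/reservation")

# Hypothetical status ranking used only to break ties; a lower number wins.
STATUS_PRIORITY = {"CHECKED_OUT": 0, "CHECKED_IN": 1, "RESERVED": 2, "CANCELLED": 3}


def stable_res_id(res_id_og):
    """Derive a deterministic UUID string from the original reservation ID."""
    return str(uuid.uuid5(RES_NAMESPACE, res_id_og))


def pick_canonical(rows):
    """Keep one row per res_id_og: latest lastmodifydatetime, then best status."""
    best = {}
    for row in rows:
        # Tuples compare element by element: newer timestamp wins first,
        # then the status with the lower priority number.
        rank = (row["lastmodifydatetime"], -STATUS_PRIORITY.get(row["status"], 99))
        current = best.get(row["res_id_og"])
        if current is None or rank > current[0]:
            best[row["res_id_og"]] = (rank, row)
    return [row for _, row in best.values()]


rows = [
    {"res_id_og": "A1", "lastmodifydatetime": datetime(2024, 1, 1), "status": "RESERVED"},
    {"res_id_og": "A1", "lastmodifydatetime": datetime(2024, 1, 2), "status": "CANCELLED"},
    {"res_id_og": "B2", "lastmodifydatetime": datetime(2024, 1, 1), "status": "CHECKED_IN"},
]

# One canonical row per reservation, each tagged with its stable res_id.
canonical = [dict(row, res_id=stable_res_id(row["res_id_og"])) for row in pick_canonical(rows)]
```

Because uuid5 hashes the namespace and the name, the same res_id_og always yields the same res_id across runs, which is what keeps downstream joins consistent.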
Example: Rate extraction (simplified)
from pyspark.sql import functions as F

# Runs inside a CleanOperaTask method: produce one row per daily rate detail
df = self.get_df_from_input(RawRateModel)
df = df.withColumn("detail", F.explode("dailyRates.details"))
df = df.select(F.col("reservationId").alias("res_id_og"), F.to_date("detail.summarydate").alias("room_stay_date"), F.col("detail.revenue").cast("double").alias("room_stay_date_rate_net"))
Back to process documentation: [/processes/tasks/chains/opera/clean-opera-task](/processes/tasks/chains/opera/clean-opera-task)