CleanOperaTask — Developer Reference

Developer reference for CleanOperaTask, the Opera chain cleaner that transforms raw Opera data into the standard Clean models. It covers constructor details, methods, and transformation notes.

Constructor Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | Optional[str] | Optional task name |
| job_context | JobContext | Job context with Spark, catalog and config |

Provides / Requires

  • requires(): [RawReservationModel, RawRateModel, RawProfileModel]
  • provides(): [CleanGuestModel, CleanReservationModel, CleanRoomModel]

Example Usage

    from etl_lib.tasks.chains.opera.CleanOperaTask import CleanOperaTask

    cleaner = CleanOperaTask(job_context=job_context)
    cleaner.run()

Implementation Details

CleanOperaTask inherits from BaseCleanerTask and produces three outputs: CleanGuestModel, CleanReservationModel, and CleanRoomModel. Key transformation points:

  • Reservation ID mapping: Extract res_id_og from list payloads and create a stable UUID using uuid5 for res_id to ensure consistent downstream joins.
  • De-duplication: Use window functions over (res_id_og) with a priority on lastmodifydatetime and status to select the canonical reservation row.
  • Rates joining: Explode dailyRates.details into room_stay_date rows and join with the reservation rows on (res_id_og, room_stay_date).
  • Guest UUID generation: Generate stable guest_id UUIDs based on guest_id_og and map to reservations.
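
The ID mapping and de-duplication points above can be illustrated with a small plain-Python sketch. It is not the task's actual Spark code. The namespace value, the `stable_id` and `pick_canonical` helpers, and the status ranking are all assumptions made for illustration; only `uuid5`, `res_id_og`, `lastmodifydatetime`, and `status` come from the notes above.

```python
import uuid
from datetime import datetime

# Hypothetical namespace: the namespace the real task uses is not documented here.
OPERA_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "opera.example.com")


def stable_id(kind: str, source_id: str) -> str:
    """Return a deterministic UUID (version 5) for a source-system identifier."""
    return str(uuid.uuid5(OPERA_NAMESPACE, f"{kind}:{source_id}"))


# Assumed status ranking, used only to break ties (higher wins).
STATUS_PRIORITY = {"CHECKED_OUT": 3, "CHECKED_IN": 2, "RESERVED": 1, "CANCELLED": 0}


def pick_canonical(rows: list[dict]) -> list[dict]:
    """Keep one row per res_id_og: latest lastmodifydatetime, then highest status rank."""
    best: dict[str, tuple] = {}
    for row in rows:
        key = (row["lastmodifydatetime"], STATUS_PRIORITY.get(row["status"], -1))
        current = best.get(row["res_id_og"])
        if current is None or key > current[0]:
            best[row["res_id_og"]] = (key, row)
    return [dict(row) for _, row in best.values()]


rows = [
    {"res_id_og": "1001", "status": "RESERVED", "lastmodifydatetime": datetime(2024, 1, 1)},
    {"res_id_og": "1001", "status": "CHECKED_IN", "lastmodifydatetime": datetime(2024, 1, 5)},
    {"res_id_og": "1002", "status": "CANCELLED", "lastmodifydatetime": datetime(2024, 1, 2)},
]

canonical = pick_canonical(rows)
for row in canonical:
    # The same res_id_og always maps to the same res_id, so downstream joins stay stable.
    row["res_id"] = stable_id("reservation", row["res_id_og"])
```

Because `uuid5` hashes a namespace plus a name, re-running the job yields the same `res_id` for the same `res_id_og`. Prefixing the name with a kind such as "reservation" or "guest" is one way to keep reservation and guest IDs from colliding when their source IDs overlap. In Spark, the selection step would typically be a `row_number()` over a window partitioned by `res_id_og`.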

Example: Rate extraction (simplified)

    from pyspark.sql import functions as F

    df = self.get_df_from_input(RawRateModel)
    # explode() emits one row per element of the dailyRates.details array.
    df = df.withColumn("detail", F.explode("dailyRates.details"))
    df = df.select(
        F.col("reservationId").alias("res_id_og"),
        F.to_date("detail.summarydate").alias("room_stay_date"),
        F.col("detail.revenue").cast("double").alias("room_stay_date_rate_net"),
    )
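
To make the row shape concrete, here is a plain-Python sketch of the same explode step followed by the join on (res_id_og, room_stay_date). The sample records, the `explode_rates` helper, the `room_type` field, and the choice of a left join are assumptions for illustration; the real task runs on Spark DataFrames and its join type is not stated here.

```python
from datetime import date

# Hypothetical raw rate record, shaped like the fields used in the snippet above.
raw_rates = [
    {
        "reservationId": "1001",
        "dailyRates": {
            "details": [
                {"summarydate": "2024-03-01", "revenue": "120.50"},
                {"summarydate": "2024-03-02", "revenue": "99"},
            ]
        },
    },
]


def explode_rates(records):
    """Yield one flat row per dailyRates.details entry."""
    for rec in records:
        for detail in rec["dailyRates"]["details"]:
            yield {
                "res_id_og": rec["reservationId"],
                "room_stay_date": date.fromisoformat(detail["summarydate"]),
                "room_stay_date_rate_net": float(detail["revenue"]),
            }


rate_rows = list(explode_rates(raw_rates))

# Hypothetical reservation rows, already at one row per stay date.
reservation_rows = [
    {"res_id_og": "1001", "room_stay_date": date(2024, 3, 1), "room_type": "DLX"},
    {"res_id_og": "1001", "room_stay_date": date(2024, 3, 2), "room_type": "DLX"},
    {"res_id_og": "1001", "room_stay_date": date(2024, 3, 3), "room_type": "DLX"},
]

# Index rates by the composite join key, then left-join onto reservations.
rates_by_key = {(r["res_id_og"], r["room_stay_date"]): r for r in rate_rows}
joined = [
    {
        **res,
        "room_stay_date_rate_net": rates_by_key.get(
            (res["res_id_og"], res["room_stay_date"]), {}
        ).get("room_stay_date_rate_net"),
    }
    for res in reservation_rows
]
```

With a left join, a stay date that has no matching rate detail keeps its reservation row and gets a null rate. An inner join would drop that row instead.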

Back to process documentation: [/processes/tasks/chains/opera/clean-opera-task](/processes/tasks/chains/opera/clean-opera-task)
