ReplaceValuesTask — Developer Reference

Developer-focused examples and implementation details for ReplaceValuesTask.

Configuration Example

```python
# Keys under "value_replacements" are column names; each maps a raw value to its replacement
config = {
    "value_replacements": {
        "channel": {
            "OTA": "Online Travel Agency",
            "GDS": "Global Distribution System",
            "Direct": "Direct Booking",
        },
        "market_segment": {
            "CORP": "Corporate",
            "LEISURE": "Leisure",
            "GROUP": "Group",
        },
        "room_type": {
            "STD": "Standard",
            "DLX": "Deluxe",
            "STE": "Suite",
        },
    }
}
```
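The effect of this config on individual rows can be illustrated without Spark. The sketch below is a hypothetical pure-Python model (the `replace_values` function is not part of `etl_lib`) that mirrors the left-join-and-coalesce behavior shown under Implementation Details: for each configured column the original value is kept in `<column>_og`, mapped values are replaced, and values without an entry (including `None`) pass through unchanged. Rows are plain dicts here, an assumption made only for illustration.

```python
# Hypothetical pure-Python model of ReplaceValuesTask's per-column replacement.
# Rows are plain dicts here; the real task operates on Spark DataFrames.
from typing import Any


def replace_values(
    rows: list[dict[str, Any]],
    value_replacements: dict[str, dict[Any, Any]],
) -> list[dict[str, Any]]:
    """Return copies of `rows` with configured values replaced column by column."""
    result = []
    for row in rows:
        new_row = dict(row)  # leave the input rows untouched
        for column, mapping in value_replacements.items():
            if column not in row or not mapping:
                continue  # column absent from the data, or nothing to replace
            # Preserve the original value, as the task does with "<column>_og"
            new_row[f"{column}_og"] = row[column]
            # Same effect as coalesce(new_val, column) after a left join:
            # unmapped values (including None) pass through unchanged
            new_row[column] = mapping.get(row[column], row[column])
        result.append(new_row)
    return result


config = {"value_replacements": {"channel": {"OTA": "Online Travel Agency"}}}
rows = [{"channel": "OTA"}, {"channel": "Walk-in"}]
print(replace_values(rows, config["value_replacements"]))
# [{'channel': 'Online Travel Agency', 'channel_og': 'OTA'}, {'channel': 'Walk-in', 'channel_og': 'Walk-in'}]
```

Using `dict.get` with the original value as the default is the plain-Python counterpart of coalescing the joined `new_val` with the original column.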

Implementation Details

Replacement Logic

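For each input model, every column that has a configured mapping is left-joined against that mapping: matched values are replaced, unmatched values are kept, and the original value is preserved in a `<column>_og` column.

```python
from pyspark.sql import functions as F

# Excerpt from the task's processing loop (`self` is the ReplaceValuesTask instance)
for model, model_instance in self.inputs.items():
    df = self.get_df_from_input(model)
    # df.columns is evaluated once, so the "_og" columns added below are not revisited
    for column in df.columns:
        # Replacement mapping for this column name, e.g. {"OTA": "Online Travel Agency"}
        values = self.job_context.config.get_value_replacements_for_column(column)
        if not values:
            continue  # no replacements configured for this column

        # Convert the mapping to a two-column lookup DataFrame
        mapping_df = df.sparkSession.createDataFrame(
            [(k, v) for k, v in values.items()], ["old_val", "new_val"]
        )

        # Preserve the original value, left-join the lookup, and fall back
        # to the original value wherever the mapping has no entry
        df = (
            df.withColumn(f"{column}_og", F.col(column))
            .join(mapping_df, df[column] == mapping_df.old_val, "left")
            .withColumn(column, F.coalesce(F.col("new_val"), F.col(column)))
            .drop("old_val", "new_val")
        )

    self.write_to_output(model, df)
```

Incremental Mode

In incremental mode, the task:

- Operates on `ProcessedAddedGuestModel`, `ProcessedAddedReservationModel`, and `ProcessedAddedRoomModel` (where present) and writes the replaced values back to those Added models.
- Merges the configured Added models into their base models using these merge keys:
  - Guests: `guest_id`
  - Reservations: `res_id`
  - Rooms: (`res_id`, `room_stay_date`)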
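The page lists the merge keys but not the exact merge semantics. A common reading is an upsert: an Added row replaces the base row with the same key, and Added rows with new keys are appended. The sketch below assumes exactly that and uses plain dict rows instead of Spark DataFrames; `merge_added` and the `MERGE_KEYS` labels are hypothetical and may not match how the task actually performs the merge.

```python
# Hypothetical upsert-by-key merge of an "Added" model into its base model.
# Plain dict rows stand in for Spark DataFrames; names here are illustrative.
from typing import Any, Iterable

# Merge keys from the documentation; the dictionary labels are hypothetical.
MERGE_KEYS: dict[str, tuple[str, ...]] = {
    "guests": ("guest_id",),
    "reservations": ("res_id",),
    "rooms": ("res_id", "room_stay_date"),
}


def merge_added(
    base: Iterable[dict[str, Any]],
    added: Iterable[dict[str, Any]],
    keys: tuple[str, ...],
) -> list[dict[str, Any]]:
    """Upsert: Added rows replace base rows with the same key; new keys are appended."""

    def key_of(row: dict[str, Any]) -> tuple[Any, ...]:
        return tuple(row[k] for k in keys)

    # Later Added rows win if the same key appears more than once
    latest = {key_of(row): row for row in added}
    # Keep base rows whose key was not delivered again in the Added model
    kept = [row for row in base if key_of(row) not in latest]
    return kept + list(latest.values())


base = [{"guest_id": 1, "name": "Ana"}, {"guest_id": 2, "name": "Ben"}]
added = [{"guest_id": 2, "name": "Benjamin"}, {"guest_id": 3, "name": "Cy"}]
print(merge_added(base, added, MERGE_KEYS["guests"]))
# [{'guest_id': 1, 'name': 'Ana'}, {'guest_id': 2, 'name': 'Benjamin'}, {'guest_id': 3, 'name': 'Cy'}]
```

Composite keys such as the Rooms key (`res_id`, `room_stay_date`) work the same way because each key is compared as a tuple.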

Example Usage

```python
from etl_lib.tasks.processing.ReplaceValuesTask import ReplaceValuesTask

# job_context: an existing job context whose config provides the value replacements
task = ReplaceValuesTask(
    job_context=job_context,
    write_to_catalog=False,
)
task.run()
```

Back to process documentation: /processes/tasks/processing/replace-values-task

Last updated on