ReplaceValuesTask — Developer Reference
Developer-focused examples and implementation details for ReplaceValuesTask.
## Configuration Example

```python
config = {
    "value_replacements": {
        "channel": {
            "OTA": "Online Travel Agency",
            "GDS": "Global Distribution System",
            "Direct": "Direct Booking"
        },
        "market_segment": {
            "CORP": "Corporate",
            "LEISURE": "Leisure",
            "GROUP": "Group"
        },
        "room_type": {
            "STD": "Standard",
            "DLX": "Deluxe",
            "STE": "Suite"
        }
    }
}
```

## Implementation Details
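The replacement logic looks up each column's mapping through `get_value_replacements_for_column`. The sketch here shows one way such a lookup could behave, assuming it simply reads the `value_replacements` dictionary from the configuration example. The standalone function and the plain-dict `config` are hypothetical stand-ins, and the real method on the config object may differ.

```python
from typing import Dict

# Hypothetical stand-in for the job configuration; only the
# "value_replacements" key comes from the configuration example.
config = {
    "value_replacements": {
        "channel": {"OTA": "Online Travel Agency", "GDS": "Global Distribution System"},
        "room_type": {"STD": "Standard", "DLX": "Deluxe"},
    }
}


def get_value_replacements_for_column(cfg: dict, column: str) -> Dict[str, str]:
    """Return the old-to-new mapping for a column, or an empty dict if none is configured.

    Assumption: the real method on the config object may behave differently.
    """
    return cfg.get("value_replacements", {}).get(column, {})


channel_map = get_value_replacements_for_column(config, "channel")
unmapped = get_value_replacements_for_column(config, "guest_id")
```

Under this assumption an unconfigured column yields an empty dict, which is falsy, so a check such as `if not values: continue` skips it.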
### Replacement Logic
```python
from pyspark.sql import functions as F

# Excerpt from inside the task: process every input model in turn.
for model, model_instance in self.inputs.items():
    df = self.get_df_from_input(model)
    for column in df.columns:
        values = self.job_context.config.get_value_replacements_for_column(column)
        if not values:
            # No replacements configured for this column.
            continue
        # Convert mapping to DataFrame
        mapping_df = df.sparkSession.createDataFrame(
            [(k, v) for k, v in values.items()],
            ["old_val", "new_val"]
        )
        # Preserve original and replace
        df = (
            df.withColumn(f"{column}_og", F.col(column))
            .join(mapping_df, df[column] == mapping_df.old_val, "left")
            .withColumn(column, F.coalesce(F.col("new_val"), F.col(column)))
            .drop("old_val", "new_val")
        )
    # Write the model once all of its columns have been processed.
    self.write_to_output(model, df)
```
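To make the effect of the left join and `coalesce` easier to follow, here is a plain-Python model of what happens to a single column. It is not Spark code and not the task's implementation. The `replace_values` helper and the sample rows are hypothetical, chosen only to show the three cases: a mapped value, an unmapped value, and a null.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def replace_values(rows: List[Row], column: str, mapping: Dict[str, str]) -> List[Row]:
    """Plain-Python model of the left join plus coalesce used in the Spark code."""
    result = []
    for row in rows:
        original = row.get(column)
        new_row = dict(row)  # copy so the input rows are not mutated
        # Keep the untouched value in "<column>_og", as the Spark code does.
        new_row[f"{column}_og"] = original
        if original is None:
            # A null never equals a join key, so it stays null.
            new_row[column] = None
        else:
            # Use the mapped value on a match, otherwise keep the original.
            new_row[column] = mapping.get(original, original)
        result.append(new_row)
    return result


rows = [
    {"res_id": 1, "channel": "OTA"},
    {"res_id": 2, "channel": "Walk-in"},  # not in the mapping
    {"res_id": 3, "channel": None},
]
mapping = {"OTA": "Online Travel Agency", "GDS": "Global Distribution System"}
replaced = replace_values(rows, "channel", mapping)
```

Matching is exact, so a value such as `"ota"` would not be replaced by the `"OTA"` entry. Values without a mapping pass through unchanged.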
## Incremental Mode
In incremental mode, the task will:
- Operate on `ProcessedAddedGuestModel`, `ProcessedAddedReservationModel`, and `ProcessedAddedRoomModel` where present and write replaced values back to those Added models.
- Merge configured Added models into the base models using merge keys:
  - Guests: `guest_id`
  - Reservations: `res_id`
  - Rooms: (`res_id`, `room_stay_date`)

## Example Usage
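The merge-by-key step listed for incremental mode can be illustrated with a small plain-Python sketch. It assumes upsert semantics: an Added row replaces the base row that shares its merge key, and rows with new keys are appended. The source does not state this, so treat it as an assumption. The `merge_added` helper and the `MERGE_KEYS` labels are hypothetical, and only the key columns come from the list of merge keys.

```python
from typing import Any, Dict, List, Sequence, Tuple

Row = Dict[str, Any]

# Merge keys as listed for incremental mode; the dictionary labels are hypothetical.
MERGE_KEYS: Dict[str, Tuple[str, ...]] = {
    "guests": ("guest_id",),
    "reservations": ("res_id",),
    "rooms": ("res_id", "room_stay_date"),
}


def merge_added(base: List[Row], added: List[Row], keys: Sequence[str]) -> List[Row]:
    """Upsert `added` rows into `base`, matching on `keys`.

    Assumption: an Added row replaces the base row with the same key,
    and rows with unseen keys are appended.
    """
    merged = {tuple(row[k] for k in keys): row for row in base}
    for row in added:
        merged[tuple(row[k] for k in keys)] = row
    return list(merged.values())


base_rooms = [
    {"res_id": 1, "room_stay_date": "2024-01-01", "room_type": "STD"},
    {"res_id": 1, "room_stay_date": "2024-01-02", "room_type": "STD"},
]
added_rooms = [
    {"res_id": 1, "room_stay_date": "2024-01-02", "room_type": "Deluxe"},
    {"res_id": 2, "room_stay_date": "2024-01-05", "room_type": "Suite"},
]
merged_rooms = merge_added(base_rooms, added_rooms, MERGE_KEYS["rooms"])
```

Rooms use a composite key because one reservation can span several stay dates, so `res_id` alone would not identify a row.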
Instantiate the task with an existing `job_context` and call `run()`:

```python
from etl_lib.tasks.processing.ReplaceValuesTask import ReplaceValuesTask

task = ReplaceValuesTask(
    job_context=job_context,
    write_to_catalog=False
)
task.run()
```

Back to process documentation: /processes/tasks/processing/replace-values-task