# Replace Values

Replaces values in a dataset using a configured mapping.
## Inputs

- Expects one or more of `res_df`, `rooms_df`, `guest_df` to be present on the workflow context. The runnable loops over these DataFrames and inspects every column for configured replacements.
## Configuration

- Mappings are provided through the job configuration via `get_value_replacements_for_column(column)` (the exact config key lives in the job context). The mapping should be a dictionary from original values to replacement values (`old_value -> new_value`).
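A hypothetical example of the mapping shape that `get_value_replacements_for_column` might return (the values here are illustrative, borrowed from the conceptual example below):

```python
# Hypothetical return value for get_value_replacements_for_column("status");
# keys are the original values, values are their replacements.
status_mapping = {
    "Cancelled": "cancelled",
    "No Show": "no_show",
}

# Values not present as keys are left untouched by the runnable.
assert status_mapping.get("Cancelled", "Cancelled") == "cancelled"
assert status_mapping.get("Checked In", "Checked In") == "Checked In"
```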
## Behaviour

- For each target DataFrame and for each column that has a replacement mapping:
  - The original value is preserved in a new column named `<column>_og`.
  - A small mapping DataFrame is built in Spark from the mapping dict and left-joined to the target DataFrame on the column value.
  - The column is replaced using `coalesce(new_val, column)`, so unmapped values are preserved.
- After processing all columns in a DataFrame, the code checkpoints the DataFrame eagerly and writes it back to the workflow context under the same attribute name.
## Outputs

- Updated DataFrames on the workflow context, with an `_og` column for each replaced field and values updated according to the provided mapping.
## Edge cases

- If a target DataFrame is not present on the context, it is skipped.
- If no mapping exists for a given column, it is left unchanged (no `_og` column is created).
- The implementation uses a join-based approach, which is robust for many mappings but may be less efficient for extremely large mapping dictionaries; consider a broadcast join or a UDF for very large mappings if performance becomes an issue.
## Example (conceptual)

- If the job config contains a mapping for `status`: `{"Cancelled": "cancelled", "No Show": "no_show"}`, then running Replace Values will produce a `status_og` column with the original values and replace `status` values according to the mapping.
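The conceptual example above can be traced with plain Python, modelling the coalesce fallback with `dict.get` (the row dicts here stand in for DataFrame rows):

```python
mapping = {"Cancelled": "cancelled", "No Show": "no_show"}
rows = [{"status": "Cancelled"}, {"status": "No Show"}, {"status": "Checked In"}]

for row in rows:
    row["status_og"] = row["status"]                           # preserve original
    row["status"] = mapping.get(row["status"], row["status"])  # replace or keep

assert rows[0] == {"status": "cancelled", "status_og": "Cancelled"}
assert rows[1] == {"status": "no_show", "status_og": "No Show"}
assert rows[2] == {"status": "Checked In", "status_og": "Checked In"}
```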