AthenaeumLegacyMergeTask — Developer Reference
Developer reference for the AthenaeumLegacyMergeTask, containing examples and implementation details for merging legacy data into processed models.
Example Usage
from etl_lib.tasks.chains.athenaeum.AthenaeumLegacyMergeTask import AthenaeumLegacyMergeTask
task = AthenaeumLegacyMergeTask(job_context=job_context)
task.run()
Implementation Notes
- Merges legacy data with the remote processed models
- Handles column mapping, duplicate records, and schema evolution
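The notes above do not say how duplicates are resolved. The sketch below models the merge in plain Python under two assumptions: every record carries a unique key (the hypothetical `id` field), and processed records take precedence over legacy ones. The function name and the precedence rule are illustrative only; this is not the task's confirmed behaviour.

```python
from typing import Dict, Hashable, Iterable, List

Row = Dict[str, object]


def merge_legacy_with_processed(
    processed: Iterable[Row],
    legacy: Iterable[Row],
    key: str,
) -> List[Row]:
    """Hypothetical merge: union processed and legacy rows, one row per key.

    The first occurrence of each key wins. Processed rows are visited first,
    so they take precedence over legacy rows with the same key.
    """
    merged: Dict[Hashable, Row] = {}
    for row in processed:
        merged.setdefault(row[key], row)
    # Legacy rows only fill keys the processed data does not already have;
    # repeated keys inside the legacy input keep their first occurrence.
    for row in legacy:
        merged.setdefault(row[key], row)
    return list(merged.values())


processed = [{"id": 1, "src": "processed"}]
legacy = [{"id": 1, "src": "legacy"}, {"id": 2, "src": "legacy"}]
merged = merge_legacy_with_processed(processed, legacy, key="id")
# merged keeps the processed row for id 1 and adds the legacy row for id 2
```

PySpark can express the same rule with a `left_anti` join of the legacy DataFrame against the processed one on the key, followed by `unionByName`. Add `dropDuplicates` on the key if the legacy input can repeat keys.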
Schema Alignment (Example)
from pyspark.sql import functions as F

def _align_legacy_to_schema(self, legacy_df, target_schema, passthrough=None):
    """
    Aligns a legacy DataFrame to the target processed schema.

    target_schema maps each target column name to its Spark data type
    (a DataType or a type string accepted by Column.cast).
    passthrough is accepted but not used in this excerpt.
    """
    # Apply column mappings (legacy name -> processed name) where the legacy column exists
    for old_col, new_col in self.column_mapping.items():
        if old_col in legacy_df.columns:
            legacy_df = legacy_df.withColumnRenamed(old_col, new_col)
    # Add any missing target columns as typed nulls
    for col_name, data_type in target_schema.items():
        if col_name not in legacy_df.columns:
            legacy_df = legacy_df.withColumn(col_name, F.lit(None).cast(data_type))
    # Select only the target schema columns, in target order (extra legacy columns are dropped)
    legacy_df = legacy_df.select(*target_schema.keys())
    return legacy_df
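The alignment can be checked without a Spark session using the plain-Python analogue below. It operates on a list of dict rows and performs the same three steps: rename mapped columns, fill missing target columns, and keep only the target columns in target order. The name `align_rows_to_schema` is hypothetical. Type casting is omitted, so a missing column becomes `None` rather than a typed null.

```python
from typing import Dict, List, Mapping

Row = Dict[str, object]


def align_rows_to_schema(
    rows: List[Row],
    column_mapping: Mapping[str, str],
    target_columns: List[str],
) -> List[Row]:
    """Hypothetical plain-Python analogue of _align_legacy_to_schema."""
    aligned: List[Row] = []
    for row in rows:
        # Step 1: rename legacy columns (old name -> new name) when present.
        renamed = dict(row)
        for old_col, new_col in column_mapping.items():
            if old_col in renamed:
                renamed[new_col] = renamed.pop(old_col)
        # Steps 2 and 3: emit exactly the target columns, in target order,
        # filling any column the legacy row lacks with None.
        aligned.append({col: renamed.get(col) for col in target_columns})
    return aligned


rows = [{"guest": "A", "legacy_only": 1}]
result = align_rows_to_schema(rows, {"guest": "guest_name"}, ["guest_name", "room_number"])
# "legacy_only" is dropped and "room_number" is filled with None
```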
Stay Flag Handling (Example)
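from pyspark.sql import functions as F
# BookingTypes is the project's booking-type enum (import path not shown)

# Set stay-day flag: 1 only for room bookings whose status is checked_in,
# checked_out, or confirmed; every other case is 0
legacy_rooms_df = legacy_rooms_df.withColumn(
    "room_stay_date_is_stay_day",
    # Non-room bookings never count as stay days. A NULL booking_type makes this
    # comparison NULL, so such rows fall through to the status checks.
    F.when(
        F.col("booking_type") != BookingTypes.ROOM.value, 0
    ).when(
        F.col("status").isin("cancelled", "no_show", "waitlist"), 0
    ).when(
        F.col("status").isin("checked_in", "checked_out", "confirmed"), 1
    ).otherwise(0)
)
# Set stay-night flag: a stay day that is not the check-out day
# (room_stay_date_is_check_out_day must already exist on the DataFrame)
legacy_rooms_df = legacy_rooms_df.withColumn(
    "room_stay_date_is_stay_night",
    F.when(
        (F.col("room_stay_date_is_stay_day") == 1) &
        (F.col("room_stay_date_is_check_out_day") == 0),
        1
    ).otherwise(0)
)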
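The flag rules are easiest to check as a truth table. The plain-Python sketch below mirrors the `F.when` chains: the first matching branch wins, and a missing (`None`) value behaves like a Spark NULL condition, so it never matches a branch. `ROOM = "room"` is a hypothetical placeholder for `BookingTypes.ROOM.value`, whose real value is not shown here.

```python
from typing import Optional

ROOM = "room"  # hypothetical stand-in for BookingTypes.ROOM.value

NON_STAY_STATUSES = {"cancelled", "no_show", "waitlist"}
STAY_STATUSES = {"checked_in", "checked_out", "confirmed"}


def stay_day_flag(booking_type: Optional[str], status: Optional[str]) -> int:
    """Mirror of the room_stay_date_is_stay_day expression."""
    # A None booking_type skips this branch, like a NULL comparison in Spark.
    if booking_type is not None and booking_type != ROOM:
        return 0
    if status in NON_STAY_STATUSES:
        return 0
    if status in STAY_STATUSES:
        return 1
    return 0  # otherwise(0)


def stay_night_flag(is_stay_day: Optional[int], is_check_out_day: Optional[int]) -> int:
    """Mirror of room_stay_date_is_stay_night: a stay day that is not check-out."""
    return 1 if is_stay_day == 1 and is_check_out_day == 0 else 0
```

A room booking that is checked in on its check-out date gets a stay-day flag of 1 but a stay-night flag of 0.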