AthenaeumLegacyMergeTask — Developer Reference

Developer reference for the AthenaeumLegacyMergeTask, containing examples and implementation details for merging legacy data into processed models.

Example Usage

from etl_lib.tasks.chains.athenaeum.AthenaeumLegacyMergeTask import AthenaeumLegacyMergeTask

# job_context must already be defined by the calling code
task = AthenaeumLegacyMergeTask(job_context=job_context)
task.run()

Implementation Notes

  • Merges legacy data with the remote processed models
  • Handles column mapping, duplicate records and schema evolution
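The merge and duplicate-handling notes above do not say how duplicates are resolved. The following plain-Python sketch shows one common rule under two explicit assumptions that are not documented for this task: records are duplicates when their key columns match, and the processed record wins over the legacy one. `merge_with_precedence` and the `booking_id` key are hypothetical names used only for illustration.

```python
from typing import Dict, Iterable, List, Tuple

Record = Dict[str, object]


def merge_with_precedence(
    processed: Iterable[Record],
    legacy: Iterable[Record],
    key_cols: Tuple[str, ...],
) -> List[Record]:
    """Union processed and legacy records, keeping one record per key.

    ASSUMPTION (illustrative only): duplicates share the same key_cols
    values, and the processed record takes precedence over legacy.
    """
    merged: Dict[Tuple[object, ...], Record] = {}
    # Insert legacy rows first so processed rows with the same key overwrite them
    for row in legacy:
        merged[tuple(row[c] for c in key_cols)] = row
    for row in processed:
        merged[tuple(row[c] for c in key_cols)] = row
    return list(merged.values())


# Usage with a hypothetical booking_id key
processed = [{"booking_id": 1, "src": "processed"}]
legacy = [{"booking_id": 1, "src": "legacy"}, {"booking_id": 2, "src": "legacy"}]
print(merge_with_precedence(processed, legacy, ("booking_id",)))
# → [{'booking_id': 1, 'src': 'processed'}, {'booking_id': 2, 'src': 'legacy'}]
```

In Spark the same precedence rule would be applied to DataFrames (for example after `unionByName`), but the rule itself is what this sketch is meant to make explicit.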

Schema Alignment (Example)

from pyspark.sql import functions as F


def _align_legacy_to_schema(self, legacy_df, target_schema, passthrough=None):
    """
    Align a legacy DataFrame to the target processed schema.

    target_schema maps each target column name to its Spark data type.
    Note: passthrough is accepted but not used in this excerpt.
    """
    # Apply column mappings (legacy name -> processed name)
    for old_col, new_col in self.column_mapping.items():
        if old_col in legacy_df.columns:
            legacy_df = legacy_df.withColumnRenamed(old_col, new_col)

    # Add any missing target columns as typed nulls
    for col_name, data_type in target_schema.items():
        if col_name not in legacy_df.columns:
            legacy_df = legacy_df.withColumn(col_name, F.lit(None).cast(data_type))

    # Keep only the target schema columns, in schema order
    legacy_df = legacy_df.select(*list(target_schema.keys()))
    return legacy_df
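To make the three alignment steps concrete without a Spark session, here is a plain-Python analogue that operates on a single record stored as a dict. `align_row` is a hypothetical helper, not part of the task: it mirrors the renaming, null-filling and column selection above but skips type casting, and it overwrites a key when a mapping renames onto an existing column, whereas Spark would produce a duplicate column name.

```python
from typing import Dict, List, Optional


def align_row(
    row: Dict[str, object],
    column_mapping: Dict[str, str],
    target_columns: List[str],
) -> Dict[str, Optional[object]]:
    """Hypothetical dict-based analogue of _align_legacy_to_schema."""
    renamed = dict(row)  # copy so the caller's record is not mutated
    # Step 1: rename legacy columns that are present
    for old_col, new_col in column_mapping.items():
        if old_col in renamed:
            renamed[new_col] = renamed.pop(old_col)
    # Steps 2 and 3: fill missing target columns with None and keep only
    # the target columns, in target order
    return {col: renamed.get(col) for col in target_columns}


# Usage with hypothetical column names
row = {"guest_nm": "Ada", "nights": 2, "legacy_only": "x"}
print(align_row(row, {"guest_nm": "guest_name"}, ["guest_name", "nights", "room_type"]))
# → {'guest_name': 'Ada', 'nights': 2, 'room_type': None}
```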

Back to process documentation: /processes/tasks/chains/athenaeum/athenaeum-legacy-merge-task

Stay Flag Handling (Example)

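from pyspark.sql import functions as F

# BookingTypes is the project's booking-type enum (its import is not shown here)

# Set stay-day flag: non-room bookings and cancelled/no_show/waitlist get 0;
# checked_in, checked_out and confirmed get 1; any other status gets 0
legacy_rooms_df = legacy_rooms_df.withColumn(
    "room_stay_date_is_stay_day",
    F.when(F.col("booking_type") != BookingTypes.ROOM.value, 0)
    .when(F.col("status").isin("cancelled", "no_show", "waitlist"), 0)
    .when(F.col("status").isin("checked_in", "checked_out", "confirmed"), 1)
    .otherwise(0),
)

# Set stay-night flag: a stay day that is not the check-out day
# (room_stay_date_is_check_out_day must already exist on the DataFrame)
legacy_rooms_df = legacy_rooms_df.withColumn(
    "room_stay_date_is_stay_night",
    F.when(
        (F.col("room_stay_date_is_stay_day") == 1)
        & (F.col("room_stay_date_is_check_out_day") == 0),
        1,
    ).otherwise(0),
)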
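The flag rules above can be checked row by row with a plain-Python mirror of the `when`/`otherwise` chain, which is handy as a truth table for tests. `stay_flags` is a hypothetical helper, and `room_value` stands in for `BookingTypes.ROOM.value`, whose actual value the example does not show. One known difference: Spark treats a NULL `booking_type` comparison as not true and falls through to the status checks, while this sketch treats `None` as a non-room booking.

```python
from typing import Optional, Tuple

INACTIVE_STATUSES = {"cancelled", "no_show", "waitlist"}
ACTIVE_STATUSES = {"checked_in", "checked_out", "confirmed"}


def stay_flags(
    booking_type: Optional[str],
    status: Optional[str],
    is_check_out_day: int,
    room_value: str,
) -> Tuple[int, int]:
    """Return (is_stay_day, is_stay_night) following the Spark expressions.

    room_value is a placeholder for BookingTypes.ROOM.value.
    """
    # Conditions are evaluated in the same order as the when() chain
    if booking_type != room_value:
        stay_day = 0
    elif status in INACTIVE_STATUSES:
        stay_day = 0
    elif status in ACTIVE_STATUSES:
        stay_day = 1
    else:
        stay_day = 0
    # A stay night is a stay day that is not the check-out day
    stay_night = 1 if stay_day == 1 and is_check_out_day == 0 else 0
    return stay_day, stay_night


# Usage with a placeholder room value
print(stay_flags("room", "checked_out", 1, room_value="room"))  # → (1, 0)
```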