GuestLoyaltyTask — Developer Reference
This developer reference contains the code examples and implementation details for GuestLoyaltyTask.
## Example Usage
```python
from etl_lib.tasks.processing.GuestLoyaltyTask import GuestLoyaltyTask

task = GuestLoyaltyTask(
    job_context=job_context,
    write_to_catalog=True,
)
task.run()
```
## Incremental Example
```python
# Process only new/changed records
task = GuestLoyaltyTask(job_context=job_context, is_incremental=True)
task.run()
```
### Incremental Outputs
- `ProcessedAddedGuestModel`: newly added guest records enriched with loyalty metrics.
- `ProcessedAddedReservationModel`: newly added reservation records with `nth_booking` and related metrics.
## Implementation Details
### Aggregation Strategy
```python
from pyspark.sql import functions as F

# Group by guest_cluster_id and aggregate metrics
agg_exprs = [
    F.count_distinct("res_id").alias("stays"),
    F.sum("stay_nights").alias("total_stay_nights"),
    F.avg("stay_nights").alias("avg_stay_nights"),
    # ... more aggregations
]
df = df.groupBy("guest_cluster_id").agg(*agg_exprs)
```
### Handling Optional Columns
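```python
# Add revenue metrics to agg_exprs only when the source column is present
# (this has to run before the groupBy/agg call to take effect)
if "total_room_net" in df.columns:
    agg_exprs.extend([
        F.sum("total_room_net").alias("lifetime_total_value"),
        F.avg("total_room_net").alias("avg_total_net_revenue_per_stay"),
    ])
```
### Window Functions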
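```python
from pyspark.sql import Window, functions as F

# Days since the guest's previous booking; null for each guest's first booking
df = df.withColumn(
    "time_between_bookings",
    F.datediff(
        "booking_date",
        F.lag("booking_date", 1).over(
            Window.partitionBy("guest_cluster_id")
            .orderBy("booking_date")
        ),
    ),
)
```
### Last Booked Helper Method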
```python
@staticmethod
def add_last_booked(df: DataFrame, df_not_agg: DataFrame,
                    column: SF, alias: SF) -> DataFrame:
    """Adds the most recent value of a column for each guest."""
    if column.value.name in df_not_agg.columns:
        last_booked_df = (
            df_not_agg.orderBy("booking_date")
            .groupBy("guest_cluster_id")
            .agg(F.last(column.value.name).alias(alias.value.name))
        )
        return df.join(last_booked_df, ["guest_cluster_id"], how="left")
    return df
```
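One caveat when reading the helper: `F.last` depends on row order, and Spark does not guarantee that the ordering from `orderBy` survives the shuffle performed by `groupBy`. As a reference for the values the helper is meant to return, the plain-Python sketch below picks, for each guest, the value from the row with the latest `booking_date`. It is not part of `etl_lib`: the function name `last_booked_values`, the `room_type` column, and the sample rows are hypothetical, and the tie-breaking rule (the later row wins on equal dates) is an assumption.

```python
from datetime import date
from typing import Any, Dict, Iterable, Mapping, Tuple


def last_booked_values(
    rows: Iterable[Mapping[str, Any]],
    column: str,
) -> Dict[str, Any]:
    """Return {guest_cluster_id: value of `column` on the guest's latest booking}."""
    latest: Dict[str, Tuple[date, Any]] = {}
    for row in rows:
        if column not in row:
            # Rows without the column are skipped. (The Spark helper checks
            # for the column once, on the whole DataFrame.)
            continue
        guest = row["guest_cluster_id"]
        booked = row["booking_date"]
        # Keep the row with the greatest booking_date; on ties the later row wins.
        if guest not in latest or booked >= latest[guest][0]:
            latest[guest] = (booked, row[column])
    return {guest: value for guest, (_, value) in latest.items()}


# Hypothetical sample data, for illustration only.
rows = [
    {"guest_cluster_id": "g1", "booking_date": date(2024, 1, 5), "room_type": "double"},
    {"guest_cluster_id": "g1", "booking_date": date(2024, 3, 9), "room_type": "suite"},
    {"guest_cluster_id": "g2", "booking_date": date(2023, 11, 2), "room_type": "single"},
]
print(last_booked_values(rows, "room_type"))
```

A model like this can serve as the expected result in a unit test for the Spark helper. If the Spark output turns out to be unstable, one common deterministic alternative is to rank rows within a `Window.partitionBy("guest_cluster_id")` ordered by `booking_date` descending and keep the first row per guest.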