Cleaning
The cleaning stage transforms raw data from the ingest stage into a standardized format using our common schema. This is a critical step that ensures data consistency across different Property Management Systems (PMS) and enables unified analytics and reporting.
Overview
After ingesting raw data from various PMS vendors (Mews, Opera, Athenaeum, Guestline), the cleaning pipeline:
- Maps vendor-specific fields to our standardized schema
- Validates and cleans data (emails, phone numbers, addresses)
- Normalizes data types and formats
- Deduplicates records based on vendor-specific logic
- Enriches data with derived fields and UUIDs
- Applies vendor-specific transformations as needed
Architecture
BaseCleaner
All PMS-specific cleaners inherit from BaseCleaner, which provides:
- Common cleaning utilities (email validation, phone formatting, country codes)
- Currency conversion using ECB exchange rates
- Property mapping from configuration
- Status normalization across different PMS systems
- Abstract methods that each cleaner must implement
from abc import ABC, abstractmethod
from pyspark.sql import DataFrame


class BaseCleaner(ABC):
    @abstractmethod
    def get_clean_reservations(self) -> DataFrame: ...

    @abstractmethod
    def get_clean_guests(self) -> DataFrame: ...

    @abstractmethod
    def get_clean_rooms(self) -> DataFrame: ...
Vendor-Specific Cleaners
Each PMS vendor has its own cleaner implementation:
- Athenaeum - AthenaeumCleaner
- Mews - MewsCleaner
- Opera - OperaCleaner (Cheval Collection)
- Guestline - GuestlineCleaner (Queensway)
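Because all three methods are abstract, Python refuses to instantiate a cleaner that leaves any of them out. The self-contained sketch below repeats the base class and adds a hypothetical NoRoomsCleaner to show that check. The TYPE_CHECKING guard is only there so the snippet runs without Spark installed, and missing_methods is a hypothetical helper.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported for type hints only, so this sketch runs without Spark installed
    from pyspark.sql import DataFrame


class BaseCleaner(ABC):
    @abstractmethod
    def get_clean_reservations(self) -> DataFrame: ...

    @abstractmethod
    def get_clean_guests(self) -> DataFrame: ...

    @abstractmethod
    def get_clean_rooms(self) -> DataFrame: ...


class NoRoomsCleaner(BaseCleaner):
    """Hypothetical vendor cleaner that forgot to implement get_clean_rooms."""

    def get_clean_reservations(self) -> DataFrame: ...

    def get_clean_guests(self) -> DataFrame: ...


def missing_methods(cls: type) -> set[str]:
    """Names of abstract methods that cls still has to implement."""
    return set(getattr(cls, "__abstractmethods__", frozenset()))
```

Since not all vendors provide room data (see Clean Rooms under Output), such a vendor still needs some get_clean_rooms. One option, assumed here rather than taken from the codebase, is to return an empty DataFrame with the rooms schema.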
Common Operations
Deduplication Strategy
Each vendor requires different deduplication logic based on:
- Last modified timestamp - Most recent update wins
- Status priority - Checked-out > Checked-in > Confirmed > Other
- Unique identifiers - Vendor-specific reservation/confirmation numbers
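The same ranking can be written in plain Python, which makes the tie-breaking easy to unit-test without a Spark session. This is a sketch that mirrors the Spark example that follows: it assumes each record is a dict with reservation_id, status and updated_date keys, and dedupe_reservations is a hypothetical name.

```python
from __future__ import annotations

from datetime import datetime
from typing import Any, Optional

# Same ranking as the Spark example: higher wins, unknown statuses rank 0
STATUS_PRIORITY = {"CHKOUT": 3, "CHKIN": 2, "CONFIRMED": 1}


def _rank(rec: dict[str, Any]) -> tuple[datetime, int]:
    # Spark's .desc() puts nulls last, so a missing updated_date ranks lowest here
    updated: Optional[datetime] = rec.get("updated_date")
    return (
        datetime.min if updated is None else updated,
        STATUS_PRIORITY.get(rec.get("status"), 0),
    )


def dedupe_reservations(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep one record per reservation_id: latest updated_date, then highest status."""
    best: dict[str, dict[str, Any]] = {}
    for rec in records:
        key = rec["reservation_id"]
        # On an exact tie the first record seen is kept (Spark's pick is arbitrary)
        if key not in best or _rank(rec) > _rank(best[key]):
            best[key] = rec
    return list(best.values())
```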
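from pyspark.sql import Window
from pyspark.sql import functions as F

# Example: Priority-based deduplication
# Rank statuses so later lifecycle stages win (CHKOUT > CHKIN > CONFIRMED > other)
status_priority = (
    F.when(F.col("status") == "CHKOUT", 3)
    .when(F.col("status") == "CHKIN", 2)
    .when(F.col("status") == "CONFIRMED", 1)
    .otherwise(0)
)

# Most recent update first; status priority only breaks ties on updated_date
window = Window.partitionBy("reservation_id").orderBy(
    F.col("updated_date").desc(),
    status_priority.desc(),
)

# Keep only the top-ranked row per reservation_id
df = df.withColumn("row_num", F.row_number().over(window))
df = df.filter(F.col("row_num") == 1).drop("row_num")
Field Mapping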
Raw vendor fields are mapped to our standardized schema:
df = (
    df.withColumnRenamed("confirmationnumber", "reservation_id_og")
    .withColumnRenamed("arrivaldate", "check_in_date")
    .withColumnRenamed("departuredate", "check_out_date")
    .withColumnRenamed("emailadd", "email")
    # ... more mappings
)
Data Type Conversion
This step casts each field to its correct data type:
df = (
    df.withColumn("check_in_date", F.to_date("check_in_date"))
    .withColumn("check_out_date", F.to_date("check_out_date"))
    .withColumn("created_timestamp", F.to_timestamp("created_timestamp"))
    .withColumn("guests_adults", F.col("guests_adults").cast("int"))
)
Data Validation & Cleaning
Common cleaning utilities from utils.py:
Email Validation
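The body of clean_email is not shown on this page; its signature and docstring follow. As a rough plain-Python sketch of the three docstring steps, applied to a single value rather than a DataFrame column: the name normalize_email, the deliberately simple regex, and the use of NFKD folding for the ASCII step are all assumptions, not the real implementation.

```python
import re
import unicodedata
from typing import Optional

# Assumed, deliberately simple pattern: local@domain.tld with no spaces
_EMAIL_RE = re.compile(r"^[a-z0-9._%+\-]+@[a-z0-9\-]+(\.[a-z0-9\-]+)*\.[a-z]{2,}$")


def normalize_email(raw: Optional[str]) -> Optional[str]:
    if raw is None:
        return None
    # Fold accents to ASCII ("é" -> "e") and drop anything that has no ASCII form
    ascii_text = unicodedata.normalize("NFKD", raw).encode("ascii", "ignore").decode("ascii")
    # Remove whitespace and characters outside the allowed set, then lowercase
    cleaned = re.sub(r"[^A-Za-z0-9._%+\-@]", "", ascii_text).lower()
    # Invalid addresses become None rather than being kept half-cleaned
    return cleaned if _EMAIL_RE.match(cleaned) else None
```

In Spark, a per-value function like this could be wrapped in a UDF, although built-in column functions are usually faster than Python UDFs.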
def clean_email(df: DataFrame, col: str) -> DataFrame:
    """
    - Removes invalid characters
    - Validates email format
    - Normalizes to lowercase ASCII
    """
Phone Number Formatting
def clean_phone_number(df: DataFrame, col: str) -> DataFrame:
    """
    - Parses phone numbers with phonenumbers library
    - Formats to E.164 international standard
    - Returns None for invalid numbers
    """
Country Code Normalization
def clean_country(df: DataFrame, col: str) -> DataFrame:
    """
    - Looks up countries using pycountry
    - Returns ISO Alpha-2 country codes (e.g., 'GB', 'US')
    """
String Cleaning
def clean_string_columns(df: DataFrame, cols: list[str]) -> DataFrame:
    """
    - Removes accents/diacritics using unidecode
    - Trims whitespace
    - Converts empty strings to NULL
    """
Status Normalization
Each vendor uses different status codes. We normalize them to a standard set:
- confirmed - Reservation is confirmed
- checked_in - Guest has checked in
- checked_out - Guest has checked out
- cancelled - Reservation was cancelled
- no_show - Guest didn’t show up
- waitlist - On waiting list
- in_house - Currently staying (some vendors)
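A lookup table is one straightforward way to express this normalization. In the sketch below, only CHKOUT, CHKIN and CONFIRMED appear elsewhere on this page; the other raw codes are hypothetical, and a real pipeline would presumably need one table per PMS.

```python
from typing import Optional

# The standard set listed above
STANDARD_STATUSES = {
    "confirmed", "checked_in", "checked_out", "cancelled",
    "no_show", "waitlist", "in_house",
}

# Hypothetical raw codes for a single vendor
RAW_TO_STANDARD = {
    "CONFIRMED": "confirmed",
    "CHKIN": "checked_in",
    "CHKOUT": "checked_out",
    "CANCELLED": "cancelled",
    "NOSHOW": "no_show",
    "WAITLIST": "waitlist",
    "INHOUSE": "in_house",
}


def normalize_status(raw: Optional[str]) -> Optional[str]:
    if raw is None:
        return None
    # Case- and whitespace-insensitive lookup; unknown codes return None so they can be flagged
    return RAW_TO_STANDARD.get(raw.strip().upper())
```

In Spark, the same dictionary could be turned into a literal map column or a small lookup table joined on the raw code.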
UUID Generation
Stable UUIDs are generated for:
- Reservations - Unique per reservation
- Guests - Stable per guest_id_og (original vendor guest ID)
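The example that follows uses Spark's uuid(), which returns random values: a guest keeps one ID within a run, but the ID changes if the table is rebuilt. If IDs must survive re-runs, one alternative to uuid() is a name-based UUIDv5 derived from the original ID. In this sketch, the namespace, the stable_guest_id name, and the choice to scope the key by chain_id are all assumptions.

```python
import uuid

# Hypothetical fixed namespace; it must never change, or every derived ID changes too
GUEST_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "guests.example.invalid")


def stable_guest_id(chain_id: str, guest_id_og: str) -> str:
    # uuid5 is deterministic: the same namespace and name always give the same UUID.
    # Including chain_id keeps equal vendor IDs from different chains apart.
    return str(uuid.uuid5(GUEST_NAMESPACE, f"{chain_id}:{guest_id_og}"))
```

In Spark, a function like this could be applied through a UDF over guest_id_og in place of the uuid() call.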
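from pyspark.sql import functions as F

# Generate one UUID per unique guest_id_og.
# Note: uuid() is non-deterministic, so re-running the job assigns different values.
guest_uuid_map = (
    df.dropDuplicates(["guest_id_og"])
    .withColumn("guest_id", F.expr("uuid()"))
    .select("guest_id_og", "guest_id")
)

# Attach the generated guest_id to every row for that guest
df = df.join(guest_uuid_map, on="guest_id_og", how="left")
Output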
The cleaning stage produces three main tables per chain:
Clean Reservations
- Table: etl_gp_clean_{chain_id}.reservations
- Schema: Full reservation details with standardized fields
- Partitioned by: chain_id, property_id
Clean Guests
- Table: etl_gp_clean_{chain_id}.guests
- Schema: Guest profiles with contact information
- Deduped by: guest_id_og
Clean Rooms
- Table: etl_gp_clean_{chain_id}.rooms
- Schema: Room inventory and types
- Note: Not all vendors provide room data
Vendor-Specific Documentation
For implementation details, see the vendor-specific pages.
Next Steps
After cleaning, the data proceeds to the processing stage where:
- Guests are matched and deduplicated across properties
- Stay patterns are analyzed
- Aggregations and metrics are computed
- Final reports are generated