
Cleaning

The cleaning stage transforms raw data from the ingest stage into a standardized format using our common schema. This is a critical step that ensures data consistency across different Property Management Systems (PMS) and enables unified analytics and reporting.

Overview

After ingesting raw data from various PMS vendors (Mews, Opera, Athenaeum, Guestline), the cleaning pipeline:

  1. Maps vendor-specific fields to our standardized schema
  2. Validates and cleans data (emails, phone numbers, addresses)
  3. Normalizes data types and formats
  4. Deduplicates records based on vendor-specific logic
  5. Enriches data with derived fields and UUIDs
  6. Applies vendor-specific transformations as needed

Architecture

BaseCleaner

All PMS-specific cleaners inherit from BaseCleaner, which provides:

  • Common cleaning utilities (email validation, phone formatting, country codes)
  • Currency conversion using ECB exchange rates
  • Property mapping from configuration
  • Status normalization across different PMS systems
  • Abstract methods that each cleaner must implement

    from abc import ABC, abstractmethod

    from pyspark.sql import DataFrame


    class BaseCleaner(ABC):
        @abstractmethod
        def get_clean_reservations(self) -> DataFrame: ...

        @abstractmethod
        def get_clean_guests(self) -> DataFrame: ...

        @abstractmethod
        def get_clean_rooms(self) -> DataFrame: ...
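
As a rough illustration of how a vendor cleaner might plug into this contract, the sketch below uses plain Python lists of dicts in place of Spark DataFrames so that it runs without a cluster. `ExampleVendorCleaner`, the `Rows` alias, the constructor argument, and the raw field names are all hypothetical and not taken from the real codebase.

```python
from abc import ABC, abstractmethod

# Stand-in for a Spark DataFrame so the sketch runs anywhere (assumption).
Rows = list[dict]


class BaseCleaner(ABC):
    """Simplified stand-in for the real BaseCleaner (illustrative only)."""

    def __init__(self, raw: dict[str, Rows]) -> None:
        self.raw = raw  # hypothetical: raw tables keyed by table name

    @abstractmethod
    def get_clean_reservations(self) -> Rows: ...

    @abstractmethod
    def get_clean_guests(self) -> Rows: ...

    @abstractmethod
    def get_clean_rooms(self) -> Rows: ...


class ExampleVendorCleaner(BaseCleaner):
    """Hypothetical vendor cleaner; field names are made up for the example."""

    def get_clean_reservations(self) -> Rows:
        return [
            {"reservation_id_og": r["confirmationnumber"]}
            for r in self.raw.get("reservations", [])
        ]

    def get_clean_guests(self) -> Rows:
        return [{"guest_id_og": g["id"]} for g in self.raw.get("guests", [])]

    def get_clean_rooms(self) -> Rows:
        # Not all vendors provide room data, so an empty result is valid.
        return []


cleaner = ExampleVendorCleaner({"reservations": [{"confirmationnumber": "A1"}]})
reservations = cleaner.get_clean_reservations()
```

Because the three methods are abstract, instantiating `BaseCleaner` directly raises a `TypeError`, which catches a vendor cleaner that forgets to implement one of them.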

Vendor-Specific Cleaners

Each PMS vendor (Mews, Opera, Athenaeum, Guestline) has its own cleaner implementation that inherits from BaseCleaner.

Common Operations

Deduplication Strategy

Each vendor requires different deduplication logic based on:

  • Last modified timestamp - Most recent update wins
  • Status priority - Checked-out > Checked-in > Confirmed > Other
  • Unique identifiers - Vendor-specific reservation/confirmation numbers

    # Example: priority-based deduplication
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    status_priority = (
        F.when(F.col("status") == "CHKOUT", 3)
        .when(F.col("status") == "CHKIN", 2)
        .when(F.col("status") == "CONFIRMED", 1)
        .otherwise(0)
    )

    # Newest update first; status priority breaks ties on the same timestamp.
    window = Window.partitionBy("reservation_id").orderBy(
        F.col("updated_date").desc(), status_priority.desc()
    )
    df = df.withColumn("row_num", F.row_number().over(window))
    df = df.filter(F.col("row_num") == 1).drop("row_num")
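
To make the ordering concrete, here is a minimal plain-Python sketch of the same rule: within each `reservation_id`, the newest `updated_date` wins and status priority only breaks ties. It is an illustration of the logic rather than the pipeline's Spark code; the `dedupe` helper and the sample rows are hypothetical, and it assumes `updated_date` is never null.

```python
from datetime import date

# Same ranking as the status priority in the Spark example.
STATUS_PRIORITY = {"CHKOUT": 3, "CHKIN": 2, "CONFIRMED": 1}


def sort_key(row: dict) -> tuple:
    """Newest update first, then highest status priority (unknown = 0)."""
    return (row["updated_date"], STATUS_PRIORITY.get(row["status"], 0))


def dedupe(rows: list[dict]) -> list[dict]:
    """Keep a single winning row per reservation_id."""
    best: dict[str, dict] = {}
    for row in rows:
        current = best.get(row["reservation_id"])
        if current is None or sort_key(row) > sort_key(current):
            best[row["reservation_id"]] = row
    return list(best.values())


rows = [
    {"reservation_id": "R1", "status": "CONFIRMED", "updated_date": date(2024, 1, 1)},
    {"reservation_id": "R1", "status": "CHKIN", "updated_date": date(2024, 1, 5)},
    {"reservation_id": "R1", "status": "CHKOUT", "updated_date": date(2024, 1, 5)},
    {"reservation_id": "R2", "status": "CONFIRMED", "updated_date": date(2024, 2, 1)},
]
deduped = dedupe(rows)
```

For `R1`, the two rows dated 5 January tie on timestamp, so the `CHKOUT` row is kept. If two rows tie on both timestamp and status, the winner is arbitrary here, as it is with `row_number()`.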

Field Mapping

Raw vendor fields are mapped to our standardized schema:

    df = (
        df.withColumnRenamed("confirmationnumber", "reservation_id_og")
        .withColumnRenamed("arrivaldate", "check_in_date")
        .withColumnRenamed("departuredate", "check_out_date")
        .withColumnRenamed("emailadd", "email")
        # ... more mappings
    )
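
One way to keep such mappings maintainable is to hold them in a single dictionary and check the input against it. This matters because PySpark's `withColumnRenamed` does nothing when the source column is absent, so a renamed or missing vendor field can go unnoticed. The sketch below shows the idea on a plain dict; `FIELD_MAP`, `rename_fields`, and `missing_fields` are hypothetical names, and the `roomtype` field is invented for the example.

```python
# Raw-to-standard field names, taken from the renames shown above.
FIELD_MAP = {
    "confirmationnumber": "reservation_id_og",
    "arrivaldate": "check_in_date",
    "departuredate": "check_out_date",
    "emailadd": "email",
}


def rename_fields(row: dict, field_map: dict[str, str]) -> dict:
    """Rename mapped keys and leave unmapped keys untouched."""
    return {field_map.get(name, name): value for name, value in row.items()}


def missing_fields(row: dict, field_map: dict[str, str]) -> list[str]:
    """List expected raw fields that are absent from the row."""
    return sorted(name for name in field_map if name not in row)


raw = {"confirmationnumber": "A1", "arrivaldate": "2024-03-01", "roomtype": "DLX"}
mapped = rename_fields(raw, FIELD_MAP)
absent = missing_fields(raw, FIELD_MAP)
```

Here `absent` reports the two expected fields that the raw record lacks, which a cleaner could log or fail on before continuing.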

Data Type Conversion

Ensures all fields have the correct data type:

    df = (
        df.withColumn("check_in_date", F.to_date("check_in_date"))
        .withColumn("check_out_date", F.to_date("check_out_date"))
        .withColumn("created_timestamp", F.to_timestamp("created_timestamp"))
        .withColumn("guests_adults", F.col("guests_adults").cast("int"))
    )

Data Validation & Cleaning

Common cleaning utilities from utils.py:

Email Validation

    def clean_email(df: DataFrame, col: str) -> DataFrame:
        """
        - Removes invalid characters
        - Validates email format
        - Normalizes to lowercase ASCII
        """
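
The body of `clean_email` is not shown here, so the following is only a sketch of what the per-value logic could look like, using the standard library. The `clean_email_value` name and the deliberately simple regular expression are assumptions; the real utility may apply different or stricter rules.

```python
import re
import unicodedata

# Deliberately simple pattern (assumption): a local part, one "@", and a
# domain containing at least one dot. Not a full RFC 5322 validator.
EMAIL_RE = re.compile(r"[a-z0-9._%+-]+@[a-z0-9-]+(\.[a-z0-9-]+)+")


def clean_email_value(value):
    """Return a normalized email string, or None if it does not look valid."""
    if value is None:
        return None
    # Fold to ASCII: split accents off their base letters, then drop
    # anything that has no ASCII form.
    ascii_value = (
        unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
    )
    # Remove whitespace anywhere in the string and lowercase the rest.
    candidate = re.sub(r"\s+", "", ascii_value).lower()
    return candidate if EMAIL_RE.fullmatch(candidate) else None


cleaned = clean_email_value("  Jane.Doe@Example.COM ")
rejected = clean_email_value("not-an-email")
```

In a Spark job, a function like this would typically be wrapped in a UDF or rewritten with built-in column functions.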

Phone Number Formatting

    def clean_phone_number(df: DataFrame, col: str) -> DataFrame:
        """
        - Parses phone numbers with phonenumbers library
        - Formats to E.164 international standard
        - Returns None for invalid numbers
        """

Country Code Normalization

    def clean_country(df: DataFrame, col: str) -> DataFrame:
        """
        - Looks up countries using pycountry
        - Returns ISO Alpha-2 country codes (e.g., 'GB', 'US')
        """

String Cleaning

    def clean_string_columns(df: DataFrame, cols: list[str]) -> DataFrame:
        """
        - Removes accents/diacritics using unidecode
        - Trims whitespace
        - Converts empty strings to NULL
        """
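
The three steps in that docstring can be sketched for a single value as follows. To stay dependency-free, this example swaps `unidecode` for the standard library's `unicodedata`, which strips combining accents but, unlike `unidecode`, leaves letters such as "ø" or "ß" unchanged. The `clean_string_value` helper is hypothetical.

```python
import unicodedata


def clean_string_value(value):
    """Strip diacritics, trim whitespace, and map empty strings to None."""
    if value is None:
        return None
    # NFKD splits accented letters into a base letter plus combining marks.
    decomposed = unicodedata.normalize("NFKD", value)
    # Drop the combining marks, keeping only the base characters.
    stripped = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    trimmed = stripped.strip()
    # An empty result becomes None, mirroring the NULL conversion.
    return trimmed or None


name = clean_string_value("  José  ")
blank = clean_string_value("   ")
```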

Status Normalization

Each vendor uses different status codes. We normalize them to a standard set:

  • confirmed - Reservation is confirmed
  • checked_in - Guest has checked in
  • checked_out - Guest has checked out
  • cancelled - Reservation was cancelled
  • no_show - Guest didn’t show up
  • waitlist - On waiting list
  • in_house - Currently staying (some vendors)
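
A lookup table is one simple way to express this normalization. In the sketch below, `CHKOUT`, `CHKIN`, and `CONFIRMED` are the codes used in the deduplication example; the other vendor codes, the `STATUS_MAP` name, and the `normalize_status` helper are hypothetical placeholders, and each vendor would need its own table.

```python
# Vendor code -> standard status. Only the first three codes appear elsewhere
# on this page; the rest are hypothetical placeholders.
STATUS_MAP = {
    "CHKOUT": "checked_out",
    "CHKIN": "checked_in",
    "CONFIRMED": "confirmed",
    "CANCELED": "cancelled",  # hypothetical vendor spelling
    "NOSHOW": "no_show",      # hypothetical
    "WAITLIST": "waitlist",   # hypothetical
    "INHOUSE": "in_house",    # hypothetical
}


def normalize_status(raw_status):
    """Map a vendor status code to the standard set, or None if unknown."""
    if raw_status is None:
        return None
    return STATUS_MAP.get(raw_status.strip().upper())


status = normalize_status(" chkin ")
unknown = normalize_status("SOMETHING_ELSE")
```

Returning `None` for unmapped codes, rather than passing them through, makes new vendor statuses visible as nulls instead of letting them leak into the standard column.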

UUID Generation

Stable UUIDs are generated for:

  • Reservations - Unique per reservation
  • Guests - Stable per guest_id_og (original vendor guest ID)

    # Generate stable UUID per unique guest_id_og
    guest_uuid_map = (
        df.dropDuplicates(["guest_id_og"])
        .withColumn("guest_id", F.expr("uuid()"))
        .select("guest_id_og", "guest_id")
    )
    df = df.join(guest_uuid_map, on="guest_id_og", how="left")
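
Note that Spark SQL's `uuid()` returns a random value, so IDs produced this way are consistent across rows within one run but would change on a rerun unless the mapping is persisted and reused. If stability across runs is required, one alternative (not necessarily what the pipeline does) is a name-based UUIDv5, which derives the ID deterministically from its input. The namespace value, the `stable_guest_uuid` helper, and the choice to include `chain_id` in the name are assumptions made for this sketch.

```python
import uuid

# Hypothetical namespace; any fixed UUID works as long as it never changes.
GUEST_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "guests.example.invalid")


def stable_guest_uuid(chain_id: str, guest_id_og: str) -> str:
    """Derive the same UUID for the same (chain_id, guest_id_og) on every run."""
    # Including chain_id keeps identical vendor IDs in different chains apart.
    return str(uuid.uuid5(GUEST_NAMESPACE, f"{chain_id}:{guest_id_og}"))


first = stable_guest_uuid("chain_a", "G-1001")
second = stable_guest_uuid("chain_a", "G-1001")
other = stable_guest_uuid("chain_b", "G-1001")
```

Here `first` and `second` are identical, while `other` differs because the chain differs.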

Output

The cleaning stage produces three main tables per chain:

Clean Reservations

  • Table: etl_gp_clean_{chain_id}.reservations
  • Schema: Full reservation details with standardized fields
  • Partitioned by: chain_id, property_id

Clean Guests

  • Table: etl_gp_clean_{chain_id}.guests
  • Schema: Guest profiles with contact information
  • Deduped by: guest_id_og

Clean Rooms

  • Table: etl_gp_clean_{chain_id}.rooms
  • Schema: Room inventory and types
  • Note: Not all vendors provide room data

Vendor-Specific Documentation

For implementation details, see the vendor-specific pages.

Next Steps

After cleaning, the data proceeds to the processing stage where:

  • Guest matching and deduplication occurs across properties
  • Stay patterns are analyzed
  • Aggregations and metrics are computed
  • Final reports are generated