Guide to Integrating a New PMS
Adding a new Property Management System (PMS) to GuestPulse involves several steps to ensure reliable integration and data synchronization. This guide walks through each step and the key considerations along the way.
Steps to Integrate a New PMS
- Researching the PMS
- Setting up the configuration
- Mapping the PMS entities to the Schema
- Implementing the Ingester
- Implementing the Crawler
- Implementing the Cleaner
- Creating the Job File
- Testing the Integration
Researching the PMS
Always start by thoroughly researching the PMS you intend to integrate. Understanding the system’s capabilities, limitations, and data structures is crucial for a successful integration.
Key Areas to Research
- API Documentation
  - Authentication methods (OAuth, API keys, tokens)
  - Rate limits and throttling policies
  - Available endpoints and their capabilities
  - Request/response formats (JSON, XML, etc.)
  - Pagination strategies
  - Error handling and response codes
- Data Model
  - How the PMS structures reservations
  - Guest/customer data organization
  - Revenue and order item structures
  - Master data (room types, rate codes, market segments)
  - Relationships between entities
  - Unique identifiers and foreign keys
- Data Retrieval Patterns
  - Incremental vs. full data sync capabilities
  - Date-based filtering options
  - Batch vs. real-time data access
  - Support for historical data backfills
- Multi-Property Support
  - How the PMS handles multiple properties
  - Whether properties share a single API or have separate endpoints
  - Property identification schemes
- Business Logic
  - Reservation status definitions (confirmed, cancelled, checked-in, etc.)
  - Cancellation workflows and timing
  - Check-in/check-out processes
  - Revenue allocation methods
Example: Mews Research Summary
For Mews, the research revealed:
- Uses Connector API with client and access tokens
- Supports incremental sync via date-based reservation queries
- Multi-property data accessed through a single API
- Nested JSON structures requiring denormalization
- Related entities (customers, order items) retrieved via ID lookups
- Rate limiting requires chunked requests (max 1000 IDs per request)
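Constraints like the 1,000-ID limit above typically become a small batching helper in the API client. A minimal sketch, assuming a helper name, batch size, and client method that are illustrative rather than part of the existing codebase:

def chunked(ids: list[str], size: int = 1000):
    """Yield successive batches of at most `size` IDs for chunked API lookups."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]

# Example usage inside an ingester:
# customers = []
# for batch in chunked(customer_ids, size=1000):
#     customers.extend(client.fetch_customers(batch))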
Setting up the Configuration
Each PMS requires a configuration file that defines the data source, authentication, properties, and crawler settings.
Configuration File Structure
Create a YAML configuration file in .gp_local_storage/etl-gp-config/ with the following structure:
client: <pms_name>
source:
  system: <PMS Display Name>
  base_url: "https://api.pms.com"
  client_name: "Guestpulse"
  auth:
    secret_name: "<pms_name>/credentials"
properties:
  - id: <property_id>
    name: <property_name>
    currency: <ISO_currency_code>
    timezone: <timezone>
    active: true
crawler_config:
  - table: reservations
    database: etl_gp_raw_<pms_name>
    property_partition: true
    incremental: true
  - table: customers
    database: etl_gp_raw_<pms_name>
    property_partition: false
    incremental: true
  - table: order_items
    database: etl_gp_raw_<pms_name>
    property_partition: true
    incremental: true
  # Add other tables as needed

Configuration Parameters
- client: Identifier for the PMS (lowercase, used in paths)
- source.system: Display name of the PMS
- source.base_url: Base URL for API requests
- source.auth.secret_name: AWS Secrets Manager key for credentials
- properties: List of properties to sync
- id: Unique property identifier from the PMS
- currency: ISO currency code for the property
- timezone: Property timezone
- active: Whether to include in sync operations
- crawler_config: Defines tables to crawl from S3 to Iceberg
- table: Table name in the catalog
- database: Glue catalog database name
- property_partition: Whether data is partitioned by property
- incremental: Whether to append or replace data
AWS Secrets Manager
Store PMS credentials in AWS Secrets Manager with the key specified in source.auth.secret_name. The secret should contain the necessary authentication tokens or API keys as a JSON object.
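At runtime the ingester loads this secret and passes the values to the API client. A minimal sketch with boto3 (BaseIngester may already expose an equivalent helper such as _get_credentials(); the function name and region below are assumptions):

import json
import boto3

def load_pms_credentials(secret_name: str, region_name: str = "eu-west-1") -> dict:
    """Fetch the PMS credentials stored as a JSON secret in AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])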
Example secret structure:
{
"client_token": "your_client_token",
"access_token": "your_access_token"
}

Mapping the PMS entities to the Schema
The GuestPulse platform uses a unified schema across all PMS integrations. Your task is to map the PMS-specific fields to our standardized schema.
Core Entities to Map
1. Reservations
Map these key fields:
- Identifiers: res_id (system-generated), res_id_og (PMS-provided)
- Guest: guest_id (foreign key to the guests table)
- Dates: check_in_date, check_out_date, booking_date, cancellation_date
- Status: status (must map to: CONFIRMED, CANCELLED, CHECKED_IN, CHECKED_OUT, NO_SHOW)
- Classification: channel, source, market_segment, booking_type
- Guests: guests_adults, guests_children
- Affiliations: travel_agent, company, group, promo_code
- Revenue: total_room_net, total_room_rate_net, total_room_fnb_net, total_room_other_net
2. Guests
Map these key fields:
- Identifiers: guest_id (system-generated), guest_id_og (PMS-provided)
- Personal Info: first_name, last_name, email, phone_number
- Address: address_street, address_city, address_state, address_postcode, address_country
- Demographics: nationality, gender, birth_date
- Other: passport_number, membership_type
3. Rooms (Daily Room Data)
Map these key fields:
- Identifiers: room_id (system-generated), res_id, guest_id
- Dates: room_stay_date, room_check_in_date, room_check_out_date
- Classification: room_type, room_type_code, room_rate_code
- Guests: room_guests_adults, room_guests_children
- Revenue: room_stay_date_rate_net, room_stay_date_total_net, room_stay_date_fnb_net, room_stay_date_other_net
- Flags: room_stay_date_is_check_in_day, room_stay_date_is_check_out_day, room_stay_date_is_stay_day, room_stay_date_is_stay_night
Common Metadata Fields
All tables must include these fields:
- _chain_id: The chain identifier
- property_id: The property identifier
- currency: ISO currency code
- created_timestamp: When the record was created in the PMS
- last_modified_timestamp: When the record was last updated in the PMS
- synced_timestamp: When the record was ingested into GuestPulse
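A sketch of attaching these columns during cleaning (created_timestamp and last_modified_timestamp come from the PMS payload itself, so they are not set here; the function name is illustrative):

from pyspark.sql import DataFrame, functions as F

def add_metadata_columns(df: DataFrame, chain_id: str, property_id: str, currency: str) -> DataFrame:
    """Attach the common metadata columns required on every clean table."""
    return (
        df.withColumn("_chain_id", F.lit(chain_id))
        .withColumn("property_id", F.lit(property_id))
        .withColumn("currency", F.lit(currency))
        .withColumn("synced_timestamp", F.current_timestamp())
    )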
Status Mapping
Standardize PMS-specific status values to our schema:
| Standard Status | Description | Common PMS Mappings |
|---|---|---|
| CONFIRMED | Reservation is confirmed but not yet checked in | Confirmed, Reserved, Booked |
| CHECKED_IN | Guest has checked in | In-house, Checked In, Active |
| CHECKED_OUT | Guest has checked out | Checked Out, Departed |
| CANCELLED | Reservation was cancelled | Cancelled, Canceled |
| NO_SHOW | Guest did not show up | No Show, No-Show |
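BaseCleaner's _map_status() handles this lookup; a standalone sketch of an equivalent, with a mapping dictionary mirroring the table above (unknown values fall through to null):

from pyspark.sql import DataFrame, functions as F

STATUS_MAP = {
    "Confirmed": "CONFIRMED", "Reserved": "CONFIRMED", "Booked": "CONFIRMED",
    "In-house": "CHECKED_IN", "Checked In": "CHECKED_IN", "Active": "CHECKED_IN",
    "Checked Out": "CHECKED_OUT", "Departed": "CHECKED_OUT",
    "Cancelled": "CANCELLED", "Canceled": "CANCELLED",
    "No Show": "NO_SHOW", "No-Show": "NO_SHOW",
}

def map_status(df: DataFrame, column: str = "status") -> DataFrame:
    """Replace PMS-specific status values with the standardized ones."""
    expr = F.lit(None)
    for pms_value, standard_value in STATUS_MAP.items():
        expr = F.when(F.col(column) == pms_value, F.lit(standard_value)).otherwise(expr)
    return df.withColumn(column, expr)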
Field Type Conversions
- Dates: Convert to DateType (YYYY-MM-DD)
- Timestamps: Convert to TimestampType (UTC)
- Currency: Convert to DoubleType and store the currency code separately
- Phone/Email: Clean and normalize formats
- Country: Convert to ISO 3166-1 alpha-2 codes
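A few of these conversions in PySpark, for illustration only (the column names are placeholders; timezone handling is covered under Common Pitfalls):

from pyspark.sql import DataFrame, functions as F

def normalize_types(df: DataFrame) -> DataFrame:
    """Illustrative field-type conversions."""
    return (
        df.withColumn("check_in_date", F.to_date("check_in_date"))              # string -> DateType
        .withColumn("created_timestamp", F.to_timestamp("created_timestamp"))   # string -> TimestampType
        .withColumn("total_room_net", F.col("total_room_net").cast("double"))   # amount -> DoubleType; currency code stays in its own column
        .withColumn("email", F.lower(F.trim(F.col("email"))))                   # basic normalization
    )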
Implementing the Ingester
The Ingester is responsible for fetching raw data from the PMS API and storing it in S3 as Parquet files.
Creating the Ingester Class
Create a new file: etl_lib/pipeline/ingest/<pms_name>/<PMS>Ingester.py
import logging
from typing import Optional

from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.BaseIngester import BaseIngester
from etl_lib.pipeline.ingest.<pms_name>.<PMS>APIClient import <PMS>APIClient

logger = logging.getLogger(__name__)
class <PMS>Ingester(BaseIngester):
"""
Ingester for <PMS> PMS data.
Fetches reservations, customers, and related data from <PMS> API.
"""
def __init__(self, job_context: JobContext, sync_dates: Optional[list[str]] = None):
super().__init__(job_context)
# Configuration
self.sync_dates = sync_dates
self._secret_name = self.source.get("auth", {}).get("secret_name")
self._base_url = self.source.get("base_url")
# Initialize API client
credentials = self._get_credentials()
self._client = <PMS>APIClient(
base_url=self._base_url,
**credentials
)
def run(self):
"""Main sync method that orchestrates the data ingestion."""
for property_data in self.properties:
self._process_property(property_data)
logger.info("Sync complete")
def _process_property(self, property_data: dict) -> None:
"""Process a single property."""
property_id = property_data["id"]
dates = self._get_sync_dates(property_id=property_id)
for sync_date in dates:
self._process_sync_date(property_id=property_id, sync_date=sync_date)
def _process_sync_date(self, property_id: str, sync_date: str) -> None:
"""Process a single sync date for a property."""
# 1. Fetch main entities (e.g., reservations)
# 2. Extract related entity IDs
# 3. Fetch related entities (e.g., customers, order items)
# 4. Write to S3 using _write_entities()
pass

Key Components
1. API Client
Create a separate API client class for making requests:
# etl_lib/pipeline/ingest/<pms_name>/<PMS>APIClient.py
import requests

class <PMS>APIClient:
def __init__(self, base_url: str, **credentials):
self.base_url = base_url
self.session = requests.Session()
# Set up authentication headers
def fetch_reservations(self, property_id: str, start_date: str, end_date: str):
"""Fetch reservations for a date range."""
pass
def fetch_customers(self, customer_ids: list[str]):
"""Fetch customers by ID."""
pass

2. Schema Definitions
Define Spark schemas for each entity:
# etl_lib/pipeline/ingest/<pms_name>/schema.py
from pyspark.sql.types import *
RESERVATIONS_SCHEMA = StructType([
StructField("id", StringType(), False),
StructField("customer_id", StringType(), True),
StructField("check_in_date", StringType(), True),
# ... more fields
])
CUSTOMERS_SCHEMA = StructType([
StructField("id", StringType(), False),
StructField("first_name", StringType(), True),
# ... more fields
])

3. Writing Entities to S3
Use the _write_entities method to save data:
def _write_entities(
self,
entities: list,
schema: StructType,
table: str,
sync_date: str,
property_id: str,
*,
property_partition: bool,
property_column: Optional[str] = None,
) -> None:
"""Write entities to the ingest catalog with standard metadata columns."""
if not entities:
logger.info(f"No {table} to write")
return
# Convert to DataFrame
df = self._job_context.spark.createDataFrame(entities, schema)
# Add metadata columns
df = df.withColumn("synced_date", F.lit(sync_date))
df = df.withColumn("synced_timestamp", F.current_timestamp())
# Write to S3
output_path = self._get_output_path(table, property_id, sync_date, property_partition)
df.write.mode("overwrite").parquet(output_path)

4. Incremental Sync Strategy
Use get_next_sync_dates() from BaseIngester to determine which dates to sync:
def _get_sync_dates(self, property_id: str, table: str = "reservations") -> list[str]:
"""Get dates to sync (backfill or incremental)."""
if self.sync_dates:
return self.sync_dates
# Get dates since last sync
return self.get_next_sync_dates(table=table, property_id=property_id)

Best Practices
- Error Handling: Implement retry logic for API failures
- Rate Limiting: Respect API rate limits (use chunking, delays)
- Logging: Log progress, errors, and data volumes
- Idempotency: Ensure re-running the same date doesn’t cause issues
- Data Validation: Validate critical fields before writing
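For the error-handling and rate-limiting points above, a small retry helper with exponential backoff usually suffices. A sketch (attempt counts, delays, and the caught exception type are assumptions to tune per PMS):

import logging
import time
from typing import Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")

def with_retries(call: Callable[[], T], max_attempts: int = 5, base_delay: float = 1.0) -> T:
    """Run an API call, retrying with exponential backoff on transient failures."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except Exception as exc:  # narrow to the client's transient error types in practice
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            logger.warning("Attempt %d/%d failed (%s); retrying in %.1fs", attempt, max_attempts, exc, delay)
            time.sleep(delay)

# Usage:
# reservations = with_retries(lambda: client.fetch_reservations(property_id, start_date, end_date))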
Implementing the Crawler
The Crawler reads the raw Parquet files from S3 and loads them into Iceberg tables in the Glue Catalog.
Creating the Crawler Class
Create a new file: etl_lib/pipeline/crawlers/<pms_name>/<PMS>Crawler.py
from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from s3path import S3Path

from etl_lib.pipeline.crawlers.BaseCrawler import BaseCrawler
class <PMS>Crawler(BaseCrawler):
"""
Crawler for <PMS> PMS data.
Crawls data from S3 that was ingested by <PMS>Ingester and loads it into
Iceberg tables in the Glue Catalog. Supports incremental crawling based
on synced_date partitions.
"""
def get_files_path(self, table_name: str, property_id: Optional[str] = None) -> S3Path:
"""
Return S3 path for the given table.
Args:
table_name: Name of the table (e.g., 'reservations', 'customers')
property_id: Property identifier (if property-partitioned)
Returns:
S3Path to the table data
"""
if property_id:
# Property-specific path
return S3Path(f"/etl-gp-raw/ingest/{self.job_context.chain_id}/{table_name}/property_id={property_id}/")
else:
# Global path (for non-property-partitioned data)
return S3Path(f"/etl-gp-raw/ingest/{self.job_context.chain_id}/{table_name}/")

How the Crawler Works
The BaseCrawler provides the core functionality:
- Reads the crawler_config from your YAML file
- For each table:
  - Determines if it is property-partitioned
  - Finds all synced_date partitions in S3
  - Creates or updates the Iceberg table
  - Performs an incremental append or a full replace based on configuration
- Handles schema evolution automatically (adds new columns)
- Manages partitioning by property_id and synced_timestamp
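BaseCrawler performs these writes for you; conceptually, loading one synced_date partition boils down to something like the sketch below (the paths, table names, and the SparkSession variable spark are illustrative):

# Read one newly ingested partition from S3 ...
df = spark.read.parquet(
    "s3://etl-gp-raw/ingest/<chain_id>/reservations/property_id=<property_id>/synced_date=<sync_date>/"
)

# ... and append it to the Iceberg table in the Glue Catalog (Spark's DataFrameWriterV2 API)
df.writeTo("glue_catalog.etl_gp_raw_<pms_name>.reservations").append()

# A full reload (incremental: false) would replace the table contents instead:
# df.writeTo("glue_catalog.etl_gp_raw_<pms_name>.reservations").createOrReplace()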
Incremental vs. Full Reload
- Incremental (incremental: true): Appends new synced_date partitions
- Full Reload (incremental: false): Replaces the entire table
Property Partitioning
- Property-partitioned (property_partition: true): Data is organized by property in S3
- Non-partitioned (property_partition: false): Data is global (e.g., shared customer data across properties)
Optional: Transform Raw Data
Override transform_raw_df() if you need to transform data before writing to Iceberg:
def transform_raw_df(self, table_name: str, property_id: Optional[str], df: DataFrame) -> DataFrame:
"""Apply table-specific transformations before writing to Iceberg."""
if table_name == "reservations":
# Example: Cast date strings to proper date types
df = df.withColumn("check_in_date", F.to_date("check_in_date"))
return df

Implementing the Cleaner
The Cleaner transforms raw PMS data from Iceberg tables into the standardized GuestPulse schema.
Creating the Cleaner Class
Create a new file: etl_lib/pipeline/cleaning/<pms_name>/<PMS>Cleaner.py
from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from etl_lib.pipeline.cleaning.BaseCleaner import BaseCleaner
from etl_lib.schema import SchemaFields as SF
from etl_lib.schema import name_of_field as nof
class <PMS>Cleaner(BaseCleaner):
"""
Cleaner for <PMS> PMS data.
Transforms raw <PMS> data from Iceberg tables into standardized clean format
following the schema definitions. Handles mapping of <PMS>-specific fields
to the common schema used across all PMS systems.
"""
def __init__(self, job_context, write_to_catalog: bool = False):
super().__init__(job_context, write_to_catalog)
self.reservations: Optional[DataFrame] = None
self.customers: Optional[DataFrame] = None
# Add other cached DataFrames
self.ran = False
def _run(self):
"""Run all cleaning steps once and cache the results."""
if self.ran:
return
self.ran = True
# Load and cache data
self.customers = self._load_customers().checkpoint(eager=True)
self.reservations = self._reservations().checkpoint(eager=True)
# Load other tables as needed
def _load_customers(self) -> DataFrame:
"""Load and transform customers data."""
df = self._job_context.spark.table(
f"glue_catalog.etl_gp_raw_{self._job_context.chain_id}.customers"
)
# Transform to schema
return df.select(
F.col("id").alias(nof(SF.GUEST_ID_OG)),
F.col("first_name").alias(nof(SF.FIRST_NAME)),
F.col("last_name").alias(nof(SF.LAST_NAME)),
# ... more fields
)
def _reservations(self) -> DataFrame:
"""Clean and transform reservations data."""
df = self._job_context.spark.table(
f"glue_catalog.etl_gp_raw_{self._job_context.chain_id}.reservations"
)
# Transform to schema
df = df.select(
F.col("id").alias(nof(SF.RES_ID_OG)),
F.col("customer_id").alias(nof(SF.GUEST_ID_OG)),
# ... more fields
)
# Map status values
df = self._map_status(df, {
"Confirmed": "CONFIRMED",
"CheckedIn": "CHECKED_IN",
"CheckedOut": "CHECKED_OUT",
"Canceled": "CANCELLED",
})
# Generate system IDs
df = df.withColumn(
nof(SF.RES_ID),
F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.RES_ID_OG)))
)
# Join with customers to get guest_id
df = df.join(
self.customers.select(nof(SF.GUEST_ID_OG), nof(SF.GUEST_ID)),
on=nof(SF.GUEST_ID_OG),
how="left"
)
return df
def get_clean_reservations(self) -> DataFrame:
"""Get clean reservations data."""
self._run()
return self.reservations
def get_clean_guests(self) -> DataFrame:
"""Get clean guests data."""
self._run()
# Generate guest_id
guests = self.customers.withColumn(
nof(SF.GUEST_ID),
F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.GUEST_ID_OG)))
)
# Apply data cleaning utilities
guests = self.clean_email(guests)
guests = self.clean_country(guests)
guests = self.clean_phone_number(guests)
return guests
def get_clean_rooms(self) -> DataFrame:
"""Get clean daily rooms data."""
self._run()
# Generate room_id and explode reservation by stay dates
# This transforms reservation-level data to daily room records
rooms = self.reservations.withColumn(
"stay_dates",
F.expr("sequence(check_in_date, date_sub(check_out_date, 1))")
)
rooms = rooms.withColumn("stay_date", F.explode("stay_dates"))
rooms = rooms.withColumn(
nof(SF.ROOM_ID),
F.concat_ws("_", F.col(nof(SF.RES_ID)), F.col("stay_date"))
)
# Add room-specific fields
rooms = rooms.withColumn(
nof(SF.ROOM_STAY_DATE_IS_CHECK_IN_DAY),
F.when(F.col("stay_date") == F.col(nof(SF.CHECK_IN_DATE)), 1).otherwise(0)
)
rooms = rooms.withColumn(
nof(SF.ROOM_STAY_DATE_IS_CHECK_OUT_DAY),
F.when(F.col("stay_date") == F.date_sub(F.col(nof(SF.CHECK_OUT_DATE)), 1), 1).otherwise(0)
)
# Calculate daily revenue allocation
rooms = rooms.withColumn(
nof(SF.ROOM_LENGTH_OF_STAY),
F.datediff(F.col(nof(SF.CHECK_OUT_DATE)), F.col(nof(SF.CHECK_IN_DATE)))
)
rooms = rooms.withColumn(
nof(SF.ROOM_STAY_DATE_RATE_NET),
F.col(nof(SF.TOTAL_ROOM_RATE_NET)) / F.col(nof(SF.ROOM_LENGTH_OF_STAY))
)
return rooms

Key Cleaning Operations
1. Field Mapping
Map PMS fields to schema fields using F.col("pms_field").alias(nof(SF.SCHEMA_FIELD)).
2. Status Normalization
Use _map_status() to convert PMS-specific status values to standard values.
3. ID Generation
Generate system IDs by combining property_id and PMS-provided IDs:
F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.RES_ID_OG)))

4. Data Cleaning Utilities
Use inherited methods from BaseCleaner:
- clean_email(): Validates and normalizes email addresses
- clean_country(): Converts country values to ISO country codes
- clean_phone_number(): Formats phone numbers
- clean_string_columns(): Trims and normalizes strings
5. Currency Conversion
Use the built-in currency converter for multi-currency operations:
self._currency_converter.convert(amount, from_currency, to_currency)

6. Date Calculations
Calculate booking windows, stay nights, etc.:
F.datediff(F.col(nof(SF.CHECK_IN_DATE)), F.col(nof(SF.BOOKING_DATE))).alias(nof(SF.BOOKING_WINDOW))

7. Join Operations
Join cleaned tables to enrich data:
df = df.join(self.customers, on=nof(SF.GUEST_ID_OG), how="left")

Room Data Generation
Room data is typically generated by:
- Exploding reservations by stay dates
- Allocating revenue across nights
- Adding daily flags (check-in day, check-out day, etc.)
Creating the Job File
Create a job file that orchestrates the entire ETL workflow.
Job File Structure
Create a new file: jobs/<chain_id>.py
from etl_lib.job.JobContext import JobContext
from etl_lib.workflow.Workflow import Workflow
from etl_lib.data.BigQuery import BigQuery
from etl_lib.pipeline.ingest.<pms_name>.<PMS>Ingester import <PMS>Ingester
from etl_lib.pipeline.crawlers.<pms_name>.<PMS>Crawler import <PMS>Crawler
from etl_lib.pipeline.cleaning.<pms_name>.<PMS>Cleaner import <PMS>Cleaner
# Initialize job context
job_context = JobContext(chain_id="<chain_id>", partitions=16)
db_sink = BigQuery(job_context)
# Step 1: Ingest raw data from PMS API to S3
<PMS>Ingester(job_context).run()
# Step 2: Crawl data from S3 into Iceberg tables
<PMS>Crawler(job_context).run()
# Step 3: Clean and transform data
cleaner = <PMS>Cleaner(job_context)
# Step 4: Run the full workflow
Workflow.from_cleaner(
job_context=job_context,
cleaning=cleaner,
db_sink=db_sink,
write_to_catalog=True,
).run()

Workflow Options
The Workflow class provides several configuration options:
Skip Processes
Skip specific processing steps:
Workflow.from_cleaner(
job_context=job_context,
cleaning=cleaner,
db_sink=db_sink,
write_to_catalog=True,
skip_processes=["GuestMatching", "RevenueAllocation"],
).run()

Insert Custom Processes
Add custom processing steps:
from etl_lib.pipeline.processing.custom.<chain_id>.CustomProcess import CustomProcess
Workflow.from_cleaner(
job_context=job_context,
cleaning=cleaner,
db_sink=db_sink,
write_to_catalog=True,
).insert_process(process=CustomProcess, before="GuestMatching").run()

Date-Specific Processing
Process specific date ranges:
from datetime import date, timedelta
start_date = date.today() - timedelta(days=30)
end_date = date.today()
sync_dates = [(start_date + timedelta(days=i)).isoformat()
for i in range((end_date - start_date).days + 1)]
<PMS>Ingester(job_context, sync_dates=sync_dates).run()

Testing the Integration
Create a test file to validate your integration.
Test File Structure
Create a new file: jobs/test_<pms_name>.py
import logging
from datetime import date, timedelta
from etl_lib.data.BigQuery import BigQuery
from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.cleaning.<pms_name>.<PMS>Cleaner import <PMS>Cleaner
from etl_lib.pipeline.crawlers.<pms_name>.<PMS>Crawler import <PMS>Crawler
from etl_lib.pipeline.ingest.<pms_name>.<PMS>Ingester import <PMS>Ingester
from etl_lib.workflow.Workflow import Workflow
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
job_context = JobContext(chain_id="<chain_id>", partitions=16)
db_sink = BigQuery(job_context)
# Test with a small date range (the last 7 days)
start_date = date.today() - timedelta(days=7)
end_date = date.today()
sync_dates = [(start_date + timedelta(days=i)).isoformat()
              for i in range((end_date - start_date).days + 1)]
# Run ingestion
ingester = <PMS>Ingester(job_context, sync_dates=sync_dates)
ingester.run()
# Run crawler
crawler = <PMS>Crawler(job_context)
crawler.run()
# Run cleaner
cleaner = <PMS>Cleaner(job_context)
# Run workflow (skip guest matching for faster testing)
Workflow.from_cleaner(
job_context=job_context,
cleaning=cleaner,
db_sink=db_sink,
write_to_catalog=True,
skip_processes=["GuestMatching"],
).run()

Testing Checklist
- Configuration: Verify YAML configuration is correct
- Credentials: Test API authentication
- Ingester: Verify data is written to S3 correctly
- Check S3 paths and partitioning
- Validate Parquet file structure
- Confirm date range coverage
- Crawler: Verify Iceberg tables are created
- Check table schema matches expectations
- Verify incremental loading works
- Test schema evolution
- Cleaner: Validate cleaned data
- Check field mappings are correct
- Verify status normalization
- Validate ID generation
- Test join operations
- Data Quality:
- Check for null values in required fields
- Verify date ranges are logical
- Validate revenue calculations
- Test data completeness
- Performance: Monitor execution time and resource usage
- Error Handling: Test failure scenarios (API errors, missing data, etc.)
Validation Queries
Run these queries to validate your integration:
# Check reservation counts by status
spark.sql(f"""
SELECT status, COUNT(*) as count
FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
GROUP BY status
""").show()
# Check for null guest_ids
spark.sql(f"""
SELECT COUNT(*) as null_guest_count
FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
WHERE guest_id IS NULL
""").show()
# Verify date ranges
spark.sql(f"""
SELECT
MIN(check_in_date) as min_checkin,
MAX(check_in_date) as max_checkin,
COUNT(*) as total_reservations
FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
""").show()
# Check room data aggregation
spark.sql(f"""
SELECT
room_stay_date,
COUNT(*) as room_count,
SUM(room_stay_date_rate_net) as total_revenue
FROM glue_catalog.etl_gp_clean_{chain_id}.rooms
GROUP BY room_stay_date
ORDER BY room_stay_date DESC
LIMIT 10
""").show()Common Pitfalls and Solutions
1. API Rate Limiting
Problem: Hitting API rate limits causes failures.
Solution:
- Implement exponential backoff with retries
- Chunk large requests
- Add delays between requests
- Monitor rate limit headers
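One low-effort way to cover most of these points in the API client is to configure the requests session with urllib3's Retry, which backs off exponentially and honors Retry-After on 429 responses. A sketch (the retry counts and status codes are assumptions to tune against the PMS's documented limits):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session() -> requests.Session:
    """Session that retries transient failures and respects Retry-After headers."""
    retry = Retry(
        total=5,
        backoff_factor=1.0,                        # 1s, 2s, 4s, ... between attempts
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],           # include POST if the API uses POST for reads
        respect_retry_after_header=True,
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session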
2. Timezone Handling
Problem: Dates/times stored in different timezones causing mismatches.
Solution:
- Convert all timestamps to UTC during ingestion
- Store property timezone in configuration
- Apply timezone conversions during cleaning
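In the cleaner, converting property-local timestamps to UTC can look like this sketch (df and property_timezone are placeholders; the timezone string comes from the property configuration):

from pyspark.sql import functions as F

# property_timezone, e.g. "Europe/Berlin", comes from the property configuration
df = df.withColumn(
    "created_timestamp",
    F.to_utc_timestamp(F.col("created_timestamp"), property_timezone),
)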
3. Missing Data
Problem: Some reservations lack required fields.
Solution:
- Use .fillna() or F.coalesce() to provide defaults
- Add null checks before joins
- Log data quality issues
4. Schema Evolution
Problem: PMS adds new fields, breaking ingestion.
Solution:
- Define permissive schemas in ingester
- Use schema evolution in Iceberg tables
- Test with optional fields
5. Revenue Allocation
Problem: Daily revenue doesn’t sum to total reservation revenue.
Solution:
- Use precise division: total_revenue / stay_nights
- Handle single-night stays separately
- Round only at the final aggregation
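One common approach is to round each night to two decimals and let the final night absorb the rounding remainder, so the nights always sum to the reservation total. A sketch using the column names produced by the cleaner example above:

from pyspark.sql import functions as F

# Evenly allocate the reservation total across nights, rounded to cents
rooms = rooms.withColumn(
    "room_stay_date_rate_net",
    F.round(F.col("total_room_rate_net") / F.col("room_length_of_stay"), 2),
)

# Let the last night absorb the rounding difference
rooms = rooms.withColumn(
    "room_stay_date_rate_net",
    F.when(
        F.col("room_stay_date_is_check_out_day") == 1,
        F.col("total_room_rate_net")
        - F.col("room_stay_date_rate_net") * (F.col("room_length_of_stay") - 1),
    ).otherwise(F.col("room_stay_date_rate_net")),
)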
6. Duplicate Data
Problem: Re-running ingestion creates duplicate records.
Solution:
- Use mode("overwrite") when writing to S3
- Partition by synced_date to isolate runs
- Implement deduplication in the cleaner (see the sketch below)
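If duplicates do slip into the raw tables, the cleaner can keep only the most recently synced version of each reservation. A sketch, assuming the raw data carries res_id_og, property_id, and synced_timestamp as described earlier:

from pyspark.sql import Window, functions as F

# Keep only the latest synced copy of each reservation
window = Window.partitionBy("res_id_og", "property_id").orderBy(F.col("synced_timestamp").desc())
df = (
    df.withColumn("_row_number", F.row_number().over(window))
    .filter(F.col("_row_number") == 1)
    .drop("_row_number")
)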