
Guide to Integrating New PMS

Adding a new Property Management System (PMS) to GuestPulse involves several steps to ensure reliable integration and data synchronization. This guide outlines those steps and the key considerations for each.

Steps to Integrate a New PMS

  1. Researching the PMS
  2. Setting up the configuration
  3. Mapping the PMS entities to the Schema
  4. Implementing the Ingester
  5. Implementing the Crawler
  6. Implementing the Cleaner
  7. Creating the Job File
  8. Testing the Integration

Researching the PMS

Always start by thoroughly researching the PMS you intend to integrate. Understanding the system’s capabilities, limitations, and data structures is crucial for a successful integration.

Key Areas to Research

  1. API Documentation

    • Authentication methods (OAuth, API keys, tokens)
    • Rate limits and throttling policies
    • Available endpoints and their capabilities
    • Request/response formats (JSON, XML, etc.)
    • Pagination strategies
    • Error handling and response codes
  2. Data Model

    • How the PMS structures reservations
    • Guest/customer data organization
    • Revenue and order item structures
    • Master data (room types, rate codes, market segments)
    • Relationships between entities
    • Unique identifiers and foreign keys
  3. Data Retrieval Patterns

    • Incremental vs. full data sync capabilities
    • Date-based filtering options
    • Batch vs. real-time data access
    • Support for historical data backfills
  4. Multi-Property Support

    • How the PMS handles multiple properties
    • Whether properties share a single API or have separate endpoints
    • Property identification schemes
  5. Business Logic

    • Reservation status definitions (confirmed, cancelled, checked-in, etc.)
    • Cancellation workflows and timing
    • Check-in/check-out processes
    • Revenue allocation methods

Example: Mews Research Summary

For Mews, the research revealed:

  • Uses Connector API with client and access tokens
  • Supports incremental sync via date-based reservation queries
  • Multi-property data accessed through a single API
  • Nested JSON structures requiring denormalization
  • Related entities (customers, order items) retrieved via ID lookups
  • Rate limiting requires chunked requests (max 1000 IDs per request)

Setting up the Configuration

Each PMS requires a configuration file that defines the data source, authentication, properties, and crawler settings.

Configuration File Structure

Create a YAML configuration file in .gp_local_storage/etl-gp-config/ with the following structure:

client: <pms_name>
source:
  system: <PMS Display Name>
  base_url: "https://api.pms.com"
  client_name: "Guestpulse"
  auth:
    secret_name: "<pms_name>/credentials"
properties:
  - id: <property_id>
    name: <property_name>
    currency: <ISO_currency_code>
    timezone: <timezone>
    active: true
crawler_config:
  - table: reservations
    database: etl_gp_raw_<pms_name>
    property_partition: true
    incremental: true
  - table: customers
    database: etl_gp_raw_<pms_name>
    property_partition: false
    incremental: true
  - table: order_items
    database: etl_gp_raw_<pms_name>
    property_partition: true
    incremental: true
  # Add other tables as needed

Configuration Parameters

  • client: Identifier for the PMS (lowercase, used in paths)
  • source.system: Display name of the PMS
  • source.base_url: Base URL for API requests
  • source.auth.secret_name: AWS Secrets Manager key for credentials
  • properties: List of properties to sync
    • id: Unique property identifier from the PMS
    • currency: ISO currency code for the property
    • timezone: Property timezone
    • active: Whether to include in sync operations
  • crawler_config: Defines tables to crawl from S3 to Iceberg
    • table: Table name in the catalog
    • database: Glue catalog database name
    • property_partition: Whether data is partitioned by property
    • incremental: Whether to append or replace data

AWS Secrets Manager

Store PMS credentials in AWS Secrets Manager with the key specified in source.auth.secret_name. The secret should contain the necessary authentication tokens or API keys as a JSON object.

Example secret structure:

{ "client_token": "your_client_token", "access_token": "your_access_token" }

Mapping the PMS entities to the Schema

The GuestPulse platform uses a unified schema across all PMS integrations. Your task is to map the PMS-specific fields to our standardized schema.

Core Entities to Map

1. Reservations

Map these key fields:

  • Identifiers: res_id (system-generated), res_id_og (PMS-provided)
  • Guest: guest_id (foreign key to guests table)
  • Dates: check_in_date, check_out_date, booking_date, cancellation_date
  • Status: status (must map to: CONFIRMED, CANCELLED, CHECKED_IN, CHECKED_OUT, NO_SHOW)
  • Classification: channel, source, market_segment, booking_type
  • Guests: guests_adults, guests_children
  • Affiliations: travel_agent, company, group, promo_code
  • Revenue: total_room_net, total_room_rate_net, total_room_fnb_net, total_room_other_net

2. Guests

Map these key fields:

  • Identifiers: guest_id (system-generated), guest_id_og (PMS-provided)
  • Personal Info: first_name, last_name, email, phone_number
  • Address: address_street, address_city, address_state, address_postcode, address_country
  • Demographics: nationality, gender, birth_date
  • Other: passport_number, membership_type

3. Rooms (Daily Room Data)

Map these key fields:

  • Identifiers: room_id (system-generated), res_id, guest_id
  • Dates: room_stay_date, room_check_in_date, room_check_out_date
  • Classification: room_type, room_type_code, room_rate_code
  • Guests: room_guests_adults, room_guests_children
  • Revenue: room_stay_date_rate_net, room_stay_date_total_net, room_stay_date_fnb_net, room_stay_date_other_net
  • Flags: room_stay_date_is_check_in_day, room_stay_date_is_check_out_day, room_stay_date_is_stay_day, room_stay_date_is_stay_night

Common Metadata Fields

All tables must include these fields:

  • _chain_id: The chain identifier
  • property_id: The property identifier
  • currency: ISO currency code
  • created_timestamp: When the record was created in the PMS
  • last_modified_timestamp: When the record was last updated in the PMS
  • synced_timestamp: When the record was ingested into GuestPulse
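
As an illustration, the static metadata columns can be attached with a small helper like the sketch below (column names follow the list above; created_timestamp and last_modified_timestamp come from the PMS payload and are mapped like any other field):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def add_metadata_columns(df: DataFrame, chain_id: str, property_id: str, currency: str) -> DataFrame:
    """Attach the common metadata columns to a cleaned DataFrame (illustrative)."""
    return (
        df.withColumn("_chain_id", F.lit(chain_id))
        .withColumn("property_id", F.lit(property_id))
        .withColumn("currency", F.lit(currency))
        .withColumn("synced_timestamp", F.current_timestamp())
    )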

Status Mapping

Standardize PMS-specific status values to our schema:

Standard Status | Description                                      | Common PMS Mappings
CONFIRMED       | Reservation is confirmed but not yet checked in  | Confirmed, Reserved, Booked
CHECKED_IN      | Guest has checked in                             | In-house, Checked In, Active
CHECKED_OUT     | Guest has checked out                            | Checked Out, Departed
CANCELLED       | Reservation was cancelled                        | Cancelled, Canceled
NO_SHOW         | Guest did not show up                            | No Show, No-Show
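
The cleaner base class provides a _map_status() helper for this (used in the Cleaner section below). As a standalone illustration, the same normalization can be expressed directly in PySpark; the mapping values and column name here are examples only:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

STATUS_MAPPING = {
    "Confirmed": "CONFIRMED",
    "In-house": "CHECKED_IN",
    "Checked Out": "CHECKED_OUT",
    "Canceled": "CANCELLED",
    "No Show": "NO_SHOW",
}


def normalize_status(df: DataFrame, column: str = "status") -> DataFrame:
    """Map raw PMS status strings to the standard values (illustrative)."""
    expr = None
    for raw, standard in STATUS_MAPPING.items():
        condition = F.col(column) == raw
        expr = F.when(condition, standard) if expr is None else expr.when(condition, standard)
    # Keep unmapped values as-is so they surface in data-quality checks
    return df.withColumn(column, expr.otherwise(F.col(column)))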

Field Type Conversions

  • Dates: Convert to DateType (YYYY-MM-DD)
  • Timestamps: Convert to TimestampType (UTC)
  • Currency: Convert to DoubleType and store the currency code separately
  • Phone/Email: Clean and normalize formats
  • Country: Convert to ISO 3166-1 alpha-2 codes
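
A minimal sketch of these conversions in PySpark (the column names and source timezone are assumptions for the example; country normalization is handled by the clean_country() utility described later):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def convert_field_types(raw_df: DataFrame, property_timezone: str) -> DataFrame:
    """Apply the standard type conversions to raw columns (illustrative names)."""
    return (
        raw_df
        # Dates: string -> DateType (YYYY-MM-DD)
        .withColumn("check_in_date", F.to_date("check_in_date", "yyyy-MM-dd"))
        # Timestamps: property-local string -> UTC TimestampType
        .withColumn("booking_timestamp", F.to_utc_timestamp(F.col("booking_timestamp"), property_timezone))
        # Currency amounts: string/decimal -> DoubleType
        .withColumn("total_room_net", F.col("total_room_net").cast("double"))
        # Emails: trim and lowercase before validation
        .withColumn("email", F.lower(F.trim("email")))
    )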

Implementing the Ingester

The Ingester is responsible for fetching raw data from the PMS API and storing it in S3 as Parquet files.

Creating the Ingester Class

Create a new file: etl_lib/pipeline/ingest/<pms_name>/<PMS>Ingester.py

import logging
from typing import Optional

from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.BaseIngester import BaseIngester

logger = logging.getLogger(__name__)


class <PMS>Ingester(BaseIngester):
    """
    Ingester for <PMS> PMS data.

    Fetches reservations, customers, and related data from <PMS> API.
    """

    def __init__(self, job_context: JobContext, sync_dates: Optional[list[str]] = None):
        super().__init__(job_context)

        # Configuration
        self.sync_dates = sync_dates
        self._secret_name = self.source.get("auth", {}).get("secret_name")
        self._base_url = self.source.get("base_url")

        # Initialize API client
        credentials = self._get_credentials()
        self._client = <PMS>APIClient(
            base_url=self._base_url,
            **credentials
        )

    def run(self):
        """Main sync method that orchestrates the data ingestion."""
        for property_data in self.properties:
            self._process_property(property_data)
        logger.info("Sync complete")

    def _process_property(self, property_data: dict) -> None:
        """Process a single property."""
        property_id = property_data["id"]
        dates = self._get_sync_dates(property_id=property_id)
        for sync_date in dates:
            self._process_sync_date(property_id=property_id, sync_date=sync_date)

    def _process_sync_date(self, property_id: str, sync_date: str) -> None:
        """Process a single sync date for a property."""
        # 1. Fetch main entities (e.g., reservations)
        # 2. Extract related entity IDs
        # 3. Fetch related entities (e.g., customers, order items)
        # 4. Write to S3 using _write_entities()
        pass

Key Components

1. API Client

Create a separate API client class for making requests:

# etl_lib/pipeline/ingest/<pms_name>/<PMS>APIClient.py
import requests


class <PMS>APIClient:
    def __init__(self, base_url: str, **credentials):
        self.base_url = base_url
        self.session = requests.Session()
        # Set up authentication headers

    def fetch_reservations(self, property_id: str, start_date: str, end_date: str):
        """Fetch reservations for a date range."""
        pass

    def fetch_customers(self, customer_ids: list[str]):
        """Fetch customers by ID."""
        pass

2. Schema Definitions

Define Spark schemas for each entity:

# etl_lib/pipeline/ingest/<pms_name>/schema.py
from pyspark.sql.types import *

RESERVATIONS_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("customer_id", StringType(), True),
    StructField("check_in_date", StringType(), True),
    # ... more fields
])

CUSTOMERS_SCHEMA = StructType([
    StructField("id", StringType(), False),
    StructField("first_name", StringType(), True),
    # ... more fields
])

3. Writing Entities to S3

Use the _write_entities method to save data:

def _write_entities(
    self,
    entities: list,
    schema: StructType,
    table: str,
    sync_date: str,
    property_id: str,
    *,
    property_partition: bool,
    property_column: Optional[str] = None,
) -> None:
    """Write entities to the ingest catalog with standard metadata columns."""
    if not entities:
        logger.info(f"No {table} to write")
        return

    # Convert to DataFrame
    df = self._job_context.spark.createDataFrame(entities, schema)

    # Add metadata columns
    df = df.withColumn("synced_date", F.lit(sync_date))
    df = df.withColumn("synced_timestamp", F.current_timestamp())

    # Write to S3
    output_path = self._get_output_path(table, property_id, sync_date, property_partition)
    df.write.mode("overwrite").parquet(output_path)

4. Incremental Sync Strategy

Use get_next_sync_dates() from BaseIngester to determine which dates to sync:

def _get_sync_dates(self, property_id: str, table: str = "reservations") -> list[str]:
    """Get dates to sync (backfill or incremental)."""
    if self.sync_dates:
        return self.sync_dates
    # Get dates since last sync
    return self.get_next_sync_dates(table=table, property_id=property_id)

Best Practices

  • Error Handling: Implement retry logic for API failures
  • Rate Limiting: Respect API rate limits (use chunking, delays)
  • Logging: Log progress, errors, and data volumes
  • Idempotency: Ensure re-running the same date doesn’t cause issues
  • Data Validation: Validate critical fields before writing
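
For rate limiting, a common pattern is to chunk large ID lookups into batches (as the Mews Connector API requires, with at most 1000 IDs per request). A sketch, assuming the API client defined above; the batch size is an example:

from typing import Iterator


def chunked(items: list[str], size: int = 1000) -> Iterator[list[str]]:
    """Yield successive batches of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def fetch_customers_in_batches(client, customer_ids: list[str]) -> list[dict]:
    """Fetch related entities in rate-limit-friendly chunks (illustrative)."""
    customers: list[dict] = []
    for batch in chunked(customer_ids, size=1000):
        customers.extend(client.fetch_customers(batch))
    return customers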

Implementing the Crawler

The Crawler reads the raw Parquet files from S3 and loads them into Iceberg tables in the Glue Catalog.

Creating the Crawler Class

Create a new file: etl_lib/pipeline/crawlers/<pms_name>/<PMS>Crawler.py

from typing import Optional

from s3path import S3Path

from etl_lib.pipeline.crawlers.BaseCrawler import BaseCrawler


class <PMS>Crawler(BaseCrawler):
    """
    Crawler for <PMS> PMS data.

    Crawls data from S3 that was ingested by <PMS>Ingester and loads it into
    Iceberg tables in the Glue Catalog. Supports incremental crawling based
    on synced_date partitions.
    """

    def get_files_path(self, table_name: str, property_id: Optional[str] = None) -> S3Path:
        """
        Return S3 path for the given table.

        Args:
            table_name: Name of the table (e.g., 'reservations', 'customers')
            property_id: Property identifier (if property-partitioned)

        Returns:
            S3Path to the table data
        """
        if property_id:
            # Property-specific path
            return S3Path(f"/etl-gp-raw/ingest/{self.job_context.chain_id}/{table_name}/property_id={property_id}/")
        else:
            # Global path (for non-property-partitioned data)
            return S3Path(f"/etl-gp-raw/ingest/{self.job_context.chain_id}/{table_name}/")

How the Crawler Works

The BaseCrawler provides the core functionality:

  1. Reads the crawler_config from your YAML file
  2. For each table:
    • Determines if it’s property-partitioned
    • Finds all synced_date partitions in S3
    • Creates or updates the Iceberg table
    • Performs incremental append or full replace based on configuration
  3. Handles schema evolution automatically (adds new columns)
  4. Manages partitioning by property_id and synced_timestamp

Incremental vs. Full Reload

  • Incremental (incremental: true): Appends new synced_date partitions
  • Full Reload (incremental: false): Replaces entire table

Property Partitioning

  • Property-partitioned (property_partition: true): Data is organized by property in S3
  • Non-partitioned (property_partition: false): Data is global (e.g., shared customer data across properties)

Optional: Transform Raw Data

Override transform_raw_df() if you need to transform data before writing to Iceberg:

def transform_raw_df(self, table_name: str, property_id: Optional[str], df: DataFrame) -> DataFrame:
    """Apply table-specific transformations before writing to Iceberg."""
    if table_name == "reservations":
        # Example: Cast date strings to proper date types
        df = df.withColumn("check_in_date", F.to_date("check_in_date"))
    return df

Implementing the Cleaner

The Cleaner transforms raw PMS data from Iceberg tables into the standardized GuestPulse schema.

Creating the Cleaner Class

Create a new file: etl_lib/pipeline/cleaning/<pms_name>/<PMS>Cleaner.py

from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from etl_lib.pipeline.cleaning.BaseCleaner import BaseCleaner
from etl_lib.schema import SchemaFields as SF
from etl_lib.schema import name_of_field as nof


class <PMS>Cleaner(BaseCleaner):
    """
    Cleaner for <PMS> PMS data.

    Transforms raw <PMS> data from Iceberg tables into standardized clean
    format following the schema definitions. Handles mapping of
    <PMS>-specific fields to the common schema used across all PMS systems.
    """

    def __init__(self, job_context, write_to_catalog: bool = False):
        super().__init__(job_context, write_to_catalog)
        self.reservations: Optional[DataFrame] = None
        self.customers: Optional[DataFrame] = None
        # Add other cached DataFrames
        self.ran = False

    def _run(self):
        """Run all cleaning steps once and cache the results."""
        if self.ran:
            return
        self.ran = True

        # Load and cache data
        self.customers = self._load_customers().checkpoint(eager=True)
        self.reservations = self._reservations().checkpoint(eager=True)
        # Load other tables as needed

    def _load_customers(self) -> DataFrame:
        """Load and transform customers data."""
        df = self._job_context.spark.table(
            f"glue_catalog.etl_gp_raw_{self._job_context.chain_id}.customers"
        )

        # Transform to schema
        return df.select(
            F.col("id").alias(nof(SF.GUEST_ID_OG)),
            F.col("first_name").alias(nof(SF.FIRST_NAME)),
            F.col("last_name").alias(nof(SF.LAST_NAME)),
            # ... more fields
        )

    def _reservations(self) -> DataFrame:
        """Clean and transform reservations data."""
        df = self._job_context.spark.table(
            f"glue_catalog.etl_gp_raw_{self._job_context.chain_id}.reservations"
        )

        # Transform to schema
        df = df.select(
            F.col("id").alias(nof(SF.RES_ID_OG)),
            F.col("customer_id").alias(nof(SF.GUEST_ID_OG)),
            # ... more fields
        )

        # Map status values
        df = self._map_status(df, {
            "Confirmed": "CONFIRMED",
            "CheckedIn": "CHECKED_IN",
            "CheckedOut": "CHECKED_OUT",
            "Canceled": "CANCELLED",
        })

        # Generate system IDs
        df = df.withColumn(
            nof(SF.RES_ID),
            F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.RES_ID_OG)))
        )

        # Join with customers to get guest_id
        df = df.join(
            self.customers.select(nof(SF.GUEST_ID_OG), nof(SF.GUEST_ID)),
            on=nof(SF.GUEST_ID_OG),
            how="left"
        )

        return df

    def get_clean_reservations(self) -> DataFrame:
        """Get clean reservations data."""
        self._run()
        return self.reservations

    def get_clean_guests(self) -> DataFrame:
        """Get clean guests data."""
        self._run()

        # Generate guest_id
        guests = self.customers.withColumn(
            nof(SF.GUEST_ID),
            F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.GUEST_ID_OG)))
        )

        # Apply data cleaning utilities
        guests = self.clean_email(guests)
        guests = self.clean_country(guests)
        guests = self.clean_phone_number(guests)

        return guests

    def get_clean_rooms(self) -> DataFrame:
        """Get clean daily rooms data."""
        self._run()

        # Generate room_id and explode reservation by stay dates
        # This transforms reservation-level data to daily room records
        rooms = self.reservations.withColumn(
            "stay_dates",
            F.expr("sequence(check_in_date, date_sub(check_out_date, 1))")
        )
        rooms = rooms.withColumn("stay_date", F.explode("stay_dates"))
        rooms = rooms.withColumn(
            nof(SF.ROOM_ID),
            F.concat_ws("_", F.col(nof(SF.RES_ID)), F.col("stay_date"))
        )

        # Add room-specific fields
        rooms = rooms.withColumn(
            nof(SF.ROOM_STAY_DATE_IS_CHECK_IN_DAY),
            F.when(F.col("stay_date") == F.col(nof(SF.CHECK_IN_DATE)), 1).otherwise(0)
        )
        rooms = rooms.withColumn(
            nof(SF.ROOM_STAY_DATE_IS_CHECK_OUT_DAY),
            F.when(F.col("stay_date") == F.date_sub(F.col(nof(SF.CHECK_OUT_DATE)), 1), 1).otherwise(0)
        )

        # Calculate daily revenue allocation
        rooms = rooms.withColumn(
            nof(SF.ROOM_LENGTH_OF_STAY),
            F.datediff(F.col(nof(SF.CHECK_OUT_DATE)), F.col(nof(SF.CHECK_IN_DATE)))
        )
        rooms = rooms.withColumn(
            nof(SF.ROOM_STAY_DATE_RATE_NET),
            F.col(nof(SF.TOTAL_ROOM_RATE_NET)) / F.col(nof(SF.ROOM_LENGTH_OF_STAY))
        )

        return rooms

Key Cleaning Operations

1. Field Mapping

Map PMS fields to schema fields using F.col("pms_field").alias(nof(SF.SCHEMA_FIELD)).

2. Status Normalization

Use _map_status() to convert PMS-specific status values to standard values.

3. ID Generation

Generate system IDs by combining property_id and PMS-provided IDs:

F.concat_ws("_", F.col(nof(SF.PROPERTY_ID)), F.col(nof(SF.RES_ID_OG)))

4. Data Cleaning Utilities

Use inherited methods from BaseCleaner:

  • clean_email(): Validates and normalizes email addresses
  • clean_country(): Converts to ISO country codes
  • clean_phone_number(): Formats phone numbers
  • clean_string_columns(): Trims and normalizes strings

5. Currency Conversion

Use the built-in currency converter for multi-currency operations:

self._currency_converter.convert(amount, from_currency, to_currency)

6. Date Calculations

Calculate booking windows, stay nights, etc.:

F.datediff(F.col(nof(SF.CHECK_IN_DATE)), F.col(nof(SF.BOOKING_DATE))).alias(nof(SF.BOOKING_WINDOW))

7. Join Operations

Join cleaned tables to enrich data:

df = df.join(self.customers, on=nof(SF.GUEST_ID_OG), how="left")

Room Data Generation

Room data is typically generated by:

  1. Exploding reservations by stay dates
  2. Allocating revenue across nights
  3. Adding daily flags (check-in day, check-out day, etc.)

Creating the Job File

Create a job file that orchestrates the entire ETL workflow.

Job File Structure

Create a new file: jobs/<chain_id>.py

from etl_lib.job.JobContext import JobContext
from etl_lib.workflow.Workflow import Workflow
from etl_lib.data.BigQuery import BigQuery
from etl_lib.pipeline.ingest.<pms_name>.<PMS>Ingester import <PMS>Ingester
from etl_lib.pipeline.crawlers.<pms_name>.<PMS>Crawler import <PMS>Crawler
from etl_lib.pipeline.cleaning.<pms_name>.<PMS>Cleaner import <PMS>Cleaner

# Initialize job context
job_context = JobContext(chain_id="<chain_id>", partitions=16)
db_sink = BigQuery(job_context)

# Step 1: Ingest raw data from PMS API to S3
<PMS>Ingester(job_context).run()

# Step 2: Crawl data from S3 into Iceberg tables
<PMS>Crawler(job_context).run()

# Step 3: Clean and transform data
cleaner = <PMS>Cleaner(job_context)

# Step 4: Run the full workflow
Workflow.from_cleaner(
    job_context=job_context,
    cleaning=cleaner,
    db_sink=db_sink,
    write_to_catalog=True,
).run()

Workflow Options

The Workflow class provides several configuration options:

Skip Processes

Skip specific processing steps:

Workflow.from_cleaner(
    job_context=job_context,
    cleaning=cleaner,
    db_sink=db_sink,
    write_to_catalog=True,
    skip_processes=["GuestMatching", "RevenueAllocation"],
).run()

Insert Custom Processes

Add custom processing steps:

from etl_lib.pipeline.processing.custom.<chain_id>.CustomProcess import CustomProcess

Workflow.from_cleaner(
    job_context=job_context,
    cleaning=cleaner,
    db_sink=db_sink,
    write_to_catalog=True,
).insert_process(process=CustomProcess, before="GuestMatching").run()

Date-Specific Processing

Process specific date ranges:

from datetime import date, timedelta

start_date = date.today() - timedelta(days=30)
end_date = date.today()
sync_dates = [
    (start_date + timedelta(days=i)).isoformat()
    for i in range((end_date - start_date).days + 1)
]

<PMS>Ingester(job_context, sync_dates=sync_dates).run()

Testing the Integration

Create a test file to validate your integration.

Test File Structure

Create a new file: jobs/test_<pms_name>.py

import logging
from datetime import date, timedelta

from etl_lib.data.BigQuery import BigQuery
from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.cleaning.<pms_name>.<PMS>Cleaner import <PMS>Cleaner
from etl_lib.pipeline.crawlers.<pms_name>.<PMS>Crawler import <PMS>Crawler
from etl_lib.pipeline.ingest.<pms_name>.<PMS>Ingester import <PMS>Ingester
from etl_lib.workflow.Workflow import Workflow

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

job_context = JobContext(chain_id="<chain_id>", partitions=16)
db_sink = BigQuery(job_context)

# Test with a small date range (the last 7 days)
end_date = date.today()
start_date = end_date - timedelta(days=7)
sync_dates = [
    (start_date + timedelta(days=i)).isoformat()
    for i in range((end_date - start_date).days + 1)
]

# Run ingestion
ingester = <PMS>Ingester(job_context, sync_dates=sync_dates)
ingester.run()

# Run crawler
crawler = <PMS>Crawler(job_context)
crawler.run()

# Run cleaner
cleaner = <PMS>Cleaner(job_context)

# Run workflow (skip guest matching for faster testing)
Workflow.from_cleaner(
    job_context=job_context,
    cleaning=cleaner,
    db_sink=db_sink,
    write_to_catalog=True,
    skip_processes=["GuestMatching"],
).run()

Testing Checklist

  • Configuration: Verify YAML configuration is correct
  • Credentials: Test API authentication
  • Ingester: Verify data is written to S3 correctly
    • Check S3 paths and partitioning
    • Validate Parquet file structure
    • Confirm date range coverage
  • Crawler: Verify Iceberg tables are created
    • Check table schema matches expectations
    • Verify incremental loading works
    • Test schema evolution
  • Cleaner: Validate cleaned data
    • Check field mappings are correct
    • Verify status normalization
    • Validate ID generation
    • Test join operations
  • Data Quality:
    • Check for null values in required fields
    • Verify date ranges are logical
    • Validate revenue calculations
    • Test data completeness
  • Performance: Monitor execution time and resource usage
  • Error Handling: Test failure scenarios (API errors, missing data, etc.)

Validation Queries

Run these queries to validate your integration:

# Check reservation counts by status
spark.sql(f"""
    SELECT status, COUNT(*) as count
    FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
    GROUP BY status
""").show()

# Check for null guest_ids
spark.sql(f"""
    SELECT COUNT(*) as null_guest_count
    FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
    WHERE guest_id IS NULL
""").show()

# Verify date ranges
spark.sql(f"""
    SELECT
        MIN(check_in_date) as min_checkin,
        MAX(check_in_date) as max_checkin,
        COUNT(*) as total_reservations
    FROM glue_catalog.etl_gp_clean_{chain_id}.reservations
""").show()

# Check room data aggregation
spark.sql(f"""
    SELECT
        room_stay_date,
        COUNT(*) as room_count,
        SUM(room_stay_date_rate_net) as total_revenue
    FROM glue_catalog.etl_gp_clean_{chain_id}.rooms
    GROUP BY room_stay_date
    ORDER BY room_stay_date DESC
    LIMIT 10
""").show()

Common Pitfalls and Solutions

1. API Rate Limiting

Problem: Hitting API rate limits causes failures.

Solution:

  • Implement exponential backoff with retries
  • Chunk large requests
  • Add delays between requests
  • Monitor rate limit headers
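
A minimal retry sketch with exponential backoff, assuming the requests library and a generic POST endpoint (retry counts, status codes, and delays are arbitrary examples):

import time

import requests


def request_with_backoff(session: requests.Session, url: str, *, max_retries: int = 5, **kwargs) -> requests.Response:
    """Retry a request with exponential backoff on rate limiting or server errors (illustrative)."""
    for attempt in range(max_retries):
        response = session.post(url, **kwargs)
        if response.status_code not in (429, 500, 502, 503, 504):
            response.raise_for_status()
            return response
        # Prefer the Retry-After header when the API provides one
        delay = float(response.headers.get("Retry-After", 2 ** attempt))
        time.sleep(delay)
    raise RuntimeError(f"Giving up on {url} after {max_retries} attempts")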

2. Timezone Handling

Problem: Dates/times stored in different timezones causing mismatches.

Solution:

  • Convert all timestamps to UTC during ingestion
  • Store property timezone in configuration
  • Apply timezone conversions during cleaning
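
For instance, converting a timestamp recorded in the property's local timezone to UTC during cleaning might look like this (the column name and timezone value are assumptions; the real timezone should come from the property configuration):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def localize_to_utc(df: DataFrame, column: str, property_timezone: str) -> DataFrame:
    """Convert a property-local timestamp column to UTC (illustrative)."""
    return df.withColumn(column, F.to_utc_timestamp(F.col(column), property_timezone))


# Example: df = localize_to_utc(df, "last_modified_timestamp", "Asia/Bangkok")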

3. Missing Data

Problem: Some reservations lack required fields.

Solution:

  • Use .fillna() or F.coalesce() to provide defaults
  • Add null checks before joins
  • Log data quality issues
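
A small sketch of defaulting missing values before downstream joins (the column names and default values are examples, not prescribed behavior):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def apply_defaults(df: DataFrame) -> DataFrame:
    """Fill commonly missing fields with safe defaults (illustrative values)."""
    df = df.fillna({"guests_adults": 1, "guests_children": 0})
    # Fall back to the booking channel when the market segment is missing
    return df.withColumn("market_segment", F.coalesce(F.col("market_segment"), F.col("channel")))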

4. Schema Evolution

Problem: PMS adds new fields, breaking ingestion.

Solution:

  • Define permissive schemas in ingester
  • Use schema evolution in Iceberg tables
  • Test with optional fields
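
One way to keep the ingest schema permissive is to declare raw fields as nullable strings and defer typing to the cleaner; a sketch (the helper and field list are illustrative):

from pyspark.sql.types import StringType, StructField, StructType


def permissive_schema(field_names: list[str]) -> StructType:
    """Build an all-nullable, all-string schema for raw ingestion (illustrative)."""
    return StructType([StructField(name, StringType(), True) for name in field_names])


# Example: RESERVATIONS_SCHEMA = permissive_schema(["id", "customer_id", "check_in_date", "state"])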

5. Revenue Allocation

Problem: Daily revenue doesn’t sum to total reservation revenue.

Solution:

  • Use precise division: total_revenue / stay_nights
  • Handle single-night stays separately
  • Round only at the final aggregation
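
As a sketch, keep the per-night division unrounded and reconcile the nightly sum against the reservation total as a validation step (column names follow the schema described earlier; the tolerance is an example):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def check_revenue_allocation(rooms: DataFrame, reservations: DataFrame) -> DataFrame:
    """Return reservations whose summed nightly revenue drifts from the total (illustrative check)."""
    nightly = rooms.groupBy("res_id").agg(F.sum("room_stay_date_rate_net").alias("allocated_rate_net"))
    return (
        reservations.join(nightly, on="res_id", how="left")
        .withColumn("allocation_diff", F.round(F.col("total_room_rate_net") - F.col("allocated_rate_net"), 2))
        .filter(F.abs(F.col("allocation_diff")) > 0.01)
    )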

6. Duplicate Data

Problem: Re-running ingestion creates duplicate records.

Solution:

  • Use mode("overwrite") when writing to S3
  • Partition by synced_date to isolate runs
  • Implement deduplication in cleaner
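
A deduplication sketch that keeps the most recently synced record per reservation (assumes the synced_timestamp metadata column described earlier; the key column is an example):

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def deduplicate_latest(df: DataFrame, key: str = "res_id_og") -> DataFrame:
    """Keep only the latest synced version of each record (illustrative)."""
    window = Window.partitionBy(key).orderBy(F.col("synced_timestamp").desc())
    return (
        df.withColumn("_row_number", F.row_number().over(window))
        .filter(F.col("_row_number") == 1)
        .drop("_row_number")
    )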
