Mews

The Mews ingest process retrieves reservation, customer, order item (revenue), and master data from Mews PMS via the Connector API and loads it into the ingest catalog.

Overview

The MewsIngester class handles the extraction of multiple data types from the Mews Property Management System:

  • Reservations: Complete booking information including guest details, room assignments, dates, and status
  • Customers: Guest profile data with contact information and addresses
  • Order Items: Detailed revenue/charge data for each reservation
  • Master Data: Services, rates, resource categories (room types), business segments, age categories, companies, and products

Data Source Configuration

The ingester connects to the Mews Connector API using access token authentication configured in the job context:

MewsIngester(job_context, sync_dates=None).run()

Connection Details

  • Protocol: HTTPS REST API
  • Authentication: ClientToken + AccessToken (stored in AWS Secrets Manager)
  • Data Format: JSON with paginated responses
  • Base URL: Configured via source["base_url"]
  • API Documentation: Mews Connector API 
  • Required Headers:
    • Content-Type: application/json
  • Authentication Method: Tokens passed in request body payload

Authentication Flow

The ingester retrieves credentials from AWS Secrets Manager:

  1. Secret Lookup: Retrieves secret using source["auth"]["secret_name"]
  2. Credential Extraction: Extracts client_token and access_token from secret
  3. Client Initialization: Creates MewsAPIClient with credentials
  4. Token Injection: Tokens are automatically added to each API request payload
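The token-injection step can be sketched as follows. build_payload is an illustrative helper, not the actual client method; the key point is that the Connector API expects both tokens in the JSON request body rather than in an Authorization header:

```python
import json

def build_payload(client_token: str, access_token: str, body: dict) -> str:
    """Merge the Mews tokens into a request body (hypothetical helper).

    The Connector API reads ClientToken and AccessToken from the JSON
    payload, not from an Authorization header.
    """
    payload = {"ClientToken": client_token, "AccessToken": access_token}
    payload.update(body)
    return json.dumps(payload)

# Example: a reservations request body with tokens injected
request = build_payload("client-abc", "access-xyz", {"EnterpriseIds": ["prop-1"]})
```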

Ingest Process Flow

Reservation Retrieval Strategy

The ingester uses an update-based fetch approach with a buffer window:

Single Endpoint with Buffer

Retrieves all reservations updated within a time window that includes a 1-day buffer to catch late updates.

POST /api/connector/v1/reservations/getAll/2023-06-06
{
  "EnterpriseIds": ["{propertyId}"],
  "UpdatedUtc": {
    "StartUtc": "{sync_date - 1 day}",
    "EndUtc": "{sync_date + 1 day}"
  }
}

Buffer Strategy: The ingester adds ±1 day to the sync date to ensure no reservations are missed due to timezone differences or late updates.
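The buffer-window computation described above can be sketched as (the function name and timestamp format are illustrative assumptions):

```python
from datetime import datetime, timedelta

def buffer_window(sync_date: datetime, buffer_days: int = 1):
    """Return the UpdatedUtc (start, end) window around a sync date,
    sketching the ±1 day buffer used to catch late updates."""
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    start = (sync_date - timedelta(days=buffer_days)).strftime(fmt)
    end = (sync_date + timedelta(days=buffer_days)).strftime(fmt)
    return start, end

start_utc, end_utc = buffer_window(datetime(2025, 1, 15))
# → ("2025-01-14T00:00:00Z", "2025-01-16T00:00:00Z")
```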

Pagination Handling

The Mews API uses cursor-based pagination:

  • Page Size: 1000 records per request
  • Cursor: Automatically included in subsequent requests
  • Termination: Pagination stops when no cursor is returned
  • Rate Limiting: 100ms delay between pagination requests
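A minimal sketch of the cursor loop described above; fetch_page is a hypothetical callable standing in for the API client, returning a Mews-shaped response dict:

```python
import time

def paginate(fetch_page, page_size: int = 1000, delay_s: float = 0.1) -> list:
    """Collect all records from a cursor-paginated endpoint (illustrative).

    `fetch_page(cursor, limit)` returns a dict shaped like a Mews response:
    {"Items": [...], "Cursor": str | None}.
    """
    items, cursor = [], None
    while True:
        page = fetch_page(cursor, page_size)
        items.extend(page.get("Items", []))
        cursor = page.get("Cursor")
        if not cursor:          # termination: no cursor returned
            break
        time.sleep(delay_s)     # delay between pages to stay under rate limits
    return items
```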

Incremental Loading Strategy

The ingester uses an incremental sync approach:

  1. Check Last Sync Date: Query the ingest catalog for the most recent synced_date for the reservations table per property
  2. Calculate Missing Dates: Determine all dates between last sync and today
  3. Fallback to Constructor Dates: If no previous sync exists, use sync_dates parameter
  4. Fetch Daily Data: Process one date at a time per property
  5. Record Sync Date: Each batch is tagged with its synced_date for future incremental runs
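The missing-date calculation in steps 2–3 can be sketched as follows (function and parameter names are illustrative, not the actual base-class API):

```python
from datetime import date, timedelta
from typing import Optional

def missing_dates(last_synced: Optional[date], today: date, fallback: list) -> list:
    """Dates still to sync: every date after the last synced date up to today.

    Falls back to the constructor-supplied sync_dates when no prior sync exists.
    """
    if last_synced is None:
        return fallback
    days = (today - last_synced).days
    return [(last_synced + timedelta(days=i)).isoformat() for i in range(1, days + 1)]

missing_dates(date(2025, 1, 10), date(2025, 1, 13), [])
# → ["2025-01-11", "2025-01-12", "2025-01-13"]
```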

Master Data Synchronization

For each batch of reservations, the ingester extracts related IDs and fetches master data entities:

ID Extraction

From each reservation batch, the ingester extracts:

  • Service IDs: For fetching property services
  • Rate IDs: For fetching rate plans
  • Business Segment IDs: For fetching market segments
  • Company IDs: For fetching corporate accounts
  • Account IDs: For fetching customer profiles
  • Age Category IDs: For fetching guest type classifications
  • Reservation IDs: For fetching order items

Chunked Fetching

Master data is fetched in chunks to respect API limits:

  • Chunk Size: 1000 IDs per request
  • Processing: Chunks are fetched sequentially, with progress logged per chunk
  • Error Handling: Exceptions are logged; empty lists returned on failure
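A minimal sketch of the chunking helper described above (behavior inferred from the _chunk_ids description; names are illustrative):

```python
def chunk_ids(ids: list, chunk_size: int = 1000) -> list:
    """Split an ID list into API-sized chunks; empty input yields no chunks."""
    if not ids:
        return []
    return [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]

chunk_ids(list(range(2500)), 1000)  # → three chunks of 1000, 1000, and 500 IDs
```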

Master Data Entities

Services

  • Purpose: Property services (typically one per property)
  • Endpoint: /api/connector/v1/services/getAll
  • Key Fields: Id, EnterpriseId, Name, IsActive
  • Usage: Maps ServiceId to EnterpriseId (property_id)

Resource Categories

  • Purpose: Room types (Standard, Deluxe, Suite, etc.)
  • Endpoint: /api/connector/v1/resourceCategories/getAll
  • Key Fields: Id, EnterpriseId, ServiceId, Name, ShortName, Capacity
  • Usage: Defines available room types per property

Rates

  • Purpose: Rate plans (BAR, Corporate, Government, etc.)
  • Endpoint: /api/connector/v1/rates/getAll
  • Key Fields: Id, ServiceId, Name, ShortName, IsActive, IsPublic
  • Usage: Identifies rate plan details for reservations

Business Segments

  • Purpose: Market segments (Transient, Group, Corporate, etc.)
  • Endpoint: /api/connector/v1/businessSegments/getAll
  • Key Fields: Id, ServiceId, Name, IsActive
  • Usage: Categorizes booking sources and market types

Age Categories

  • Purpose: Guest type classifications (Adult, Child, Infant)
  • Endpoint: /api/connector/v1/ageCategories/getAll
  • Key Fields: Id, ServiceId, Name, Classification (Adult/Child)
  • Usage: Defines person count categories in PersonCounts array

Transactional Data Synchronization

After master data is loaded, transactional data is fetched:

Companies

  • Purpose: Corporate accounts and travel agencies
  • Endpoint: /api/connector/v1/companies/getAll
  • Key Fields: Id, Name, TaxIdentifier, ContactPerson, Address
  • Linked via: Reservation.PartnerCompanyId, TravelAgencyId

Customers

  • Purpose: Guest profiles with contact information
  • Endpoint: /api/connector/v1/customers/getAll
  • Key Fields: Id, FirstName, LastName, Email, Phone, Address, BirthDate, Nationality
  • Extent: Includes Customers and Addresses, excludes Documents
  • Linked via: Reservation.AccountId

Order Items

  • Purpose: Revenue/charge line items
  • Endpoint: /api/connector/v1/orderItems/getAll
  • Filters: By ServiceOrderIds (reservation IDs)
  • Key Fields: Id, Type, AccountingCategoryId, Amount, ConsumedUtc, State
  • Types: Product, Service, Payment, CancellationFee
  • Return Value: Raw list used for product ID extraction

Products

  • Purpose: F&B items and orderable products
  • Endpoint: /api/connector/v1/products/getAll
  • Key Fields: Id, ServiceId, Name, ShortName, Classifications (Food, Beverage, Wellness, CityTax)
  • Extraction: Product IDs extracted from order items with Type=Product
  • Linked via: OrderItem.Data.Product.ProductId

Reservation Processing

Service-to-Enterprise Mapping

The ingester creates a dictionary mapping ServiceId to EnterpriseId from the services data:

service_map = {service["Id"]: service["EnterpriseId"]}

This mapping is used to populate the property_id column in reservations by:

  1. Creating a PySpark map expression from the dictionary
  2. Mapping Reservation.ServiceId to property_id
  3. Filling nulls with the property_id parameter as fallback
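In plain-Python terms, the mapping with fallback behaves as below; the real code builds an equivalent PySpark map expression over DataFrame columns:

```python
def resolve_property_id(service_id, service_map: dict, fallback: str) -> str:
    """Map a reservation's ServiceId to its EnterpriseId, falling back to the
    job's property_id (plain-Python sketch of the PySpark map expression)."""
    return service_map.get(service_id) or fallback

service_map = {"svc-1": "prop-A"}
resolve_property_id("svc-1", service_map, "prop-default")      # → "prop-A"
resolve_property_id("svc-unknown", service_map, "prop-default")  # → "prop-default"
```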

VoucherId Renaming

The field VoucherId is renamed to VoucherCode before saving to avoid conflicts with the BaseCrawler architecture.

Data Schemas

Reservations Schema

The reservation data captures comprehensive booking information:

| Field Category | Key Fields | Description |
|---|---|---|
| Identifiers | Id, ServiceId, GroupId, Number | Unique identifiers for reservation and service |
| Accounts | AccountId, AccountType, BookerId | Customer account references |
| Audit | CreatorProfileId, UpdaterProfileId, CreatedUtc, UpdatedUtc, CancelledUtc | Creation and modification tracking |
| State | State, Origin, CommanderOrigin, OriginDetails | Reservation status and booking channel |
| Dates | StartUtc, EndUtc, ScheduledStartUtc, ScheduledEndUtc, ActualStartUtc, ActualEndUtc | Stay dates (scheduled and actual) |
| Release | ReleasedUtc | When reservation was released |
| Channel | ChannelNumber, ChannelManagerNumber | Channel manager booking reference |
| Room Assignment | RequestedResourceCategoryId, AssignedResourceId, AssignedResourceLocked | Room type request and assignment |
| Rate & Segment | BusinessSegmentId, RateId | Market segment and rate plan |
| Voucher | VoucherId (renamed to VoucherCode) | Voucher/promo code |
| Payment | CreditCardId | Payment method reference |
| Blocks | AvailabilityBlockId | Group block reference |
| Partners | PartnerCompanyId, TravelAgencyId | Company and travel agent references |
| Cancellation | CancellationReason | Reason for cancellation |
| Purpose | Purpose | Trip purpose |
| QR Code | QrCodeData | Mobile key data |
| Person Counts | PersonCounts[] | Array of guest counts by age category |
| Person Counts Fields | PersonCounts[].AgeCategoryId, PersonCounts[].Count | Age category ID and count |
| Options | Options | Check-in options struct |
| Options Fields | Options.OwnerCheckedIn, AllCompanionsCheckedIn, AnyCompanionCheckedIn, ConnectorCheckIn | Check-in status flags |

Schema Type: StructType with nested arrays and objects (see RESERVATIONS_SCHEMA)

Customers Schema

The customer data contains guest profile information:

| Field Category | Key Fields | Description |
|---|---|---|
| Identifiers | Id | Unique customer ID |
| Name | FirstName, LastName, Title | Guest name details |
| Contact | Email, Phone, SecondaryPhone | Contact information |
| Demographics | BirthDate, BirthPlace, Nationality, Sex, Language | Demographic data |
| Address | Address.Line1, City, PostalCode, CountryCode | Physical address |
| Identification | IdentityCard.Type, Number, Expiration | ID document details |
| Passport | Passport.Number, Expiration, Issuance | Passport information |
| Visa | Visa.Number, Expiration, Issuance | Visa details |
| Company | CompanyId | Linked company reference |
| Categories | CategoryId | Customer category classification |
| Notes | Notes | Additional notes |
| Options | Options.SendMarketingEmails, SendMarketingPostalMail | Marketing preferences |

Schema Type: StructType with nested address and document structures

Order Items Schema

The order item data provides detailed revenue breakdown:

| Field Category | Key Fields | Description |
|---|---|---|
| Identifiers | Id, EnterpriseId, ServiceId, OrderId | Unique identifiers |
| Accounting | AccountId, AccountingCategoryId | Account and category references |
| Type | Type, SubType | Item type (Product, Service, Payment, etc.) |
| Name | Name | Item description |
| Dates | CreatedUtc, UpdatedUtc, ConsumedUtc, ClosedUtc | Lifecycle timestamps |
| Billing | BillId, InvoiceId | Billing document references |
| State | State, Origin | Accounting state (Open, Closed, etc.) |
| Amount | Amount.Currency, NetValue, GrossValue | Monetary amounts |
| Tax | Amount.TaxValues[], TaxValues[].Code, TaxValues[].Value | Tax breakdown |
| Tax Breakdown | Amount.Breakdown.Items[] | Detailed tax calculation |
| Breakdown Fields | Items[].TaxRateCode, NetValue, TaxValue | Per-rate tax details |
| Data | Data | Type-specific nested data (varies by Type) |
| Product Data | Data.Product.ProductId | Product reference when Type=Product |
| Options | Options.CanceledWithReservation | Whether item was auto-canceled |
| Notes | Notes | Additional notes |

Schema Type: Complex StructType with nested amounts, taxes, and variable data field

Services Schema

Service data defines property-level services:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique service ID |
| EnterpriseId | String | Property/enterprise ID |
| Name | String | Service name |
| Description | String | Service description |
| IsActive | Boolean | Whether service is active |
| Options | Struct | Service options and settings |

Schema Type: StructType with minimal nesting

Resource Categories Schema

Resource category data defines room types:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique category ID |
| EnterpriseId | String | Property/enterprise ID |
| ServiceId | String | Associated service |
| Name | String | Category name (e.g., “Deluxe King”) |
| ShortName | String | Abbreviated name |
| Description | String | Category description |
| IsActive | Boolean | Whether category is active |
| Capacity | Integer | Maximum occupancy |
| ExtraCapacity | Integer | Extra bed capacity |
| Ordering | Integer | Display order |

Schema Type: StructType with basic fields

Rates Schema

Rate data defines rate plans:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique rate ID |
| ServiceId | String | Associated service |
| GroupId | String | Rate group reference |
| Name | String | Rate plan name |
| ShortName | String | Abbreviated name |
| IsActive | Boolean | Whether rate is active |
| IsPublic | Boolean | Whether rate is publicly bookable |
| IsEnabled | Boolean | Whether rate is enabled |

Schema Type: StructType with basic fields

Business Segments Schema

Business segment data defines market segments:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique segment ID |
| ServiceId | String | Associated service |
| Name | String | Segment name |
| IsActive | Boolean | Whether segment is active |

Schema Type: StructType with basic fields

Age Categories Schema

Age category data defines guest types:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique category ID |
| ServiceId | String | Associated service |
| Name | String | Category name (e.g., “Adult”, “Child”) |
| Classification | String | Either “Adult” or “Child” |
| Ordering | Integer | Display order |

Schema Type: StructType with basic fields

Companies Schema

Company data contains corporate account information:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique company ID |
| Name | String | Company name |
| Number | String | Company number/code |
| Identifier | String | Business identifier |
| TaxIdentifier | String | Tax ID number |
| AdditionalTaxIdentifier | String | Secondary tax ID |
| BillingCode | String | Billing reference code |
| AccountingCode | String | Accounting system code |
| Address | Struct | Company address |
| InvoiceDueInterval | String | Payment terms |
| Contact | Struct | Contact person details |
| Options | Struct | Company options |

Schema Type: StructType with nested address and contact

Products Schema

Product data defines orderable items:

| Field | Type | Description |
|---|---|---|
| Id | String | Unique product ID |
| ServiceId | String | Associated service |
| Name | String | Product name |
| ShortName | String | Abbreviated name |
| Description | String | Product description |
| ExternalName | String | External system name |
| Charging | String | Charging mode |
| Posting | String | Posting mode |
| Promotions | Struct | Promotion settings |
| Classifications | Struct | Product classifications |
| Classifications.Food | Boolean | Whether classified as food |
| Classifications.Beverage | Boolean | Whether classified as beverage |
| Classifications.Wellness | Boolean | Whether classified as wellness |
| Classifications.CityTax | Boolean | Whether classified as city tax |
| AccountingCategoryId | String | Accounting category reference |
| Options | Struct | Product options |

Schema Type: StructType with nested classifications

Implementation Details

Core Methods

run()

Main execution method that orchestrates the ingest process:

  1. Iterates through each property configured in the job context
  2. Calls _process_property() for each property

_process_property(property_data: dict)

Processes a single property:

  1. Determines sync dates using _get_sync_dates()
  2. Falls back to sync_dates constructor parameter if no dates found
  3. Calls _process_sync_date() for each date

_get_sync_dates(property_id: str, table: str = "reservations")

Determines dates to sync:

  1. Calls get_next_sync_dates() from base class to get missing dates
  2. Falls back to self.sync_dates if no dates found
  3. Returns empty list if neither source has dates

_process_sync_date(property_id: str, sync_date: str)

Comprehensive data ingestion for a single property and date:

  1. Fetch Reservations: Calls _fetch_reservations() with 1-day buffer
  2. Extract IDs: Calls _extract_related_ids() to collect master data IDs
  3. Sync Master Data: Sequentially syncs services, resource categories, rates, business segments, age categories
  4. Save Reservations: Maps ServiceId to property_id and writes to catalog
  5. Sync Transactional Data: Syncs companies, customers, order items
  6. Extract Products: If order items contain products, syncs product master data

_fetch_reservations(property_id: str, sync_date: datetime)

Fetches reservations for a specific date:

  1. Calculates buffer window: sync_date - 1 day to sync_date + 1 day
  2. Calls MewsAPIClient.get_all_reservations() with UpdatedUtc filter
  3. Returns empty list if no reservations found

_extract_related_ids(reservations: list)

Extracts related entity IDs from reservations:

  1. Iterates through reservations
  2. Collects unique ServiceIds, RateIds, BusinessSegmentIds, CompanyIds, AccountIds
  3. Extracts AgeCategoryIds from PersonCounts array
  4. Returns dictionary of ID lists
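A sketch of this extraction, assuming the reservation field names documented above (the exact key names of the returned dictionary are illustrative):

```python
def extract_related_ids(reservations: list) -> dict:
    """Collect unique master-data IDs referenced by a reservation batch."""
    ids = {
        "service_ids": set(), "rate_ids": set(), "business_segment_ids": set(),
        "company_ids": set(), "account_ids": set(), "age_category_ids": set(),
        "reservation_ids": set(),
    }
    for r in reservations:
        ids["reservation_ids"].add(r["Id"])
        for key, field in [("service_ids", "ServiceId"), ("rate_ids", "RateId"),
                           ("business_segment_ids", "BusinessSegmentId"),
                           ("company_ids", "PartnerCompanyId"),
                           ("account_ids", "AccountId")]:
            if r.get(field):
                ids[key].add(r[field])
        # Age categories live inside the nested PersonCounts array
        for pc in r.get("PersonCounts") or []:
            if pc.get("AgeCategoryId"):
                ids["age_category_ids"].add(pc["AgeCategoryId"])
    return {k: sorted(v) for k, v in ids.items()}
```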

_chunk_ids(ids: list, chunk_size: int = 1000)

Splits ID lists into chunks:

  • Chunk Size: 1000 IDs (Mews API limit)
  • Empty Handling: Returns empty list if no IDs
  • Usage: Used by all fetch methods to respect API limits

_fetch_entities(...)

Generic entity fetching with chunking and logging:

  1. Validates input IDs (returns empty list if none)
  2. Chunks IDs using _chunk_ids()
  3. Logs progress for each chunk
  4. Calls provided fetch function for each chunk
  5. Aggregates results across all chunks
  6. Returns combined entity list
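The steps above can be sketched as follows; fetch_fn is a hypothetical per-chunk callable, and the error-handling mirrors the documented behavior of logging failures without aborting the run:

```python
import logging

def fetch_entities(ids: list, fetch_fn, chunk_size: int = 1000) -> list:
    """Generic chunked fetch with progress logging (illustrative sketch).

    `fetch_fn(chunk)` returns a list of entities for one chunk of IDs;
    a failing chunk is logged and skipped rather than stopping the run.
    """
    if not ids:
        return []
    chunks = [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]
    results = []
    for n, chunk in enumerate(chunks, start=1):
        logging.info("Fetching chunk %d/%d (%d IDs)", n, len(chunks), len(chunk))
        try:
            results.extend(fetch_fn(chunk))
        except Exception:
            logging.exception("Chunk %d failed; continuing with next chunk", n)
    return results
```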

_write_entities(...)

Generic entity writing to catalog:

  1. Creates Spark DataFrame from entity list
  2. Adds synced_timestamp and synced_date columns
  3. Handles property_id based on partition settings:
    • If property_column specified: Maps from entity column with fallback
    • If property_partition=True: Uses literal property_id
  4. Calls catalog.write_to_ingest() with appropriate partitioning
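A row-level sketch of the metadata added in steps 2–3; the actual code adds these as Spark DataFrame columns, and the helper name here is illustrative:

```python
from datetime import datetime, timezone

def add_sync_metadata(entities: list, sync_date: str, property_id: str) -> list:
    """Tag each entity row with the sync metadata columns before writing.

    property_id is taken from the entity's own EnterpriseId when present,
    otherwise the job's property_id is used as fallback.
    """
    synced_ts = datetime.now(timezone.utc).isoformat()
    return [
        {**e,
         "synced_timestamp": synced_ts,
         "synced_date": sync_date,
         "property_id": e.get("EnterpriseId") or property_id}
        for e in entities
    ]
```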

_sync_services(service_ids: list, property_id: str, sync_date: str)

Syncs service master data:

  1. Fetches services using chunked requests
  2. Writes to services table with property partition
  3. Creates and returns ServiceId → EnterpriseId mapping dictionary

_save_reservations(...)

Saves reservation data:

  1. Creates DataFrame from reservation list
  2. Renames VoucherId to VoucherCode
  3. Adds metadata columns
  4. Maps ServiceId to property_id using service_to_enterprise_map
  5. Writes to reservations table with property partition

_sync_resource_categories(service_ids: list, property_id: str, sync_date: str)

Syncs resource category (room type) master data:

  • Fetches by service_ids and enterprise_ids
  • Writes to resource_categories table with property partition

_sync_rates(rate_ids: list, property_id: str, sync_date: str)

Syncs rate plan master data:

  • Fetches by rate_ids and enterprise_ids
  • Writes to rates table without property partition

_sync_business_segments(business_segment_ids: list, property_id: str, sync_date: str)

Syncs business segment master data:

  • Fetches by business_segment_ids and enterprise_ids
  • Writes to business_segments table without property partition

_sync_age_categories(age_category_ids: list, property_id: str, sync_date: str)

Syncs age category master data:

  • Fetches by age_category_ids and enterprise_ids
  • Writes to age_categories table without property partition

_sync_companies(company_ids: list, property_id: str, sync_date: str)

Syncs company transactional data:

  • Fetches by company_ids and enterprise_ids
  • Writes to companies table without property partition

_sync_customers(account_ids: list, property_id: str, sync_date: str)

Syncs customer transactional data:

  • Fetches by customer_ids (account_ids) and enterprise_ids
  • Includes Customers and Addresses extent
  • Writes to customers table without property partition

_sync_order_items(reservation_ids: list, property_id: str, sync_date: str)

Syncs order item revenue data:

  1. Fetches by service_order_ids (reservation_ids)
  2. Writes to order_items table with property partition
  3. Returns raw order items list for product extraction

_extract_product_ids(order_items: list[dict])

Extracts product IDs from order items:

  1. Filters for items with Type=Product
  2. Extracts Data.Product.ProductId from each
  3. Returns set of unique product IDs
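A minimal sketch of this filter-and-extract step, assuming the order-item shape documented in the schema above:

```python
def extract_product_ids(order_items: list) -> set:
    """Pull unique product IDs from order items where Type == "Product"."""
    return {
        item["Data"]["Product"]["ProductId"]
        for item in order_items
        if item.get("Type") == "Product"
        and ((item.get("Data") or {}).get("Product") or {}).get("ProductId")
    }
```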

_sync_products(product_ids: list, property_id: str, sync_date: str)

Syncs product master data:

  • Fetches by product_ids and enterprise_ids
  • Writes to products table with property partition

_get_credentials()

Retrieves API credentials from AWS Secrets Manager:

  1. Creates boto3 session for eu-central-1 region
  2. Fetches secret using self._secret_name
  3. Parses JSON secret string
  4. Validates presence of client_token and access_token
  5. Returns credentials dictionary
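The parsing and validation in steps 3–5 can be sketched as below; in the real flow the secret string comes from boto3's Secrets Manager get_secret_value call, and the helper name here is illustrative:

```python
import json

def parse_mews_secret(secret_string: str) -> dict:
    """Parse the Secrets Manager payload and validate the required tokens.

    Raises ValueError when client_token or access_token is absent, matching
    the documented validation step.
    """
    creds = json.loads(secret_string)
    for key in ("client_token", "access_token"):
        if not creds.get(key):
            raise ValueError(f"Missing '{key}' in Mews secret")
    return creds
```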

Error Handling

  • AWS Secrets Manager Failures: If secret retrieval fails, exception propagates and stops ingestion
  • Missing Configuration: Raises ValueError if secret_name or base_url missing
  • API Request Failures: Handled by MewsAPIClient with exponential backoff retry logic
  • Empty Responses: Handled gracefully; empty DataFrames created without errors
  • Chunking: Each chunk is processed independently; failures in one chunk don’t block others
  • Entity Writes: Skip writing if entity list is empty (no DataFrame creation)

Type Safety

All data structures are validated against PySpark schemas during DataFrame creation. This ensures:

  • Type consistency for downstream processing
  • Early detection of schema changes in the API
  • Null-safe handling of optional fields
  • Preserved nested structure for complex objects (PersonCounts, Options, Amount, etc.)

Fields are primarily StringType to preserve raw API values. Type casting and business logic occur in the cleaning stage of the workflow.

Catalog Output

After successful ingestion, data is available in the ingest catalog:

Database: etl_gp_ingest_mews

Tables:

  • reservations - Complete reservation details, partitioned by synced_date and property_id
  • customers - Guest profile data, partitioned by synced_date
  • order_items - Revenue line items, partitioned by synced_date and property_id
  • services - Property services, partitioned by synced_date and property_id
  • resource_categories - Room types, partitioned by synced_date and property_id
  • rates - Rate plans, partitioned by synced_date
  • business_segments - Market segments, partitioned by synced_date
  • age_categories - Guest type classifications, partitioned by synced_date
  • companies - Corporate accounts, partitioned by synced_date
  • products - F&B and orderable items, partitioned by synced_date and property_id

API Rate Limiting Considerations

The Mews Connector API enforces rate limits:

  • General Endpoints: Subject to enterprise-level rate limiting
  • Pagination: Limited to 1000 records per request (controlled by Limitation.Count)
  • Retry Logic: Exponential backoff with 2-30 second delays, up to 5 attempts
  • Delay Between Pages: 100ms to avoid aggressive pagination
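An illustrative sketch of exponential backoff with the parameters above (2–30 second delays, up to 5 attempts); this is not the actual MewsAPIClient implementation:

```python
import random
import time

def retry_with_backoff(call, attempts: int = 5, base_s: float = 2.0, cap_s: float = 30.0):
    """Retry a callable with capped exponential backoff plus a little jitter."""
    for attempt in range(attempts):
        try:
            return call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            delay = min(cap_s, base_s * (2 ** attempt))  # 2s, 4s, 8s, ... capped at 30s
            time.sleep(delay + random.uniform(0, 0.1))
```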

If rate limits are encountered:

  • The MewsAPIClient will automatically retry with exponential backoff
  • Consider reducing chunk size from 1000 to 500
  • Distribute sync dates across multiple job runs
  • Contact Mews support to increase rate limits if needed

Usage in Workflow

The Mews ingester is typically invoked as the first stage in the ETL workflow:

from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.mews.MewsIngester import MewsIngester

# Standard incremental ingest
job_context = JobContext(source="mews", property_ids=["PROPERTY_ID"])
MewsIngester(job_context).run()

# Backfill specific dates
job_context = JobContext(source="mews", property_ids=["PROPERTY_ID"])
MewsIngester(job_context, sync_dates=["2025-01-01", "2025-01-02"]).run()

The ingest data is then processed by the cleaning pipeline to transform nested structures into flat tables suitable for analytics.
