Athenaeum
The Athenaeum ingest process retrieves daily booking and revenue data from a secure SFTP server and loads it into the raw data catalog.
Overview
The AthenaeumIngester class handles the extraction of two main data types from the Athenaeum property management system:
- Reservations: Daily booking information including guest details, room assignments, and booking status
- Revenues: Transaction-level financial data including charges, payments, and revenue classifications
Data Source Configuration
The ingester connects to an SFTP server using credentials configured in the job context:
```python
AthenaeumIngester(job_context).run()
```

Connection Details
- Protocol: Secure FTP (SFTP)
- Property ID: `ATH`
- Data Format: Pipe-delimited CSV files (`|`)
- File Naming Convention:
  - Bookings: `ATH/ATHdailyBookings/ATH_dailyBookings_YYYYMMDD.csv`
  - Revenues: `ATH/ATHdailyRevenue/ATH_dailyRevenue_YYYYMMDD.csv`
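The naming convention can be sketched as a small path builder. This is a hypothetical helper for illustration (`daily_file_path` is not part of the ingester); only the path pattern itself comes from the convention above.

```python
from datetime import date

def daily_file_path(table: str, day: date, property_id: str = "ATH") -> str:
    """Build the SFTP path for one daily export file.

    table is "Bookings" or "Revenue", matching the directory names above.
    """
    stamp = day.strftime("%Y%m%d")  # YYYYMMDD, as used in the file names
    return (
        f"{property_id}/{property_id}daily{table}/"
        f"{property_id}_daily{table}_{stamp}.csv"
    )

# Example: the bookings file for 15 March 2024
print(daily_file_path("Bookings", date(2024, 3, 15)))
# ATH/ATHdailyBookings/ATH_dailyBookings_20240315.csv
```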
Ingest Process Flow
Incremental Loading Strategy
The ingester uses an incremental sync approach to avoid reprocessing data:
- Check Last Sync Date: Query the raw catalog for the most recent `synced_date` for each table
- Calculate Missing Dates: Determine all dates between the last sync and today
- Fetch Daily Files: Download and process one file per day per table
- Record Sync Date: Tag each batch with its `synced_date` for future incremental runs

If no previous sync exists (first run), the ingester can accept a list of specific dates to process via the `sync_dates` parameter.
Data Schemas
Reservations Schema (48 fields)
The booking data includes comprehensive guest and reservation information:
| Field Category | Fields | Description |
|---|---|---|
| Guest Identity | PROFILEID, NATIONALITY, VIPLEVEL | Guest profile and demographic data |
| Contact Info | ADDRESS, CITY, STATE, POSTALCODE, COUNTRY, PHONETYPE, PHONENUMBER, MOBILECOUNTRYCODE, MOBILENUMBER, EMAILTYPE, EMAILADD | Complete contact details |
| Booking Details | CONFIRMATIONNUMBER, THIRDPARTYCONF, STATUSCODE, RESVDATE, CREATEDON, UPDATEDON | Reservation identification and status |
| Stay Information | ARRIVALDATE, ARRIVALTIME, DEPARTUREDATE, DEPARTURETIME, ADULTS, CHILDREN | Check-in/out details and guest count |
| Room Assignment | ROOM, ROOMTYPE, RATEPLAN, ROOMTYPCHG, ROOMTYPCHGREASON, DONOTMOVE, DONOTMOVEREASON, ISROOMUPSOLD | Room allocation and upgrades |
| Business Data | COMPANYNAME, COMPANYID, IATANUMBER, TANAME1, TAID1, MARKETSEGMENT | Corporate and travel agent information |
| Group Bookings | GROUPNAME, GROUPNUMBER | Group reservation tracking |
| Cancellations | CANCELDATE, CANCELREASON | Cancellation details if applicable |
| Preferences | HASPREFERENCES | Guest preference indicators |
| Audit Trail | CREATEDBY, UPDATEDBY, RSL_CODE | System tracking fields |
Revenues Schema (42 fields)
The revenue data captures detailed transaction information:
| Field Category | Fields | Description |
|---|---|---|
| Transaction Core | RVL_DATE, RVL_RECORDTYPE, RVL_LINE, RVL_PAYTYPE, RVL_TRANSACTIONTYPE | Transaction identification and type |
| Financial Amounts | RVL_QUANTITY, RVL_NETAMOUNT, RVL_TAXAMOUNT, RVL_AMOUNT | Monetary values and quantities |
| Revenue Classification | RVL_REVENUETYPE, RVL_TRANSACTIONCODE, RVL_TRANSACTIONDESC, RVL_TYPEOFREVENUE | Revenue categorization |
| Account Info | RVL_ACCOUNT, RVL_ACCOUNTID, RVL_ACCOUNTTYPE, RVL_ACCOUNTNAME | Account and folio details |
| Reservation Link | RVL_RESERVATIONID, RVL_RESERVATIONSTAYID, RVL_CONFIRMATIONNUMBER, RVL_REFNUM | Links to booking records |
| Stay Context | RVL_STATUSCODE, RVL_ARRIVAL, RVL_DEPARTURE, RVL_ROOMNUMBER, RVL_ROOMTYPE | Associated stay information |
| Market Data | RVL_MARKETSEGMENT, RVL_REASONFORSTAY, RVL_SOURCECODE, RVL_SECONDARYSOURCECODE, RVL_SOURCE, RVL_RATECODE | Booking channel and market segment |
| Business Partners | RVL_TAID, RVL_TANAME, RVL_ORGID, RVL_ORGNAME, RVL_CRSNUMBER | Travel agents and companies |
| Audit Timestamps | RVL_CREATEDON, RVL_UPDATEDON, RVL_CANCELLEDON | Transaction lifecycle dates |
| Cancellations | RVL_CANCELLATIONCODE | Cancellation tracking |
| Property | RVL_PROPERTY | Property identifier |
Implementation Details
Core Methods
run()
Main execution method that orchestrates the ingest process:
- Determines sync dates for reservations table
- Ingests booking data for each date
- Determines sync dates for revenues table
- Ingests revenue data for each date
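The orchestration above can be sketched with a minimal stand-in class. Everything except the documented method names (`run()`, `_get_raw_bookings()`, `_get_raw_revenues()`) is an assumption here; in particular, `_dates_to_sync` is a hypothetical helper and the sketch records calls instead of downloading files.

```python
from datetime import date

class AthenaeumIngesterSketch:
    """Illustrative stand-in for AthenaeumIngester's run() orchestration."""

    def __init__(self, dates: list[date]):
        self._dates = dates
        self.calls: list[tuple[str, date]] = []

    def _dates_to_sync(self, table: str) -> list[date]:
        # Assumption: both tables share the same pending dates in this sketch.
        return self._dates

    def _get_raw_bookings(self, day: date) -> None:
        self.calls.append(("reservations", day))

    def _get_raw_revenues(self, day: date) -> None:
        self.calls.append(("revenues", day))

    def run(self) -> None:
        # Reservations first, then revenues: one file per day per table.
        for day in self._dates_to_sync("reservations"):
            self._get_raw_bookings(day)
        for day in self._dates_to_sync("revenues"):
            self._get_raw_revenues(day)
```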
_get_raw_bookings(date: str)
Ingests booking data for a specific date:
- Constructs the SFTP file path using the `YYYYMMDD` date format
- Downloads and parses the CSV with the pipe delimiter
- Applies `RAW_BOOKING_SCHEMA` for type safety
- Writes to the `etl_gp_raw_athenaeum.reservations` catalog table with a `synced_date` partition
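The parse step can be illustrated with the standard library. The sample header fields are drawn from the schema table above; note that every value stays a string at this stage, matching the raw-load approach described under Type Safety.

```python
import csv
import io

# A two-line stand-in for one daily pipe-delimited export (illustrative data).
SAMPLE = "CONFIRMATIONNUMBER|STATUSCODE|ADULTS\nC123|CONFIRMED|2\n"

def parse_daily_csv(text: str) -> list[dict]:
    """Parse one pipe-delimited daily file into rows of strings."""
    reader = csv.DictReader(io.StringIO(text), delimiter="|")
    return list(reader)

rows = parse_daily_csv(SAMPLE)
print(rows[0]["ADULTS"])  # prints 2 - still a string, typing happens later
```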
_get_raw_revenues(date: str)
Ingests revenue data for a specific date:
- Constructs the SFTP file path using the `YYYYMMDD` date format
- Downloads and parses the CSV with the pipe delimiter
- Applies `RAW_REVENUE_SCHEMA` for type safety
- Writes to the `etl_gp_raw_athenaeum.revenues` catalog table with a `synced_date` partition
Error Handling
The ingester relies on the underlying SecureFTP utility for connection management and file retrieval. If a daily file is missing or malformed:
- The SFTP client will raise an exception
- The ingester will stop processing subsequent dates
- Manual intervention may be required to investigate missing files
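The stop-on-failure behaviour can be sketched as follows. Using `FileNotFoundError` as the missing-file signal is an assumption — the actual exception type comes from the SecureFTP utility — and `ingest_range` is a hypothetical wrapper, not part of the ingester.

```python
from datetime import date
from typing import Callable

def ingest_range(dates: list[date], fetch: Callable[[date], None]) -> list[date]:
    """Process dates in order, halting at the first failed fetch."""
    done = []
    for day in dates:
        try:
            fetch(day)
        except FileNotFoundError as exc:
            # Later dates are not attempted until the missing
            # file is investigated and the run is restarted.
            raise RuntimeError(f"ingest halted at {day}: {exc}") from exc
        done.append(day)
    return done
```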
Type Safety
All fields are initially loaded as StringType to preserve raw data integrity. Type casting and validation occur in the cleaning stage of the workflow.
Catalog Output
After successful ingestion, data is available in the raw catalog:
Database: `etl_gp_raw_athenaeum`
Tables:
- `reservations` - Daily booking snapshots, partitioned by `synced_date` and `property_id`
- `revenues` - Daily transaction records, partitioned by `synced_date` and `property_id`
Both tables are partitioned to enable efficient incremental processing and data retention management.
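For downstream reads, filtering on both partition columns keeps scans incremental. The helper below is hypothetical and just assembles such a partition-pruned query; only the database, table, and partition column names come from the catalog description above.

```python
def incremental_query(table: str, synced_date: str, property_id: str = "ATH") -> str:
    """Build a partition-pruned SELECT against the raw Athenaeum catalog."""
    return (
        f"SELECT * FROM etl_gp_raw_athenaeum.{table} "
        f"WHERE synced_date = '{synced_date}' "
        f"AND property_id = '{property_id}'"
    )

# Example: read only the reservations batch synced on 15 March 2024
sql = incremental_query("reservations", "2024-03-15")
```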
Usage in Workflow
The Athenaeum ingest process is invoked as the first step in the ETL workflow:
```python
from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.athenaeum.AthenaeumIngester import AthenaeumIngester

job_context = JobContext(chain_id="athenaeum", partitions=16)
AthenaeumIngester(job_context).run()
```

After ingestion completes, the raw data flows to:
- Crawling - Consolidates S3 files into Iceberg tables in the Glue Data Catalog
- Cleaning - Data validation and transformation
- Processing - Business logic and analytics
Next Steps
After the ingest stage completes:
- Crawler - Consolidates ingested S3 partitions into queryable Iceberg tables
- Athenaeum Cleaner - Validates and transforms raw data into standardized format