Crawl
The Crawler stage consolidates ingested data from S3 storage into optimized Iceberg tables in the AWS Glue Data Catalog. This stage bridges the gap between raw ingested files and the cleaning pipeline, providing a unified, queryable view of the data.
Purpose
After the ingest stage writes raw data to S3 in partitioned Parquet files, the Crawler:
- Discovers new data partitions in S3 storage
- Consolidates multiple date partitions into Iceberg tables
- Manages incremental vs. full reload strategies
- Evolves table schemas automatically when new fields are added
- Optimizes storage using Iceberg’s advanced features (time travel, schema evolution, partition evolution)
Architecture
Crawler Configuration
Crawlers are configured via YAML in the job context under crawler_config:
```yaml
crawler_config:
  - table: reservations
    database: etl_gp_raw_athenaeum
    incremental: true
    property_partition: false
  - table: revenues
    database: etl_gp_raw_athenaeum
    incremental: true
    property_partition: false
```
Configuration Options
| Field | Type | Description |
|---|---|---|
| `table` | string | Name of the table to crawl (must match ingest output) |
| `database` | string | Target Glue database (e.g., `etl_gp_raw_athenaeum`) |
| `incremental` | boolean | `true`: append only new dates; `false`: full reload (delete and replace) |
| `property_partition` | boolean | `true`: separate processing per property; `false`: single global table |
Incremental vs. Full Reload
Incremental Mode (incremental: true)
Use When: Data accumulates over time and historical records don’t change (e.g., daily snapshots, transaction logs)
Behavior:
- Queries existing table for already-loaded `synced_date` values
- Compares against available S3 partitions
- Only processes missing dates
- Appends new data to existing table
- Preserves all historical data
Example:
```
Existing dates in table: [2025-01-01, 2025-01-02, 2025-01-03]
Available in S3:         [2025-01-01, 2025-01-02, 2025-01-03, 2025-01-04, 2025-01-05]
→ Crawler processes:     [2025-01-04, 2025-01-05]
```
Full Reload Mode (incremental: false)
Use When: Source data represents current state, not historical accumulation (e.g., master data, reference tables)
Behavior:
- Reads all available data from S3 (all partitions)
- Deletes existing data for the property/table
- Replaces with fresh complete dataset
- Ensures table reflects latest source state
Example: A products catalog where the entire catalog is reloaded daily
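To make the two modes concrete, the sketch below shows how the set of `synced_date` partitions to process could be derived. It is a minimal illustration, not the `BaseCrawler` implementation: it assumes an active `SparkSession` and that the dates available in S3 have already been listed.

```python
from pyspark.sql import SparkSession


def select_dates_to_load(spark: SparkSession, full_table_name: str,
                         available_s3_dates: list[str], incremental: bool) -> list[str]:
    """Return the synced_date partitions the crawler should process."""
    if not incremental:
        # Full reload: reprocess every available partition and replace the
        # existing table contents.
        return sorted(available_s3_dates)

    # Incremental: load only the dates not already present in the Iceberg table.
    existing = {
        row["synced_date"]
        for row in spark.table(full_table_name).select("synced_date").distinct().collect()
    }
    return sorted(set(available_s3_dates) - existing)
```

With the dates from the incremental example above, this would return `['2025-01-04', '2025-01-05']`.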
Implementation
BaseCrawler Class
All chain-specific crawlers extend BaseCrawler, which provides:
Core Methods
run()
Main orchestration method:
- Iterates through all tables in `crawler_config`
- Determines property partitioning strategy
- Processes each table/property combination
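An illustrative sketch of that orchestration loop follows. Only `crawler_config`, `property_partition`, and `self.properties` are documented on this page; the `self.crawler_config` attribute and the `_process` helper are assumptions made for the example.

```python
# Not the verbatim BaseCrawler code -- a sketch of the run() control flow.
def run(self):
    for cfg in self.crawler_config:  # assumed attribute holding the YAML entries
        table_name = cfg["table"]
        full_table_name = f"glue_catalog.{cfg['database']}.{table_name}"

        if cfg.get("property_partition", False):
            # One pass per property, each reading its own S3 prefix.
            for prop in self.properties:
                self._process(table_name, full_table_name, cfg, prop["id"])  # hypothetical helper
        else:
            # Single pass over the global table prefix.
            self._process(table_name, full_table_name, cfg, None)  # hypothetical helper
```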
get_files_path(table_name: str, property_id: Optional[str]) -> S3Path
Abstract method - Must be implemented by each chain-specific crawler
Returns the S3 path where ingested data is stored:
```python
# Example implementation
def get_files_path(self, table_name, property_id=None):
    if property_id:
        return S3Path(f"/etl-gp-raw/ingest/athenaeum/{table_name}/property_id={property_id}/")
    return S3Path(f"/etl-gp-raw/ingest/athenaeum/{table_name}/")
```
ensure_table(full_table_name: str, df: DataFrame) -> bool
Creates Iceberg table if it doesn’t exist:
- Uses Glue Catalog as metastore
- Partitions by `property_id` and `synced_timestamp`
- Configures Parquet as default format
- Sets S3 location for data storage
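The snippet below is a sketch of what table creation could look like with Spark's `DataFrameWriterV2`, assuming the `glue_catalog` Iceberg catalog is already configured on the session and a Spark version whose `tableExists` accepts catalog-qualified names. It is not the verbatim `BaseCrawler` implementation.

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col


def ensure_table(spark: SparkSession, full_table_name: str, df: DataFrame) -> bool:
    """Create the Iceberg table from the DataFrame schema if it does not exist."""
    if spark.catalog.tableExists(full_table_name):
        return False  # nothing to do, table is already registered in Glue

    (
        df.writeTo(full_table_name)
        .partitionedBy(col("property_id"), col("synced_timestamp"))
        .tableProperty("write.format.default", "parquet")  # Parquet as default file format
        .create()  # CTAS: creates the table and writes df's rows as the first snapshot
    )
    return True
```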
evolve_schema(full_table_name: str, df: DataFrame)
Automatically adds new columns when source schema changes:
- Compares incoming DataFrame schema with existing table schema
- Uses `ALTER TABLE ADD COLUMN` for missing fields
- Ensures backward compatibility without data loss
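A sketch of that comparison, assuming only additive changes are handled automatically (type changes require manual intervention, as noted under Error Handling); this is illustrative, not the verbatim implementation.

```python
from pyspark.sql import DataFrame, SparkSession


def evolve_schema(spark: SparkSession, full_table_name: str, df: DataFrame) -> None:
    """Add any columns present in df but missing from the existing Iceberg table."""
    existing = {f.name.lower() for f in spark.table(full_table_name).schema.fields}

    for field in df.schema.fields:
        if field.name.lower() not in existing:
            # ADD COLUMN is a metadata-only operation in Iceberg; no data rewrite.
            spark.sql(
                f"ALTER TABLE {full_table_name} "
                f"ADD COLUMN {field.name} {field.dataType.simpleString()}"
            )
```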
transform_raw_df(table_name: str, property_id: Optional[str], df: DataFrame)
Optional hook for custom transformations:
- Default implementation returns DataFrame unchanged
- Chain-specific crawlers can override for preprocessing
- Applied before writing to Iceberg table
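For example, a chain-specific crawler could override the hook as below. The column and table names are purely illustrative, and the `BaseCrawler` import path is assumed since it is not shown in this document.

```python
from pyspark.sql import functions as F

from etl_lib.pipeline.crawlers.BaseCrawler import BaseCrawler  # assumed module path


class ExampleCrawler(BaseCrawler):
    def transform_raw_df(self, table_name, property_id, df):
        # Illustrative preprocessing: normalise a status column on one table
        # before the data is written to its Iceberg table.
        if table_name == "reservations":
            df = df.withColumn("booking_status", F.upper(F.col("booking_status")))
        return df
```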
Iceberg Table Features
The Crawler leverages Apache Iceberg’s advanced capabilities:
Partitioning Strategy
All crawled tables are partitioned by:
- `property_id`: Enables property-specific filtering and management
- `synced_timestamp`: Tracks when data was synchronized
```python
.partitionedBy(cof(SF.PROPERTY_ID), cof(SF.SYNCED_TIMESTAMP))
```
Schema Evolution
Iceberg supports adding, renaming, and reordering columns without rewriting data. The Crawler automatically:
- Detects new columns in source data
- Adds them to existing tables
- Maintains compatibility with existing queries
Time Travel
Iceberg’s snapshot isolation enables querying historical table states:
```sql
-- Query table as it existed at specific timestamp
SELECT * FROM table TIMESTAMP AS OF '2025-01-15 10:00:00'

-- Query specific snapshot
SELECT * FROM table VERSION AS OF 12345
```
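The same reads can be expressed from Spark using Iceberg's documented `as-of-timestamp` and `snapshot-id` read options; the table name, snapshot id, and timestamp below are placeholders.

```python
# Read the table as of a point in time (epoch milliseconds).
df_at_time = (
    spark.read
    .option("as-of-timestamp", "1736935200000")  # 2025-01-15 10:00:00 UTC
    .format("iceberg")
    .load("glue_catalog.etl_gp_raw_athenaeum.reservations")
)

# Read a specific snapshot by id.
df_at_snapshot = (
    spark.read
    .option("snapshot-id", 12345)
    .format("iceberg")
    .load("glue_catalog.etl_gp_raw_athenaeum.reservations")
)
```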
Storage Optimization
- Parquet format: Columnar storage for efficient analytics
- S3 backend: Scalable, durable object storage
- Metadata management: Lightweight table operations without scanning all data
Property Partitioning
When property_partition: true, the Crawler processes each property independently:
```python
for prop in self.properties:
    property_id = prop["id"]
    s3_path = self.get_files_path(table_name, property_id)
    # Process property data separately
```
Benefits:
- Parallel processing per property
- Property-specific error isolation
- Easier data retention policies per property
- Optimized queries with property filters
Trade-offs:
- More processing overhead for multi-property chains
- Slightly more complex configuration
Chain-Specific Crawlers
Each hotel chain implements its own crawler by extending BaseCrawler:
Athenaeum Crawler
```python
from etl_lib.pipeline.crawlers.athenaeum.AthenaeumCrawler import AthenaeumCrawler

AthenaeumCrawler(job_context).run()
```
Maps to ingest output structure:
- `s3://etl-gp-raw/ingest/athenaeum/reservations/`
- `s3://etl-gp-raw/ingest/athenaeum/revenues/`
Queensway Crawler
```python
from etl_lib.pipeline.crawlers.queensway.QueenswayCrawler import QueenswayCrawler

QueenswayCrawler(job_context).run()
```
Cheval Collection Crawler
```python
from etl_lib.pipeline.crawlers.chevalcollection.ChevalCrawler import ChevalCrawler

ChevalCrawler(job_context).run()
```
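Adding support for a new chain follows the same pattern: extend `BaseCrawler` and implement `get_files_path` to mirror that chain's ingest layout. The chain name and import paths below are hypothetical.

```python
from typing import Optional

from etl_lib.pipeline.crawlers.BaseCrawler import BaseCrawler  # assumed module path
from s3path import S3Path  # assumed provider of S3Path


class ExampleChainCrawler(BaseCrawler):
    """Hypothetical crawler for a chain ingested under /etl-gp-raw/ingest/examplechain/."""

    def get_files_path(self, table_name: str, property_id: Optional[str] = None) -> S3Path:
        # Mirror the ingest output layout for this chain.
        if property_id:
            return S3Path(
                f"/etl-gp-raw/ingest/examplechain/{table_name}/property_id={property_id}/"
            )
        return S3Path(f"/etl-gp-raw/ingest/examplechain/{table_name}/")
```

It would then be invoked like the other crawlers: `ExampleChainCrawler(job_context).run()`.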
Error Handling
The Crawler includes robust error handling:
Missing Tables
If a table doesn’t exist on first run, it’s automatically created with proper schema and partitioning.
Schema Mismatches
New fields are added automatically via schema evolution. Field type changes require manual intervention.
Missing Partitions
In incremental mode, if expected partitions are missing in S3, they’re simply skipped. The next run will pick them up.
S3 Access Issues
Connection failures or permission errors will halt the crawler. Check IAM roles and S3 bucket policies.
Output
After crawling completes, data is available in the Glue Data Catalog:
Database Pattern: etl_gp_raw_{chain_id}
Example Tables:
- `glue_catalog.etl_gp_raw_athenaeum.reservations`
- `glue_catalog.etl_gp_raw_athenaeum.revenues`
Query from Spark:
```python
reservations = spark.table("glue_catalog.etl_gp_raw_athenaeum.reservations")
reservations.filter("synced_date >= '2025-01-01'").show()
```
Query from Athena:
```sql
SELECT * FROM etl_gp_raw_athenaeum.reservations
WHERE synced_date >= DATE '2025-01-01'
  AND property_id = 'ATH'
```
Usage in Workflow
The Crawler runs after ingestion, before cleaning:
```python
from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.athenaeum.AthenaeumIngester import AthenaeumIngester
from etl_lib.pipeline.crawlers.athenaeum.AthenaeumCrawler import AthenaeumCrawler

job_context = JobContext(chain_id="athenaeum", partitions=16)

# Step 1: Ingest raw data to S3
AthenaeumIngester(job_context).run()

# Step 2: Crawl data into Iceberg tables
AthenaeumCrawler(job_context).run()

# Step 3: Continue to cleaning stage
```
Next Steps
After the crawl stage completes:
- Cleaning - Validate, transform, and standardize the crawled data
- Processing - Apply business logic and derive analytics