
Crawl

The Crawler stage consolidates ingested data from S3 storage into optimized Iceberg tables in the AWS Glue Data Catalog. This stage bridges the gap between raw ingested files and the cleaning pipeline, providing a unified, queryable view of the data.

Purpose

After the ingest stage writes raw data to S3 in partitioned Parquet files, the Crawler:

  1. Discovers new data partitions in S3 storage
  2. Consolidates multiple date partitions into Iceberg tables
  3. Manages incremental vs. full reload strategies
  4. Evolves table schemas automatically when new fields are added
  5. Optimizes storage using Iceberg’s advanced features (time travel, schema evolution, partition evolution)

Architecture

Crawler Configuration

Crawlers are configured via YAML in the job context under crawler_config:

crawler_config:
  - table: reservations
    database: etl_gp_raw_athenaeum
    incremental: true
    property_partition: false
  - table: revenues
    database: etl_gp_raw_athenaeum
    incremental: true
    property_partition: false

Configuration Options

| Field | Type | Description |
| --- | --- | --- |
| table | string | Name of the table to crawl (must match ingest output) |
| database | string | Target Glue database (e.g., etl_gp_raw_athenaeum) |
| incremental | boolean | true: append only new dates; false: full reload (delete and replace) |
| property_partition | boolean | true: separate processing per property; false: single global table |

Incremental vs. Full Reload

Incremental Mode (incremental: true)

Use When: Data accumulates over time and historical records don’t change (e.g., daily snapshots, transaction logs)

Behavior:

  1. Queries existing table for already-loaded synced_date values
  2. Compares against available S3 partitions
  3. Only processes missing dates
  4. Appends new data to existing table
  5. Preserves all historical data

Example:

Existing dates in table: [2025-01-01, 2025-01-02, 2025-01-03]
Available in S3:         [2025-01-01, 2025-01-02, 2025-01-03, 2025-01-04, 2025-01-05]
→ Crawler processes:     [2025-01-04, 2025-01-05]
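The date-diff step can be pictured with a short sketch. This is an illustration only, not the BaseCrawler source; the table name, the synced_date partition layout of the ingest prefix, and the listing of available dates are assumptions based on the examples above.

# Illustrative sketch of the incremental date diff (assumed names, not the actual BaseCrawler code)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
full_table_name = "glue_catalog.etl_gp_raw_athenaeum.reservations"

# Dates already loaded into the Iceberg table
existing_dates = {
    row["synced_date"]
    for row in spark.table(full_table_name).select("synced_date").distinct().collect()
}

# Dates available as S3 partitions (in practice discovered by listing the ingest prefix)
available_dates = {"2025-01-01", "2025-01-02", "2025-01-03", "2025-01-04", "2025-01-05"}

# Read and append only the missing dates
for synced_date in sorted(available_dates - existing_dates):
    df = spark.read.parquet(
        f"s3://etl-gp-raw/ingest/athenaeum/reservations/synced_date={synced_date}/"
    )
    df = df.withColumn("synced_date", F.lit(synced_date))
    df.writeTo(full_table_name).append()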

Full Reload Mode (incremental: false)

Use When: Source data represents current state, not historical accumulation (e.g., master data, reference tables)

Behavior:

  1. Reads all available data from S3 (all partitions)
  2. Deletes existing data for the property/table
  3. Replaces with fresh complete dataset
  4. Ensures table reflects latest source state

Example: A products catalog where the entire catalog is reloaded daily
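As a rough sketch, a full reload can be expressed as an Iceberg dynamic partition overwrite. The products table, property id, and S3 path below are hypothetical; the actual delete-and-replace logic lives in BaseCrawler.

# Illustrative full-reload sketch (assumed table and paths, not the actual BaseCrawler code)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Read the complete current-state dataset for one property
df = spark.read.parquet("s3://etl-gp-raw/ingest/athenaeum/products/property_id=ATH/")
df = df.withColumn("property_id", F.lit("ATH"))

# overwritePartitions() replaces every partition present in df in one atomic commit,
# so the property's previous rows are deleted and rewritten with the fresh dataset
df.writeTo("glue_catalog.etl_gp_raw_athenaeum.products").overwritePartitions()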

Implementation

BaseCrawler Class

All chain-specific crawlers extend BaseCrawler, which provides:

Core Methods

run()

Main orchestration method:

  • Iterates through all tables in crawler_config
  • Determines property partitioning strategy
  • Processes each table/property combination
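The loop might look roughly like the sketch below. Attribute and helper names such as job_context.config and _process are assumptions for illustration; only run and crawler_config come from the documented interface.

# Illustrative sketch of the run() orchestration (assumed helper names, not the actual source)
def run(self):
    for cfg in self.job_context.config["crawler_config"]:
        table_name = cfg["table"]
        full_table_name = f"glue_catalog.{cfg['database']}.{table_name}"
        incremental = cfg.get("incremental", True)

        if cfg.get("property_partition", False):
            # Process each property independently
            for prop in self.properties:
                self._process(full_table_name, table_name,
                              property_id=prop["id"], incremental=incremental)
        else:
            # Single global table
            self._process(full_table_name, table_name,
                          property_id=None, incremental=incremental)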
get_files_path(table_name: str, property_id: Optional[str]) -> S3Path

Abstract method that must be implemented by each chain-specific crawler.

Returns the S3 path where ingested data is stored:

# Example implementation
def get_files_path(self, table_name, property_id=None):
    if property_id:
        return S3Path(f"/etl-gp-raw/ingest/athenaeum/{table_name}/property_id={property_id}/")
    return S3Path(f"/etl-gp-raw/ingest/athenaeum/{table_name}/")
ensure_table(full_table_name: str, df: DataFrame) -> bool

Creates Iceberg table if it doesn’t exist:

  • Uses Glue Catalog as metastore
  • Partitions by property_id and synced_timestamp
  • Configures Parquet as default format
  • Sets S3 location for data storage
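A minimal sketch of this behaviour, assuming a Spark session on the crawler and the DataFrameWriterV2 API; the real method may differ in details such as table properties and location handling.

# Illustrative sketch of ensure_table() (assumed details, not the actual source)
from pyspark.sql.functions import col

def ensure_table(self, full_table_name, df):
    if self.spark.catalog.tableExists(full_table_name):
        return False  # table already exists, nothing to create

    # Create an empty Iceberg table in the Glue Catalog with the incoming schema,
    # partitioned by property_id and synced_timestamp, defaulting to Parquet files on S3
    (
        df.limit(0)
        .writeTo(full_table_name)
        .using("iceberg")
        .partitionedBy(col("property_id"), col("synced_timestamp"))
        .tableProperty("write.format.default", "parquet")
        .create()
    )
    return True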
evolve_schema(full_table_name: str, df: DataFrame)

Automatically adds new columns when source schema changes:

  • Compares incoming DataFrame schema with existing table schema
  • Uses ALTER TABLE ADD COLUMN for missing fields
  • Ensures backward compatibility without data loss
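In sketch form, the comparison can be done against the table's current Spark schema; column-name handling and type mapping in the real method may differ.

# Illustrative sketch of evolve_schema() (assumed details, not the actual source)
def evolve_schema(self, full_table_name, df):
    existing_columns = {
        field.name.lower() for field in self.spark.table(full_table_name).schema.fields
    }

    for field in df.schema.fields:
        if field.name.lower() not in existing_columns:
            # Iceberg adds the column as a metadata-only change; no data files are rewritten
            self.spark.sql(
                f"ALTER TABLE {full_table_name} "
                f"ADD COLUMNS ({field.name} {field.dataType.simpleString()})"
            )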
transform_raw_df(table_name: str, property_id: Optional[str], df: DataFrame)

Optional hook for custom transformations:

  • Default implementation returns DataFrame unchanged
  • Chain-specific crawlers can override for preprocessing
  • Applied before writing to Iceberg table
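For example, a chain-specific crawler could override the hook as sketched below; the class name and column names are purely illustrative.

# Illustrative override of transform_raw_df() with hypothetical column names
from pyspark.sql import functions as F

class ExampleCrawler(BaseCrawler):
    def transform_raw_df(self, table_name, property_id, df):
        if table_name == "reservations":
            # Example preprocessing: normalise a column name and trim stray whitespace
            df = df.withColumnRenamed("resId", "reservation_id")
            df = df.withColumn("guest_name", F.trim(F.col("guest_name")))
        return df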

Iceberg Table Features

The Crawler leverages Apache Iceberg’s advanced capabilities:

Partitioning Strategy

All crawled tables are partitioned by:

  • property_id: Enables property-specific filtering and management
  • synced_timestamp: Tracks when data was synchronized
.partitionedBy(cof(SF.PROPERTY_ID), cof(SF.SYNCED_TIMESTAMP))

Schema Evolution

Iceberg supports adding, renaming, and reordering columns without rewriting data. The Crawler automatically:

  • Detects new columns in source data
  • Adds them to existing tables
  • Maintains compatibility with existing queries

Time Travel

Iceberg’s snapshot isolation enables querying historical table states:

-- Query table as it existed at a specific timestamp
SELECT * FROM table TIMESTAMP AS OF '2025-01-15 10:00:00'

-- Query a specific snapshot
SELECT * FROM table VERSION AS OF 12345

Storage Optimization

  • Parquet format: Columnar storage for efficient analytics
  • S3 backend: Scalable, durable object storage
  • Metadata management: Lightweight table operations without scanning all data

Property Partitioning

When property_partition: true, the Crawler processes each property independently:

for prop in self.properties:
    property_id = prop["id"]
    s3_path = self.get_files_path(table_name, property_id)
    # Process property data separately

Benefits:

  • Parallel processing per property
  • Property-specific error isolation
  • Easier data retention policies per property
  • Optimized queries with property filters

Trade-offs:

  • More processing overhead for multi-property chains
  • Slightly more complex configuration

Chain-Specific Crawlers

Each hotel chain implements its own crawler by extending BaseCrawler:

Athenaeum Crawler

from etl_lib.pipeline.crawlers.athenaeum.AthenaeumCrawler import AthenaeumCrawler

AthenaeumCrawler(job_context).run()

Maps to ingest output structure:

  • s3://etl-gp-raw/ingest/athenaeum/reservations/
  • s3://etl-gp-raw/ingest/athenaeum/revenues/

Queensway Crawler

from etl_lib.pipeline.crawlers.queensway.QueenswayCrawler import QueenswayCrawler

QueenswayCrawler(job_context).run()

Cheval Collection Crawler

from etl_lib.pipeline.crawlers.chevalcollection.ChevalCrawler import ChevalCrawler

ChevalCrawler(job_context).run()

Error Handling

The Crawler includes robust error handling:

Missing Tables

If a table doesn’t exist on first run, it’s automatically created with proper schema and partitioning.

Schema Mismatches

New fields are added automatically via schema evolution. Field type changes require manual intervention.

Missing Partitions

In incremental mode, if expected partitions are missing in S3, they’re simply skipped. The next run will pick them up.

S3 Access Issues

Connection failures or permission errors will halt the crawler. Check IAM roles and S3 bucket policies.

Output

After crawling completes, data is available in the Glue Data Catalog:

Database Pattern: etl_gp_raw_{chain_id}

Example Tables:

  • glue_catalog.etl_gp_raw_athenaeum.reservations
  • glue_catalog.etl_gp_raw_athenaeum.revenues

Query from Spark:

reservations = spark.table("glue_catalog.etl_gp_raw_athenaeum.reservations")
reservations.filter("synced_date >= '2025-01-01'").show()

Query from Athena:

SELECT *
FROM etl_gp_raw_athenaeum.reservations
WHERE synced_date >= DATE '2025-01-01'
  AND property_id = 'ATH'

Usage in Workflow

The Crawler runs after ingestion, before cleaning:

from etl_lib.job.JobContext import JobContext
from etl_lib.pipeline.ingest.athenaeum.AthenaeumIngester import AthenaeumIngester
from etl_lib.pipeline.crawlers.athenaeum.AthenaeumCrawler import AthenaeumCrawler

job_context = JobContext(chain_id="athenaeum", partitions=16)

# Step 1: Ingest raw data to S3
AthenaeumIngester(job_context).run()

# Step 2: Crawl data into Iceberg tables
AthenaeumCrawler(job_context).run()

# Step 3: Continue to cleaning stage

Next Steps

After the crawl stage completes:

  • Cleaning - Validate, transform, and standardize the crawled data
  • Processing - Apply business logic and derive analytics