# GuestMatchingTask — Developer Reference

This developer reference contains code examples, configuration, and implementation details for `GuestMatchingTask`.
## Example Usage

### Standard Usage
```python
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask

task = GuestMatchingTask(
    job_context=job_context,
    write_to_catalog=True,
)
task.run()
```

### Force Retraining
```python
from etl_lib.data.enums.S3Paths import S3Paths

# Delete the existing model to force retraining
model_path = S3Paths.PROCESSED_SPLINK_MODELS.value / f"{chain_id}.json"
if model_path.exists():
    model_path.unlink()

# Now run the task - it will retrain
task = GuestMatchingTask(job_context=job_context)
task.run()
```
## Incremental Mode

Incremental matching is controlled by the job's `is_incremental` flag. The task will (see the example below):
- Train the model if needed (skipped when a model already exists and `force_retrain=False`).
- If `ProcessedGuestModel` already exists, match only new guests using `link_only` and write the new/added guests to `ProcessedAddedGuestModel`.
- If no processed guests exist yet, fall back to a full dedupe and write both `ProcessedGuestModel` and `ProcessedAddedGuestModel`.
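For example, an incremental run only needs the flag set (a sketch; the constructor parameters are documented in the next section):

```python
# Sketch: run guest matching incrementally.
task = GuestMatchingTask(
    job_context=job_context,
    write_to_catalog=True,
    is_incremental=True,
)
task.run()
```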
## Parameters

- `job_context`: Runtime context that provides the Spark session, `chain_id`, and IO methods.
- `write_to_catalog` (bool): If `True`, writes outputs to the catalog.
- `is_incremental` (Optional[bool]): Runs the task in incremental mode when `True`.
- `model_path` (Optional[os.PathLike]): S3 path used to save/load the trained Splink model. Defaults to `S3Paths.PROCESSED_SPLINK_MODELS.value / f"{chain_id}.json"`.
- `force_retrain` (bool): Forces retraining when `True`, even if a model exists at `model_path`.

### As Part of ProcessingTask
```python
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask

# GuestMatchingTask runs as the 3rd subtask
task = ProcessingTask(job_context=job_context)
task.run()
```

### Skip for Testing
```python
# Skip matching for faster testing
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"],
)
task.run()
```

## Implementation Details
### Column Filtering
```python
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def _columns_not_empty(df: DataFrame):
    """Return columns with at least one non-null value."""
    # _COLUMNS is the module-level list of candidate matching columns.
    existing_cols = [col for col in _COLUMNS if col in df.columns]
    # Count non-null values per column in a single aggregation pass.
    agg_exprs = [
        F.count(F.when(F.col(c).isNotNull(), c)).alias(c)
        for c in existing_cols
    ]
    counts = df.agg(*agg_exprs).collect()[0].asDict()
    return [col for col, cnt in counts.items() if cnt > 0]
```
### Parameters and Process Overview
- The task reads `CleanGuestModel` as input and writes `ProcessedGuestMatchModel` (pre-merge matches) and `ProcessedGuestModel` (deduplicated guests). In incremental mode, it also writes `ProcessedAddedGuestModel`.
- If `model_path` is missing or `force_retrain` is set, the task trains a Splink model and saves it as JSON to `model_path`.
- The Splink `Linker` is created with the saved model settings and the Spark `db_api` optimized for lineage breaking (`break_lineage_method="parquet"`).
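A minimal sketch of that setup, assuming Splink 4's top-level `Linker` and `SparkAPI` exports (the exact keyword arguments of the task's helper may differ):

```python
from splink import Linker, SparkAPI

# Spark-backed Splink API with parquet-based lineage breaking.
db_api = SparkAPI(
    spark_session=spark,
    break_lineage_method="parquet",
)
# `settings` is the generated settings dict or the saved model JSON.
linker = Linker(guest_df, settings, db_api=db_api)
```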
### Splink Settings
The Splink settings used by the task are dynamically generated based on available columns in the dataset. Key settings include:
- `link_type`: `dedupe_only` for full dedupe, changed to `link_only` during incremental linking between new and existing guests.
- `unique_id_column_name`: `guest_id`.
- `comparisons`: A combination of `EmailComparison`, `NameComparison` (lower-cased), and `ExactMatch` for other columns such as `birth_date`, `gender`, `nationality`, `passport_number`, and address fields.
- `blocking_rules_to_generate_predictions`: Blocking rules built using `block_on` to create candidate pair lists (examples: `email`, `phone_number`, `email+birth_date`, `first_name+last_name+birth_date`, etc.).
- `term_frequency_adjustments`: Enabled for fields like `nationality` and `address_country`.
- `em_convergence`: 0.01 (controls stop condition for EM training).
### Comparisons and Blocking Rules
- Names are compared using Splink's `NameComparison` and are lower-cased via `ColumnExpression(...).lower()`.
- Email is compared using `EmailComparison`; phone and passport numbers use `ExactMatch` on lower-cased values.
- Address fields are added as `ExactMatch` comparisons, and `address_country` uses term frequency adjustments.
- Blocking rules are added for email, phone, combinations of first/last name with address or birth_date, and passport. These are chosen to reduce the number of candidate pairs without missing likely matches.

### Model Configuration
```python
settings = {
    "link_type": "dedupe_only",
    "unique_id_column_name": "guest_id",
    "comparisons": [...],  # Built dynamically
    "blocking_rules_to_generate_predictions": [...],  # Built dynamically
    "retain_matching_columns": False,
    "retain_intermediate_calculation_columns": False,
    "em_convergence": 0.01,
}
```
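The dynamic parts might be assembled like this — a sketch assuming Splink 4's `comparison_library` and `block_on` helper; the real builder derives both lists from `_columns_not_empty()`:

```python
import splink.comparison_library as cl
from splink import ColumnExpression, block_on

# Illustrative subset; the task only adds entries whose columns exist
# and are non-empty in the input data.
comparisons = [
    cl.EmailComparison("email"),
    cl.NameComparison(ColumnExpression("first_name").lower()),
    cl.NameComparison(ColumnExpression("last_name").lower()),
    cl.ExactMatch("birth_date"),
    cl.ExactMatch("nationality").configure(term_frequency_adjustments=True),
]
blocking_rules = [
    block_on("email"),
    block_on("phone_number"),
    block_on("email", "birth_date"),
    block_on("first_name", "last_name", "birth_date"),
]
```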
### Matching Threshold
- The default match threshold used by `Linker.inference.predict()` is `0.95`. This controls how strict matching is; higher values reduce false positives but may miss true matches.
### Training Workflow
- The task estimates u probabilities using `linker.training.estimate_u_using_random_sampling(max_pairs=5e5)` to seed EM training efficiently.
- EM steps are run for `first_name`, `last_name`, and (conditionally) for `email`, `phone_number`, and `passport_number` blocking rules. The EM estimation is called with `estimate_without_term_frequencies=False` to use term frequencies when available.
- After training, the model is serialized using `linker.misc.save_model_to_json()` and written to the configured `model_path`.
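A condensed sketch of the workflow above, under the same Splink 4 API assumptions (`linker` and `model_path` as defined earlier):

```python
from splink import block_on

# Estimate u probabilities by random sampling, then run EM per blocking rule.
linker.training.estimate_u_using_random_sampling(max_pairs=5e5)
em_rules = [block_on("first_name"), block_on("last_name")]
# ...plus email / phone_number / passport_number rules when those columns exist.
for rule in em_rules:
    linker.training.estimate_parameters_using_expectation_maximisation(
        rule, estimate_without_term_frequencies=False
    )
# Serialize the trained model to the configured path.
linker.misc.save_model_to_json(str(model_path), overwrite=True)
```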
### Full Deduplication Flow
1. Create `Linker` with `link_type='dedupe_only'` and `db_api` from the Spark session.
2. Run `linker.inference.predict(threshold_match_probability=0.95)` to compute link probabilities (steps 2-4 are sketched after this list).
3. Cluster predictions using `linker.clustering.cluster_pairwise_predictions_at_threshold(results, threshold_match_probability=0.95)` to output (cluster_id, guest_id) pairs.
4. Join clusters back to guest data and assign `guest_cluster_id` (coalesce cluster_id and guest_id for singletons).
5. Write the pre-merge matching results to `ProcessedGuestMatchModel`.
6. Merge profiles within clusters using completeness and recency heuristics, writing final results to `ProcessedGuestModel`.
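Steps 2-4 in code form — a sketch assuming the Splink 4 Spark backend, where `as_spark_dataframe()` is assumed to convert Splink's result wrapper back into a Spark DataFrame:

```python
from pyspark.sql import functions as F

# Step 2: pairwise predictions above the match threshold.
results = linker.inference.predict(threshold_match_probability=0.95)
# Step 3: cluster the predictions into (cluster_id, guest_id) pairs.
clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
    results, threshold_match_probability=0.95
)
clusters_sdf = clusters.as_spark_dataframe()
# Step 4: assign guest_cluster_id, falling back to guest_id for singletons.
guest_with_cluster = (
    guest_df.join(clusters_sdf.select("cluster_id", "guest_id"), "guest_id", "left")
    .withColumn("guest_cluster_id", F.coalesce("cluster_id", "guest_id"))
    .drop("cluster_id")
)
```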
### Incremental Linking Flow
1. If `ProcessedGuestModel` exists, switch model settings to `link_only` and create a `Linker` using both `new_guests` and `existing_guests` as inputs.
2. Predict links using `linker.inference.predict(threshold_match_probability=0.95)`.
3. Build a mapping of new guest_id -> guest_cluster_id using matched existing guest clusters. Unmatched new guests are assigned their own `guest_id` as `guest_cluster_id`.
4. Append new guests' pre-merge matches to the existing `ProcessedGuestMatchModel`.
5. Merge profiles for the new guests only and write them to `ProcessedAddedGuestModel`.
6. Later, `_merge_incremental()` combines `ProcessedAddedGuestModel` with `ProcessedGuestModel` using `unionByName` and `dropDuplicates(['guest_id'])`.
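Step 6 as a sketch (the DataFrame variable names are illustrative):

```python
# _merge_incremental(): combine existing and newly added guests,
# keeping a single row per guest_id.
merged = (
    processed_guests_df.unionByName(added_guests_df)
    .dropDuplicates(["guest_id"])
)
```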
### Merging Logic
- Completeness score: the merging mechanism counts non-null values across critical columns and derives recency from `created_timestamp`.
- The best profile per cluster is chosen by ordering on completeness, then recency. Non-null columns are coalesced using `first(..., ignorenulls=True)` across a window partitioned by cluster, as sketched below.
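A PySpark sketch of this heuristic; the critical column set and variable names are illustrative:

```python
from pyspark.sql import Window, functions as F

critical_cols = ["email", "phone_number", "birth_date", "passport_number"]
# Completeness: number of non-null critical columns per row.
completeness = sum(
    F.when(F.col(c).isNotNull(), 1).otherwise(0) for c in critical_cols
)
order_w = Window.partitionBy("guest_cluster_id").orderBy(
    F.desc("completeness_score"), F.desc("created_timestamp")
)
fill_w = order_w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = guest_with_cluster.withColumn("completeness_score", completeness)
# Coalesce non-null values across the cluster, best profile first.
for c in critical_cols:
    df = df.withColumn(c, F.first(c, ignorenulls=True).over(fill_w))
# Keep one merged row per cluster.
merged = (
    df.withColumn("rn", F.row_number().over(order_w))
    .filter(F.col("rn") == 1)
    .drop("rn", "completeness_score")
)
```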
### Outputs
- `ProcessedGuestMatchModel` — Pre-merge matching results (cluster assignments per guest).
- `ProcessedGuestModel` — Final deduplicated guest records that contain a `guest_cluster_id`.
- `ProcessedAddedGuestModel` (incremental only) — Newly added/merged guest records, used for incremental merging.
### Performance and Tuning
- The task persists intermediate data using `StorageLevel.MEMORY_AND_DISK` and uses eager checkpointing to break long Spark lineage chains.
- The `db_api` is configured with `break_lineage_method='parquet'` to write to parquet when lineage needs to be broken.
- The `_get_memory_size()` helper computes 80% of the physical memory and returns it as a friendly string for configuration.
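The helper might look like this — a sketch assuming `psutil`; the real rounding and unit handling may differ:

```python
import psutil


def _get_memory_size() -> str:
    """Return ~80% of physical memory as a Spark-friendly size string."""
    usable_gib = int(psutil.virtual_memory().total * 0.8) // (1024 ** 3)
    return f"{usable_gib}g"
```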
### Error Handling and Edge Cases
- If the input `guest_df` is empty in full mode, the task exits early. In incremental mode, an empty dataset raises a `ValueError` to surface configuration issues.
- If `ProcessedGuestModel` is missing during incremental mode, the task performs a full deduplication to bootstrap the system.

### Model Training / Persistence
```
# Model saved at
s3://processed-data/splink-models/{chain_id}.json
```