
GuestMatchingTask — Developer Reference

This developer reference contains code examples, configuration, and implementation details for the GuestMatchingTask.

Example Usage

Standard Usage

```python
from etl_lib.tasks.processing.GuestMatchingTask import GuestMatchingTask

task = GuestMatchingTask(
    job_context=job_context,
    write_to_catalog=True,
)
task.run()
```

Force Retraining

```python
# Delete existing model to force retraining
from etl_lib.data.enums.S3Paths import S3Paths

model_path = S3Paths.PROCESSED_SPLINK_MODELS.value / f"{chain_id}.json"
if model_path.exists():
    model_path.unlink()

# Now run the task - it will retrain
task = GuestMatchingTask(job_context=job_context)
task.run()
```

Incremental Mode

To use incremental matching, rely on the job's `is_incremental` flag. The task will:

- Train the model if needed (unless a model already exists and `force_retrain=False`).
- If `ProcessedGuestModel` already exists, match only new guests using `link_only` and write the newly added guests to `ProcessedAddedGuestModel`.
- If no processed guests exist yet, fall back to a full dedupe and write both `ProcessedGuestModel` and `ProcessedAddedGuestModel`.

Parameters

- `job_context`: Runtime context that provides the Spark session, `chain_id`, and IO methods.
- `write_to_catalog` (bool): If `True`, writes outputs to the catalog.
- `is_incremental` (Optional[bool]): Run the task in incremental mode when `True`.
- `model_path` (Optional[os.PathLike]): S3 path where the trained Splink model is saved and loaded. Defaults to `S3Paths.PROCESSED_SPLINK_MODELS.value / f"{chain_id}.json"`.
- `force_retrain` (bool): Force retraining when `True`, even if a model exists at `model_path`.
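A minimal sketch of an incremental run, using only the parameters documented above:

```python
# Incremental run; reuses the model at the default model_path unless
# force_retrain=True. Assumes job_context is constructed as in the
# standard-usage example.
task = GuestMatchingTask(
    job_context=job_context,
    is_incremental=True,
    write_to_catalog=True,
)
task.run()
```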

As Part of ProcessingTask

```python
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask

# GuestMatchingTask runs as the 3rd subtask
task = ProcessingTask(job_context=job_context)
task.run()
```

Skip for Testing

```python
# Skip matching for faster testing
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"],
)
task.run()
```

Implementation Details

Column Filtering

```python
def _columns_not_empty(df: DataFrame):
    """Return columns with at least one non-null value."""
    existing_cols = [col for col in _COLUMNS if col in df.columns]
    agg_exprs = [
        F.count(F.when(F.col(c).isNotNull(), c)).alias(c)
        for c in existing_cols
    ]
    counts = df.agg(*agg_exprs).collect()[0].asDict()
    return [col for col, cnt in counts.items() if cnt > 0]
```

Parameters and Process Overview

- The task reads `CleanGuestModel` as input and writes `ProcessedGuestMatchModel` (pre-merge matches) and `ProcessedGuestModel` (deduplicated guests). In incremental mode, it also writes `ProcessedAddedGuestModel`.
- If no model exists at `model_path`, or `force_retrain` is set, the task trains a Splink model and saves it as JSON to `model_path`.
- The Splink `Linker` is created with the saved model settings and a Spark `db_api` configured for lineage breaking (`break_lineage_method="parquet"`).

Splink Settings

The Splink settings used by the task are generated dynamically based on the columns available in the dataset. Key settings include:

- `link_type`: `dedupe_only` for a full dedupe; switched to `link_only` during incremental linking between new and existing guests.
- `unique_id_column_name`: `guest_id`.
- `comparisons`: A combination of `EmailComparison`, `NameComparison` (lower-cased), and `ExactMatch` for other columns such as `birth_date`, `gender`, `nationality`, `passport_number`, and address fields.
- `blocking_rules_to_generate_predictions`: Blocking rules built with `block_on` to generate candidate pairs (examples: `email`, `phone_number`, `email+birth_date`, `first_name+last_name+birth_date`, etc.).
- `term_frequency_adjustments`: Enabled for fields such as `nationality` and `address_country`.
- `em_convergence`: 0.01 (the stopping condition for EM training).

Comparisons and Blocking Rules

- Names are compared using Splink's `NameComparison` and are lower-cased via `ColumnExpression(...).lower()`.
- Email, phone, and passport are compared using `EmailComparison` or `ExactMatch` on lower-cased values.
- Address fields are added as `ExactMatch` comparisons; `address_country` uses term frequency adjustments.
- Blocking rules are added for email, phone, combinations of first/last name with address or birth date, and passport. They are chosen to reduce the number of candidate pairs without missing likely matches. A sketch of how these pieces could be assembled follows.
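As a hypothetical illustration, the comparisons and blocking rules could be assembled with Splink's comparison library roughly as follows; the exact columns and rules are chosen at runtime, so the lists below are assumptions:

```python
import splink.comparison_library as cl
from splink import ColumnExpression, block_on

# Illustrative only: the task derives its column list from the data
comparisons = [
    cl.EmailComparison("email"),
    cl.NameComparison(ColumnExpression("first_name").lower()),
    cl.NameComparison(ColumnExpression("last_name").lower()),
    cl.ExactMatch("birth_date"),
    cl.ExactMatch("nationality").configure(term_frequency_adjustments=True),
    cl.ExactMatch("address_country").configure(term_frequency_adjustments=True),
]

blocking_rules = [
    block_on("email"),
    block_on("phone_number"),
    block_on("email", "birth_date"),
    block_on("first_name", "last_name", "birth_date"),
]
```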

Model Configuration

settings = { "link_type": "dedupe_only", "unique_id_column_name": "guest_id", "comparisons": [...], # Built dynamically "blocking_rules_to_generate_predictions": [...], # Built dynamically "retain_matching_columns": False, "retain_intermediate_calculation_columns": False, "em_convergence": 0.01, } ### Matching Threshold - The default match threshold used by `Linker.inference.predict()` is `0.95`. This controls the severity of match detection; higher values reduce false positives but may miss true matches. ### Training Workflow - The task estimates u probabilities using `linker.training.estimate_u_using_random_sampling(max_pairs=5e5)` to seed EM training efficiently. - EM steps are run for `first_name`, `last_name`, and (conditionally) for `email`, `phone_number`, and `passport_number` blocking rules. The EM estimation is called with `estimate_without_term_frequencies=False` to use term frequencies when available. - After training, the model is serialized using `linker.misc.save_model_to_json()` and written to the configured `model_path`. ### Full Deduplication Flow 1. Create `Linker` with `link_type='dedupe_only'` and `db_api` from the Spark session. 2. Run `linker.inference.predict(threshold_match_probability=0.95)` to compute link probabilities. 3. Cluster predictions using `linker.clustering.cluster_pairwise_predictions_at_threshold(results, threshold_match_probability=0.95)` to output (cluster_id, guest_id) pairs. 4. Join clusters back to guest data and assign `guest_cluster_id` (coalesce cluster_id and guest_id for singletons). 5. Write the pre-merge matching results to `ProcessedGuestMatchModel`. 6. Merge profiles within clusters using completeness and recency heuristics, writing final results to `ProcessedGuestModel`. ### Incremental Linking Flow 1. If `ProcessedGuestModel` exists, switch model settings to `link_only` and create a `Linker` using both `new_guests` and `existing_guests` as inputs. 2. Predict links using `linker.inference.predict(threshold_match_probability=0.95)`. 3. Build a mapping of new guest_id -> guest_cluster_id using matched existing guest clusters. Unmatched new guests are assigned their own `guest_id` as `guest_cluster_id`. 4. Append new guests' pre-merge matches to the existing `ProcessedGuestMatchModel`. 5. Merge profiles for the new guests only and write them to `ProcessedAddedGuestModel`. 6. Later, `_merge_incremental()` combines `ProcessedAddedGuestModel` with `ProcessedGuestModel` using `unionByName` and `dropDuplicates(['guest_id'])`. ### Merging Logic - Completeness score: The merging mechanism computes a completeness score by counting non-null values across critical columns and calculates recency using `created_timestamp`. - The best profile per cluster is chosen by ordering by completeness then recency. Non-null columns are coalesced using `first(..., ignorenulls=True)` across the window partitioned by cluster. ### Outputs - `ProcessedGuestMatchModel` — Pre-merge matching results (cluster assignments per guest). - `ProcessedGuestModel` — Final deduplicated guest records that contain a `guest_cluster_id`. - `ProcessedAddedGuestModel` (incremental only) — Newly added/merged guest records, used for incremental merging. ### Performance and Tuning - The task persists intermediate data using `StorageLevel.MEMORY_AND_DISK` and uses eager checkpointing to break long Spark lineage chains. - The `db_api` is configured with `break_lineage_method='parquet'` to write to parquet when lineage needs to be broken. 
Performance and Tuning

- The task persists intermediate data using `StorageLevel.MEMORY_AND_DISK` and uses eager checkpointing to break long Spark lineage chains.
- The `db_api` is configured with `break_lineage_method='parquet'`, so intermediate results are written to parquet whenever lineage needs to be broken.
- The `_get_memory_size()` helper computes 80% of physical memory and returns it as a human-readable string for configuration.

Error Handling and Edge Cases

- If the input `guest_df` is empty in full mode, the task exits early. In incremental mode, an empty dataset raises a `ValueError` to surface configuration issues.
- If `ProcessedGuestModel` is missing during incremental mode, the task performs a full deduplication to bootstrap the system.
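A plausible sketch of `_get_memory_size()`, assuming `psutil` is available (the helper's actual implementation is not shown in this reference):

```python
import psutil

def _get_memory_size() -> str:
    """Return 80% of physical memory as a size string, e.g. "25g"."""
    total_bytes = psutil.virtual_memory().total
    usable_gb = int(total_bytes * 0.8 / 1024**3)
    return f"{usable_gb}g"
```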

Model Training / Persistence

```python
# Model saved at s3://processed-data/splink-models/{chain_id}.json
```
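To reuse a persisted model without retraining, the saved JSON path can be passed directly as the `settings` argument of a new `Linker`; a minimal sketch (`guest_df` and `db_api` as in the training sketch above):

```python
from splink import Linker

# Splink accepts a path to a saved model/settings JSON, so a trained
# model can be reloaded instead of retraining.
linker = Linker(guest_df, str(model_path), db_api)
```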


Back to process documentation: /processes/tasks/processing/guest-matching-task
