
ProcessingTask — Developer Reference

This page documents the `ProcessingTask` API: constructor parameters, usage examples, and implementation details.

Constructor Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `name` | `Optional[str]` | Custom name for the task |
| `job_context` | `JobContext` | Context with Spark session and configuration |
| `subtasks` | `Optional[List[Task]]` | Custom list of subtasks (uses the default list if `None`) |
| `skip_subtasks` | `Optional[List[str]]` | Names of subtasks to skip during execution |
| `write_to_catalog` | `bool` | Whether to write outputs to the catalog (default: `True`) |
| `is_incremental` | `Optional[bool]` | Whether to run in incremental mode; if omitted, falls back to `job_context.is_incremental` |
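
For completeness, here is a sketch that spells out every constructor parameter at once. It assumes a `job_context` built elsewhere, and the task name is a made-up example:

```python
# Sketch: all constructor parameters made explicit.
task = ProcessingTask(
    name="nightly-processing",        # hypothetical custom name
    job_context=job_context,          # assumed to be constructed elsewhere
    subtasks=None,                    # None -> use the default subtask list
    skip_subtasks=["GuestMatchingCheckTask"],
    write_to_catalog=True,            # write assembled outputs to the catalog
    is_incremental=None,              # None -> falls back to job_context.is_incremental
)
task.run()
```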

Example Usage

Basic Usage

```python
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask

# Create and run with all default subtasks
task = ProcessingTask(
    job_context=job_context,
    write_to_catalog=True,
)
task.run()
```

Skip Specific Subtasks

```python
# Skip guest matching (useful for testing or incremental updates)
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"],
)
task.run()
```

Subtask Type Hints

```python
from typing import Literal, Union

PROCESS_SUBTASK = Union[
    Literal["ReservationMetricsTask"],
    Literal["RoomMetricsTask"],
    Literal["GuestMatchingTask"],
    Literal["GuestMatchingCheckTask"],
    Literal["GuestLoyaltyTask"],
    Literal["ReplaceValuesTask"],
]

# Type-safe skip
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask", "GuestLoyaltyTask"],
)
```

Implementation Details

Subtask Coordination

```python
def _run_subtasks(self):
    super()._run_subtasks()

    # Collect results from subtasks. Writes happen only if the DataFrames
    # contain data; for incremental runs, only the ProcessedAdded* models
    # are written when non-empty.
    for model in (ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel):
        try:
            model_instance = self.outputs.get(model)
            if model_instance and model_instance.df is not None:
                self.write_to_output(model)
        except Exception:
            pass
```

Subtask Initialization

The default subtasks are created with `write_to_catalog=False`, so no subtask writes directly to the catalog. The parent `ProcessingTask` assembles the subtask outputs and controls the final write, based on its own `write_to_catalog` flag and on whether the DataFrames are non-empty.
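
To make the initialization rule concrete, here is a minimal sketch of how the default subtask list might be assembled. The `_default_subtasks` helper and the subtask constructor signatures are assumptions, not the library's documented API; only the subtask names and the `write_to_catalog=False` convention come from this page:

```python
# Hypothetical sketch of default subtask assembly; _default_subtasks and
# the constructor signatures are assumptions. Subtasks never write to the
# catalog themselves: the parent ProcessingTask controls the final write.
def _default_subtasks(self) -> List[Task]:
    return [
        ReservationMetricsTask(job_context=self.job_context, write_to_catalog=False),
        RoomMetricsTask(job_context=self.job_context, write_to_catalog=False),
        GuestMatchingTask(job_context=self.job_context, write_to_catalog=False),
        GuestMatchingCheckTask(job_context=self.job_context, write_to_catalog=False),
        GuestLoyaltyTask(job_context=self.job_context, write_to_catalog=False),
        ReplaceValuesTask(job_context=self.job_context, write_to_catalog=False),
    ]
```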

Shared State

All subtasks share access to the following (see the sketch after this list):

  • Input models (CleanGuestModel, CleanReservationModel, CleanRoomModel)
  • Output models (ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel)
  • Incremental outputs: ProcessedAddedGuestModel, ProcessedAddedReservationModel, ProcessedAddedRoomModel
  • Job context (Spark session, configuration)
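
The sketch below shows how a subtask might use this shared state. It assumes the library's `Task` base class and the model classes are importable; `self.inputs`, the placeholder transform, and the `guest_id` column are assumptions mirroring the `self.outputs.get(model)` pattern from `_run_subtasks` above:

```python
# Hypothetical subtask body; self.inputs and the model API are assumptions
# mirroring the self.outputs.get(model) pattern in _run_subtasks above.
class ExampleSubtask(Task):
    def run(self):
        # Read a shared input model populated by earlier pipeline stages.
        guests = self.inputs.get(CleanGuestModel)

        # Placeholder transform; guest_id is an assumed column name.
        processed_df = guests.df.dropDuplicates(["guest_id"])

        # Publish to the shared output model; the parent ProcessingTask
        # decides whether it is written to the catalog.
        self.outputs.get(ProcessedGuestModel).df = processed_df
```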

Common Patterns and Examples

Testing Without Guest Matching

```python
# Guest matching is slow; skip it for faster test runs
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"],
)
```

Incremental Processing

```python
# Skip tasks that don't need re-running
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=[
        "GuestMatchingTask",  # Already matched
        "GuestLoyaltyTask",   # Loyalty computed
    ],
)
```
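
Incremental mode can also be forced directly through the constructor instead of being inherited from the job context. A small sketch, assuming `job_context` exists; per the implementation notes above, incremental runs write only the non-empty ProcessedAdded* models:

```python
# Force incremental mode explicitly; when is_incremental is omitted,
# the task falls back to job_context.is_incremental.
task = ProcessingTask(
    job_context=job_context,
    is_incremental=True,
)
task.run()
```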

Chain-Specific Processing

```python
# Different chains may skip different subtasks
if chain_id == "athenaeum":
    skip = ["ReplaceValuesTask"]  # No value replacements needed
else:
    skip = []

task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=skip,
)
```
