# ProcessingTask — Developer Reference
This page covers the constructor, usage examples, and implementation details for the `ProcessingTask` class.
## Constructor Parameters
| Parameter | Type | Description |
|---|---|---|
| `name` | `Optional[str]` | Custom name for the task |
| `job_context` | `JobContext` | Context with Spark session and configuration |
| `subtasks` | `Optional[List[Task]]` | Custom list of subtasks (uses the defaults if `None`) |
| `skip_subtasks` | `Optional[List[str]]` | Names of subtasks to skip during execution |
| `write_to_catalog` | `bool` | Whether to write outputs to the catalog (default: `True`) |
| `is_incremental` | `Optional[bool]` | Whether to run in incremental mode; if omitted, uses `job_context.is_incremental` |
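As a quick illustration of these parameters, a run can be given an explicit name and forced into incremental mode regardless of the job context. This is a minimal sketch: `job_context` is assumed to be a `JobContext` built elsewhere in the job, and the task name is hypothetical.

```python
# Sketch: explicit overrides for the parameters listed above.
task = ProcessingTask(
    name="processing-backfill",   # hypothetical custom name
    job_context=job_context,      # assumed to exist already
    write_to_catalog=False,       # assemble outputs without writing to the catalog
    is_incremental=True,          # overrides job_context.is_incremental
)
task.run()
```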
## Example Usage

### Basic Usage
```python
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask

# Create and run with all default subtasks
task = ProcessingTask(
    job_context=job_context,
    write_to_catalog=True
)
task.run()
```

### Skip Specific Subtasks
```python
# Skip guest matching (useful for testing or incremental updates)
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"]
)
task.run()
```

### Subtask Type Hints
```python
from typing import Literal, Union

PROCESS_SUBTASK = Union[
    Literal["ReservationMetricsTask"],
    Literal["RoomMetricsTask"],
    Literal["GuestMatchingTask"],
    Literal["GuestMatchingCheckTask"],
    Literal["GuestLoyaltyTask"],
    Literal["ReplaceValuesTask"],
]

# Type-safe skip
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask", "GuestLoyaltyTask"]
)
```
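If the skip list is built dynamically, annotating it with `PROCESS_SUBTASK` lets a type checker reject misspelled subtask names before the job runs. This is a sketch; whether `PROCESS_SUBTASK` is importable from the same module as `ProcessingTask` is an assumption.

```python
from typing import List

# Assumption: PROCESS_SUBTASK is exported alongside ProcessingTask.
skip: List[PROCESS_SUBTASK] = ["GuestMatchingTask", "GuestLoyaltyTask"]
# A typo such as "GuestMatchTask" would be flagged by mypy/pyright here.

task = ProcessingTask(job_context=job_context, skip_subtasks=skip)
```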
## Implementation Details

### Subtask Coordination
```python
def _run_subtasks(self):
    super()._run_subtasks()
    # Collect results from subtasks.
    # Writes happen only if the DataFrames have data; for incremental runs,
    # only the ProcessedAdded* models are written if they are not empty.
    for model in (ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel):
        try:
            model_instance = self.outputs.get(model)
            if model_instance and model_instance.df is not None:
                self.write_to_output(model)
        except Exception:
            pass
```
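After `run()` completes, the assembled outputs can be inspected in the same way the coordination code above does. This is a sketch: the `outputs.get(Model).df` access pattern is taken from `_run_subtasks`, not from a separate public API.

```python
task = ProcessingTask(job_context=job_context, write_to_catalog=False)
task.run()

# Assumption: outputs are keyed by model class, as in _run_subtasks above.
guests = task.outputs.get(ProcessedGuestModel)
if guests is not None and guests.df is not None:
    guests.df.show(5)  # Spark DataFrame with the processed guest rows
```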
### Subtask Initialization
- The default subtasks are created with `write_to_catalog=False` (so subtasks do not write directly to the catalog). The parent `ProcessingTask` assembles the subtask outputs and controls the final write to the catalog (based on `write_to_catalog` and whether the DataFrames are non-empty).
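A minimal sketch of that arrangement; the helper name `_default_subtasks` and the subtask constructor signatures are assumptions for illustration, not the library's actual code.

```python
def _default_subtasks(self):
    # Assumption: every default subtask accepts the same job_context and a
    # write_to_catalog flag. Subtasks never write directly; the parent
    # ProcessingTask performs the single catalog write at the end.
    return [
        ReservationMetricsTask(job_context=self.job_context, write_to_catalog=False),
        RoomMetricsTask(job_context=self.job_context, write_to_catalog=False),
        GuestMatchingTask(job_context=self.job_context, write_to_catalog=False),
        GuestMatchingCheckTask(job_context=self.job_context, write_to_catalog=False),
        GuestLoyaltyTask(job_context=self.job_context, write_to_catalog=False),
        ReplaceValuesTask(job_context=self.job_context, write_to_catalog=False),
    ]
```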
### Shared State

All subtasks share access to:
- Input models (CleanGuestModel, CleanReservationModel, CleanRoomModel)
- Output models (ProcessedGuestModel, ProcessedReservationModel, ProcessedRoomModel)
- Incremental outputs: ProcessedAddedGuestModel, ProcessedAddedReservationModel, ProcessedAddedRoomModel
- Job context (Spark session, configuration)
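As a sketch of how a subtask might consume this shared state: the base-class import path and the `inputs`/`outputs` accessors below are assumptions, mirrored from the `outputs.get(...)` pattern in `_run_subtasks`.

```python
from etl_lib.tasks.Task import Task  # assumed import path


class ExampleSubtask(Task):
    """Illustrative only: shows the shared-state access pattern."""

    def run(self):
        spark = self.job_context.spark              # shared Spark session (assumed attribute name)
        guests = self.inputs.get(CleanGuestModel)   # shared input model (assumed accessor)

        # ... transform guests.df with Spark here ...

        # Publish to the shared output model so the parent ProcessingTask
        # can perform the final catalog write.
        self.outputs.get(ProcessedGuestModel).df = guests.df
```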
## Common Patterns and Examples

### Testing Without Guest Matching
```python
# Guest matching is slow - skip it for faster testing
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=["GuestMatchingTask"]
)
```

### Incremental Processing
```python
# Skip tasks that don't need re-running
task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=[
        "GuestMatchingTask",  # Already matched
        "GuestLoyaltyTask",   # Loyalty computed
    ]
)
```
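Incremental mode can also be requested explicitly. Per the constructor table, `is_incremental` overrides `job_context.is_incremental`, and on incremental runs only the non-empty `ProcessedAdded*` models are written to the catalog:

```python
# Explicit incremental run: only non-empty ProcessedAdded* models are written
task = ProcessingTask(
    job_context=job_context,
    is_incremental=True,
    skip_subtasks=["GuestMatchingTask"],  # optional: skip work already done in a full run
)
task.run()
```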
### Chain-Specific Processing

```python
# Different chains may skip different subtasks
if chain_id == "athenaeum":
    skip = ["ReplaceValuesTask"]  # No value replacements needed
else:
    skip = []

task = ProcessingTask(
    job_context=job_context,
    skip_subtasks=skip
)
```

## See also
- Back to process documentation: /processes/tasks/processing/processing-task