Task
The Task class is the abstract base class for all ETL tasks in the pipeline. It defines the fundamental structure and lifecycle for data processing operations.
Overview
Tasks are the building blocks of the ETL pipeline. Each task:
- Declares which models it requires as input
- Declares which models it provides as output
- Executes data transformations in the `_run()` method (see the sketch after this list)
- Can compose other tasks as subtasks
- Manages data reading/writing to the data catalog
- Can optionally support incremental runs by overriding `_run_incremental()` and `_merge_incremental()` and setting `_ALLOWS_INCREMENTAL = True` on the class
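For orientation, here is a minimal sketch of what a task subclass tends to look like under these rules. Only `_run()` comes from this page; the `requires`/`provides` attributes, the model classes, and the `self.read`/`self.write` helpers are illustrative assumptions about the framework's API.

```python
# Minimal sketch of a Task subclass. The requires/provides attributes,
# CustomerModel/CustomerSummaryModel, and self.read/self.write are assumed
# names used only for illustration; _run() is the hook documented here.
class BuildCustomerSummaryTask(Task):
    # Models this task reads from the data catalog (assumed declaration style)
    requires = [CustomerModel]
    # Models this task writes back to the data catalog (assumed declaration style)
    provides = [CustomerSummaryModel]

    def _run(self):
        # Read the input model through the job context's Spark session
        customers = self.read(CustomerModel)           # assumed helper
        summary = customers.groupBy("country").count()
        self.write(CustomerSummaryModel, summary)      # assumed helper
```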
Class Diagram
Task Lifecycle
Constructor Parameters
| Parameter | Type | Description |
|---|---|---|
| `name` | `Optional[str]` | Custom name for the task (defaults to the class name) |
| `job_context` | `JobContext` | Context containing the Spark session, config, and metadata |
| `subtasks` | `Optional[Sequence[Task]]` | List of tasks to run as subtasks |
| `skip_subtasks` | `Optional[Sequence[str]]` | Names of subtasks to skip |
| `write_to_catalog` | `bool` | Whether to write outputs to the data catalog (default: `True`) |
| `is_incremental` | `Optional[bool]` | Overrides incremental mode for the task; defaults to `job_context.is_incremental` |
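For example, a parent task might be constructed with these parameters as follows; the concrete task classes (`CustomerPipelineTask`, `ExtractCustomersTask`, `BuildCustomerSummaryTask`) are hypothetical placeholders.

```python
# Hypothetical task classes used only to illustrate the constructor parameters.
pipeline = CustomerPipelineTask(
    name="customer_pipeline",            # defaults to the class name if omitted
    job_context=job_context,             # carries Spark session, config, metadata
    subtasks=[
        ExtractCustomersTask(job_context=job_context),
        BuildCustomerSummaryTask(job_context=job_context),
    ],
    skip_subtasks=["ExtractCustomersTask"],  # skip subtasks by name
    write_to_catalog=True,                   # persist outputs to the data catalog
    is_incremental=False,                    # override job_context.is_incremental
)
pipeline.run()
```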
Best Practices
- Keep tasks focused: Each task should have a single, clear responsibility
- Use descriptive names: Override the `name` parameter for clarity in logs
- Checkpoint DataFrames: Use `.checkpoint(eager=True)` for complex transformations
- Clean up resources: Override `_cleanup()` to unpersist cached DataFrames
- Document requirements: Clearly document which models are required and provided
- Handle missing columns: Check for column existence before accessing
- Use job context: Access the Spark session and config through `self.job_context`
- Support incremental runs intentionally: If supporting incremental runs, set `_ALLOWS_INCREMENTAL = True` in your task subclass and implement `_run_incremental()` and `_merge_incremental()` (the base implementations are no-ops); see the sketch after this list
- Rely on `ensure_deps` for subtask wiring: The base task automatically ensures required/provided models for each subtask by running `_ensure_subtask_models()` at runtime
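Tying several of these practices together, the following sketch shows a task that opts into incremental runs. `_ALLOWS_INCREMENTAL`, `_run_incremental()`, `_merge_incremental()`, and `_cleanup()` are the hooks documented on this page; the model classes, the `self.read` helper, and the merge logic are assumptions made for illustration.

```python
class RefreshOrdersSummaryTask(Task):
    # Opt in to the incremental flow; the hooks below are no-ops in the base class
    _ALLOWS_INCREMENTAL = True

    def _run(self):
        # Full rebuild over the complete input (model and read helper are assumed)
        orders = self.read(OrdersModel)
        self.summary = orders.groupBy("status").count()

    def _run_incremental(self):
        # Process only the new slice of data (the filter condition is illustrative)
        new_orders = self.read(OrdersModel).filter("ingest_date = current_date()")
        self.delta = new_orders.groupBy("status").count().checkpoint(eager=True)

    def _merge_incremental(self):
        # Append the delta to the previously written output
        # (a real merge might also deduplicate or re-aggregate)
        existing = self.read(OrdersSummaryModel)
        self.summary = existing.unionByName(self.delta)

    def _cleanup(self):
        # Unpersist cached/checkpointed DataFrames created during the run
        if getattr(self, "delta", None) is not None:
            self.delta.unpersist()
```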
Note:
`Task.run()` prefers running subtasks if any exist. If there are no subtasks and `is_incremental` is `True` and `_ALLOWS_INCREMENTAL` is `True`, the task runs the incremental flow (`_run_incremental()` followed by `_merge_incremental()`). Otherwise a full `_run()` is executed. If incremental mode is requested but not supported by the task, the implementation falls back to a full run.
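In rough pseudocode, the dispatch described in this note looks like the following; details such as `skip_subtasks` handling and catalog writes are omitted, and the internal structure is an assumption based only on the note above.

```python
# Rough pseudocode of Task.run() dispatch; not the actual implementation.
def run(self):
    if self.subtasks:
        # Subtasks take precedence over the task's own _run()
        self._ensure_subtask_models()
        for subtask in self.subtasks:
            subtask.run()
    elif self.is_incremental and self._ALLOWS_INCREMENTAL:
        # Incremental flow: compute the new slice, then merge it with prior output
        self._run_incremental()
        self._merge_incremental()
    else:
        # Full run; also the fallback when incremental is requested but unsupported
        self._run()
```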
Related
- TaskGroup - Grouping tasks for execution
- ProcessingTask - Common processing orchestrator
- Models - Input/output data models