
Task

The Task class is the abstract base class for all ETL tasks in the pipeline. It defines the fundamental structure and lifecycle of data processing operations.

Overview

Tasks are the building blocks of the ETL pipeline. Each task:

  • Declares which models it requires as input
  • Declares which models it provides as output
  • Executes data transformations in the _run() method
  • Can compose other tasks as subtasks
  • Reads data from and writes data to the data catalog
  • Can optionally support incremental runs: set _ALLOWS_INCREMENTAL = True on the class and override _run_incremental() and _merge_incremental()
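The responsibilities listed above can be pictured with a minimal, self-contained stand-in. This is not the pipeline's actual Task class: the `requires`/`provides` class attributes, the in-memory `catalog` dict, and the `CleanOrders`, `OrderTotals`, and `OrdersPipeline` tasks are hypothetical, and plain lists of dicts stand in for Spark DataFrames.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence

Rows = List[dict]  # stand-in for a Spark DataFrame


class Task(ABC):
    # Hypothetical declarations of the models a task reads and writes.
    requires: Sequence[str] = ()
    provides: Sequence[str] = ()

    def __init__(self, subtasks: Optional[Sequence["Task"]] = None) -> None:
        self.subtasks = list(subtasks or [])

    def run(self, catalog: Dict[str, Rows]) -> None:
        if self.subtasks:
            # Composite task: run each subtask in order against the shared catalog.
            for sub in self.subtasks:
                sub.run(catalog)
            return
        # Leaf task: read declared inputs, transform them, write declared outputs.
        inputs = {name: catalog[name] for name in self.requires}
        outputs = self._run(inputs)
        for name in self.provides:
            catalog[name] = outputs[name]

    @abstractmethod
    def _run(self, inputs: Dict[str, Rows]) -> Dict[str, Rows]:
        """Transform the required models into the provided models."""


class CleanOrders(Task):
    requires = ("raw_orders",)
    provides = ("orders",)

    def _run(self, inputs: Dict[str, Rows]) -> Dict[str, Rows]:
        # Drop rows that have no amount.
        return {"orders": [r for r in inputs["raw_orders"] if r.get("amount") is not None]}


class OrderTotals(Task):
    requires = ("orders",)
    provides = ("order_totals",)

    def _run(self, inputs: Dict[str, Rows]) -> Dict[str, Rows]:
        # Sum order amounts per customer.
        totals: Dict[str, float] = {}
        for r in inputs["orders"]:
            totals[r["customer"]] = totals.get(r["customer"], 0) + r["amount"]
        return {"order_totals": [{"customer": c, "total": t} for c, t in sorted(totals.items())]}


class OrdersPipeline(Task):
    # Composite task: all of its work is delegated to subtasks.
    def _run(self, inputs: Dict[str, Rows]) -> Dict[str, Rows]:
        return {}


catalog: Dict[str, Rows] = {
    "raw_orders": [
        {"customer": "a", "amount": 10},
        {"customer": "a", "amount": None},
        {"customer": "b", "amount": 5},
    ]
}
OrdersPipeline(subtasks=[CleanOrders(), OrderTotals()]).run(catalog)
print(catalog["order_totals"])  # [{'customer': 'a', 'total': 10}, {'customer': 'b', 'total': 5}]
```

Keeping each leaf task to one declared input set and one declared output set is what lets a composite task chain them purely by model name.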

Class Diagram

[Figure: Task class diagram]

Task Lifecycle

[Figure: Task lifecycle diagram]

Constructor Parameters

Parameter         Type                      Description
name              Optional[str]             Custom name for the task (defaults to the class name)
job_context       JobContext                Context containing the Spark session, config, and metadata
subtasks          Optional[Sequence[Task]]  Tasks to run as subtasks
skip_subtasks     Optional[Sequence[str]]   Names of subtasks to skip
write_to_catalog  bool                      Whether to write outputs to the data catalog (default: True)
is_incremental    Optional[bool]            Overrides incremental mode for this task; if omitted, the value is read from job_context.is_incremental
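The defaults in the table can be sketched with a small stand-in. Only `is_incremental` is documented on JobContext; the `spark` and `config` fields, the keyword-only signature, and the `LoadCustomers`/`LoadOrders` tasks are hypothetical. The sketch also assumes skipped subtasks are filtered out at construction time, whereas the real class may apply `skip_subtasks` at run time.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Sequence


@dataclass
class JobContext:
    # Hypothetical stand-in: the real context carries a Spark session, config, and metadata.
    spark: Any = None
    config: Dict[str, Any] = field(default_factory=dict)
    is_incremental: bool = False


class Task:
    def __init__(
        self,
        *,
        name: Optional[str] = None,
        job_context: JobContext,
        subtasks: Optional[Sequence["Task"]] = None,
        skip_subtasks: Optional[Sequence[str]] = None,
        write_to_catalog: bool = True,
        is_incremental: Optional[bool] = None,
    ) -> None:
        self.job_context = job_context
        # Fall back to the class name when no custom name is given.
        self.name = name or type(self).__name__
        # Assumption: skipped subtasks are filtered out up front.
        skipped = set(skip_subtasks or [])
        self.subtasks = [t for t in (subtasks or []) if t.name not in skipped]
        self.write_to_catalog = write_to_catalog
        # None means "inherit from the job context"; an explicit bool overrides it.
        self.is_incremental = (
            job_context.is_incremental if is_incremental is None else is_incremental
        )


class LoadCustomers(Task):
    pass


class LoadOrders(Task):
    pass


ctx = JobContext(is_incremental=True)
parent = Task(
    name="daily_load",
    job_context=ctx,
    subtasks=[LoadCustomers(job_context=ctx), LoadOrders(job_context=ctx)],
    skip_subtasks=["LoadOrders"],
    is_incremental=False,
)
print(parent.name, [t.name for t in parent.subtasks], parent.is_incremental)
# daily_load ['LoadCustomers'] False
```

Using `None` rather than `False` as the default for is_incremental is what makes "not specified" distinguishable from "explicitly disabled", so the job-wide setting applies only when the caller says nothing.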

Best Practices

  1. Keep tasks focused: Each task should have a single, clear responsibility
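  2. Use descriptive names: Pass a custom name to the constructor so the task is easy to identify in logs
  3. Checkpoint DataFrames: Use .checkpoint(eager=True) after complex transformations to truncate long lineage, and use the DataFrame it returns (a checkpoint directory must be set with spark.sparkContext.setCheckpointDir())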
  4. Clean up resources: Override _cleanup() to unpersist cached DataFrames
  5. Document requirements: Clearly document which models are required and provided
  6. Handle missing columns: Check that a column exists (for example, "col" in df.columns) before accessing it
  7. Use job context: Access Spark session and config through self.job_context
  8. Opt in to incremental runs explicitly: To support incremental runs, set _ALLOWS_INCREMENTAL = True on your task subclass and implement both _run_incremental() and _merge_incremental(); the base implementations are no-ops.
  9. Rely on ensure_deps for subtask wiring: At run time, the base task calls _ensure_subtask_models() to ensure the required and provided models of each subtask, so this wiring does not need to be done by hand.
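Practice 8 can be illustrated with a standalone sketch. It does not subclass the real Task, and it assumes that _run_incremental() computes only the rows changed since the last run and that _merge_incremental() upserts them into the previously written output by a key column. The `OrdersIncremental` class, the `order_id` key, the `updated_at` watermark, and the constructor arguments are all hypothetical; lists of dicts stand in for Spark DataFrames.

```python
from typing import Dict, List

Rows = List[dict]  # stand-in for a Spark DataFrame


class OrdersIncremental:
    # Opt in to incremental runs (the class flag named in practice 8).
    _ALLOWS_INCREMENTAL = True
    KEY = "order_id"  # hypothetical merge key

    def __init__(self, existing: Rows, source: Rows, watermark: int) -> None:
        self.existing = existing    # previously written output
        self.source = source        # upstream rows
        self.watermark = watermark  # hypothetical: last processed updated_at value

    def _run(self) -> Rows:
        # Full run: recompute everything from the source.
        return list(self.source)

    def _run_incremental(self) -> Rows:
        # Only rows changed after the watermark.
        return [r for r in self.source if r["updated_at"] > self.watermark]

    def _merge_incremental(self, delta: Rows) -> Rows:
        # Upsert: delta rows replace existing rows with the same key; new keys are added.
        merged: Dict[int, dict] = {r[self.KEY]: r for r in self.existing}
        for r in delta:
            merged[r[self.KEY]] = r
        return [merged[k] for k in sorted(merged)]


existing = [
    {"order_id": 1, "amount": 10, "updated_at": 1},
    {"order_id": 2, "amount": 20, "updated_at": 1},
]
source = [
    {"order_id": 1, "amount": 10, "updated_at": 1},
    {"order_id": 2, "amount": 25, "updated_at": 2},
    {"order_id": 3, "amount": 30, "updated_at": 2},
]
task = OrdersIncremental(existing, source, watermark=1)
delta = task._run_incremental()
result = task._merge_incremental(delta)
print([r["amount"] for r in result])  # [10, 25, 30]
```

A useful check when implementing both methods is the one shown here: after merging, the incremental result should match what a full _run() would have produced.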

Note: Task.run() runs the subtasks whenever any exist. If there are no subtasks, and both is_incremental and _ALLOWS_INCREMENTAL are True, the task runs the incremental flow (_run_incremental() and _merge_incremental()). Otherwise it executes a full _run(). In particular, if incremental mode is requested but the task does not support it, the task falls back to a full run.
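The dispatch order described in the note can be written as a small decision function. This is a sketch of the documented behavior, not the actual Task.run(); the `Flow` enum and the `choose_flow` helper are hypothetical.

```python
from enum import Enum
from typing import Sequence


class Flow(Enum):
    SUBTASKS = "subtasks"
    INCREMENTAL = "incremental"  # _run_incremental() and _merge_incremental()
    FULL = "full"                # _run()


def choose_flow(subtasks: Sequence[object], is_incremental: bool, allows_incremental: bool) -> Flow:
    # 1. Subtasks take precedence over the task's own logic.
    if subtasks:
        return Flow.SUBTASKS
    # 2. Incremental only when it is both requested and supported.
    if is_incremental and allows_incremental:
        return Flow.INCREMENTAL
    # 3. Otherwise, including "requested but unsupported", fall back to a full run.
    return Flow.FULL


print(choose_flow([], is_incremental=True, allows_incremental=False))  # Flow.FULL
```

Because the fallback is silent, a task that forgets to set _ALLOWS_INCREMENTAL will still produce correct output, just without the incremental speedup.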
