Task — Developer Reference
This page contains the developer-focused API documentation and usage examples for the Task base class.
Key Methods and Examples
requires() / provides()
```python
@staticmethod
def requires():
    return [CleanGuestModel, CleanReservationModel]

@staticmethod
def provides():
    return [ProcessedGuestModel, ProcessedReservationModel]
```

_run() example
```python
def _run(self):
    # Get input data
    df = self.get_df_from_input(CleanGuestModel)

    # Transform data
    result_df = df.withColumn("new_column", F.lit("value"))

    # Write output
    self.write_to_output(ProcessedGuestModel, result_df)
```

insert_subtask() example
```python
task.insert_subtask(
    CustomTask(job_context=job_context),
    after=GuestMatchingTask
)
```

Note: `insert_subtask()` expects the `before`/`after` argument to be a `Type[Task]` (the class itself), not a string. The method inserts the new subtask relative to the first existing subtask that is an `isinstance()` match of the passed type; if no match is found it raises a `ValueError`.
Common Methods

- `get_df_from_input(model: Type[Model]) -> DataFrame`
- `write_to_output(model: Type[Model], df: DataFrame, write_to_catalog: Optional[bool])`
- `skip_task(task: Type[Task]) -> Task`
- `get_df_from_output(model: Type[Model]) -> DataFrame` — Equivalent to `get_df_from_input()` but for outputs; raises if the output model isn't set in `self.outputs`.
- `runnable_subtasks` — A convenience property returning the list of subtasks, excluding those whose class names are set in `skip_subtasks`.
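The sketch below is illustrative rather than taken from etl_lib; it shows how these accessors are typically combined inside a task's `_run()`, reusing `ProcessedGuestModel` from the examples on this page:

```python
def _run(self):
    # Read a declared input model as a DataFrame.
    guest_df = self.get_df_from_input(ProcessedGuestModel)

    # Write it back out; skip the catalog write for this particular call.
    self.write_to_output(ProcessedGuestModel, guest_df, write_to_catalog=False)

    # Re-read the output; raises if ProcessedGuestModel isn't set in self.outputs.
    written_df = self.get_df_from_output(ProcessedGuestModel)

    # runnable_subtasks excludes anything whose class was passed to skip_task().
    for subtask in self.runnable_subtasks:
        print(f"Will run subtask: {type(subtask).__name__}")
```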
Creating a Custom Task
Full example:
```python
from etl_lib.tasks.Task import Task
from etl_lib.models.processed import ProcessedGuestModel
from pyspark.sql import functions as F


class CustomEnrichmentTask(Task):
    """Enriches guest data with custom calculations."""

    # Opt in to incremental execution (see _run_incremental below).
    _ALLOWS_INCREMENTAL = True

    def __init__(self, *, name=None, job_context, **kwargs):
        super().__init__(
            name=name,
            job_context=job_context,
            **kwargs
        )

    @staticmethod
    def requires():
        return [ProcessedGuestModel]

    @staticmethod
    def provides():
        return [ProcessedGuestModel]

    def _run(self):
        # Get input
        guest_df = self.get_df_from_input(ProcessedGuestModel)

        # Apply transformation
        enriched_df = guest_df.withColumn(
            "vip_status",
            F.when(F.col("stays") > 10, "Gold")
            .when(F.col("stays") > 5, "Silver")
            .otherwise("Bronze")
        )

        # Write output
        self.write_to_output(ProcessedGuestModel, enriched_df)

    def _run_incremental(self):
        """Example incremental implementation.

        This method is run if `is_incremental` is True and the task class sets
        `_ALLOWS_INCREMENTAL = True`.
        """
        incremental_df = self.get_df_from_input(ProcessedGuestModel)
        # Apply minimal changes or filtering based on incremental keys.
        incremental_transformed = incremental_df.withColumn("vip_status", F.lit("Bronze"))
        self.write_to_output(ProcessedGuestModel, incremental_transformed)

    def _merge_incremental(self):
        """Called after a successful `_run_incremental()` to merge back into the
        main output state. The default implementation is a no-op; override it if you
        need to perform dedup/merge operations against an existing output store.
        """
        pass
```

Incremental requires / provides
Tasks can also declare incremental-specific dependencies by implementing `requires_incremental()` or `provides_incremental()` on a subclass. If the parent task is running in incremental mode and both the parent and the subtask support incremental runs, the Task base class validates these incremental dependencies for the subtask before running it.
```python
@staticmethod
def requires_incremental():
    return [GuestIncrementalModel]

@staticmethod
def provides_incremental():
    return [GuestIncrementalModel]
```

skip_task() and skipping
If your task has subtasks that you want to skip at runtime, call `skip_task(SubtaskClass)`. The base implementation stores the class name in `skip_subtasks`, and `runnable_subtasks` excludes those items at runtime. Example:

```python
task.skip_task(GuestMatchingTask)
```

write_to_output() behaviors
`write_to_output()` accepts an optional DataFrame and an optional `write_to_catalog` flag. If `df` is provided, it is set as the `df` on the output model instance. The model's write method is invoked only if the `write_to_catalog` argument is True, or if it is None and the parent task's `write_to_catalog` property is True.
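A short sketch of the three cases described above, reusing `ProcessedGuestModel` and `enriched_df` from the earlier example:

```python
# write_to_catalog=None: the model's write method runs only if the task's
# write_to_catalog property is True.
self.write_to_output(ProcessedGuestModel, enriched_df, write_to_catalog=None)

# write_to_catalog=True: always invokes the model's write method.
self.write_to_output(ProcessedGuestModel, enriched_df, write_to_catalog=True)

# write_to_catalog=False: sets the df on the output model but skips the catalog write.
self.write_to_output(ProcessedGuestModel, enriched_df, write_to_catalog=False)
```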
Subtask dependency wiring
The Task base class wires up dependencies between a parent task and its subtasks using `_ensure_subtask_models()`. This method combines the parent inputs/outputs and validates that each subtask's declared `requires()`, `provides()` and (if running incrementally) `requires_incremental()` and `provides_incremental()` are present; otherwise a `ValueError` is raised.
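As a rough illustration of that check, here is a hedged sketch of the logic for a single subtask. This is not the actual etl_lib implementation; the `self.inputs`/`self.outputs` attributes and the error message are assumptions.

```python
def _ensure_subtask_models(self, subtask):
    # Pool of models the parent can supply to its subtasks (assumed attributes).
    available = set(self.inputs) | set(self.outputs)

    required = set(subtask.requires()) | set(subtask.provides())

    # Incremental dependencies are only checked when the parent runs incrementally
    # and both sides support incremental execution.
    if self.is_incremental and self._ALLOWS_INCREMENTAL and subtask._ALLOWS_INCREMENTAL:
        required |= set(subtask.requires_incremental()) | set(subtask.provides_incremental())

    missing = required - available
    if missing:
        raise ValueError(f"Subtask is missing required models: {missing}")
```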
Small runtime behavior note / bug
There is a small issue in the current `Task.run()` implementation: it is intended to print a diagnostic message when a job is run in incremental mode but the task does not support incremental execution; however, the conditional check used in the code is incorrect, so that message is never printed. The fallback to a full `_run()` still works. A suggested fix is to change the condition in the fallback branch like so:
```python
# current (buggy):
if self.is_incremental and self._ALLOWS_INCREMENTAL:
    print("Initiated incremental run but task doesn't support it, initiating full run...")

# suggested (fixed):
if self.is_incremental and not self._ALLOWS_INCREMENTAL:
    print("Initiated incremental run but task doesn't support it, initiating full run...")
```

Back to process documentation: /processes/tasks/task