Queensway Job — Developer Reference
Developer reference for the Queensway orchestrator job, which runs the ETL pipeline for Queensway properties using Guestline chain tasks.
File: gp-etl-jobs/jobs/queensway_task.py
Job Example
from etl_lib.Orchestrator import Orchestrator
from etl_lib.data.BigQuery import BigQuery
from etl_lib.job.JobContext import JobContext
from etl_lib.tasks.chains.guestline import (
GuestlineRawTask,
CleanGuestlineTask,
)
from etl_lib.tasks.processing.ProcessingTask import ProcessingTask
from etl_lib.tasks.reports.ReportsTask import ReportsTask
job_context = JobContext(chain_id="queensway", partitions=32)
db_sink = BigQuery(job_context)
# Optionally skip guestmatching
processing_task = ProcessingTask(job_context=job_context, skip_subtasks=["GuestMatchingTask"])
processing_task.skip_task(GuestMatchingTask)
orchestrator = Orchestrator(
job_context=job_context,
tasks=[
GuestlineRawTask(job_context=job_context, skip_ingestion=False),
CleanGuestlineTask(job_context=job_context),
processing_task,
ReportsTask(job_context=job_context, database_sink=db_sink, write_to_catalog=True),
],
model_configs={
"RawRoompicksModel": {"database": "etl_gp_raw_queensway"},
"RawPersonprofilesModel": {"database": "etl_gp_raw_queensway"},
},
)
orchestrator.run()Notes
- Uses
Guestlineingester and crawler to fetch/persist raw payloads. - Overrides
Raw*model databases to point to the queensway raw database for local/test and production environments. - Dispatcher uses
JobContext(chain_id='queensway')so all tasks run with the correct chain_id.
Back to tasks: /processes/tasks/chains/queensway
Last updated on