Customizing DAG Generation
This reference covers every aspect of Starlake DAG customization: YAML configuration files, dagRef references at project, domain, and table level, Jinja2 templates, pre-load strategies, and the IStarlakeJob Python factory interface. Use it to tailor generated DAGs for Airflow, Dagster, or Snowflake Tasks to your execution and scheduling requirements.
Starlake ships with ready-to-use DAG templates. Customize them to fit your specific needs for any scheduler. You need familiarity with the Jinja2 templating language and Python.
Starlake is not an orchestration tool. It generates DAGs based on templates and runs your transforms in the correct order on the scheduling and monitoring tool of your choice.
Starlake DAG generation relies on:
- starlake command line tool
- DAG configuration files and their references within loads and tasks
- Templates (customizable Jinja2 files)
- starlake-orchestration framework to dynamically generate runtime tasks
- Dependency analysis of SQL queries to determine correct execution order
For a hands-on introduction, see the Orchestration Tutorial.
Prerequisites
Before using Starlake DAG generation, install the following minimum version:
- starlake: 1.0.1 or higher
Additional requirements depend on your orchestrator. See the specific pages below.
Orchestrator-specific guides
Choose your orchestrator for detailed customization instructions:
- Customize Airflow DAGs -- Concrete factory classes, templates, data-aware scheduling, user-defined macros, and Terraform integration for Apache Airflow.
- Customize Dagster DAGs -- Concrete factory classes, templates, and multi-asset sensors for Dagster.
- Customize Snowflake Task DAGs -- Task generation and configuration for Snowflake Tasks.
The dag-generate command
starlake dag-generate [options]
Available options:
| Parameter | Cardinality | Description |
|---|---|---|
| --outputDir <value> | optional | Path for saving the resulting DAG file(s) (${SL_ROOT}/metadata/dags/generated by default). |
| --clean | optional | Remove existing DAG file(s) before generation (false by default). |
| --domains | optional | Generate DAG file(s) to load schemas (true by default if --tasks option has not been specified). |
| --tasks | optional | Generate DAG file(s) for tasks (true by default if --domains option has not been specified). |
| --tags <value> | optional | Generate DAG file(s) for the specified tags only (no tags by default). |
| --withRoles | optional | Include roles in the generated DAG file(s) (false by default). |
| --reportFormat <value> | optional | Format of the generated report: text or html (text by default). |
Configuration
All DAG configuration files reside in ${SL_ROOT}/metadata/dags. The root element is dag.
dagRef references
Reference a DAG configuration by its file name without extension.
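For example, a configuration file stored under ${SL_ROOT}/metadata/dags (the file name load_bash_domain.sl.yml and its property values are hypothetical, chosen to match the examples below):

```yaml
# ${SL_ROOT}/metadata/dags/load_bash_domain.sl.yml (hypothetical example)
dag:
  comment: "load DAG for domain {{domain}} executed with bash"
  template: "load_bash.py.j2"
  filename: "{{domain}}_load_bash.py"
```

This configuration is then referenced elsewhere as dagRef: load_bash_domain, without the .sl.yml extension.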
dagRef for loading data
Define the configuration file for loading data at one of three levels:
- Project level in ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.load property. This sets the default configuration for all tables in the project.
application:
dagRef:
load: load_cloud_run_domain
#...
- Domain level in ${SL_ROOT}/metadata/load/{domain}/_config.sl.yml under the load.metadata.dagRef property. This applies to all tables in the domain.
load:
metadata:
dagRef: load_bash_domain
#...
- Table level in ${SL_ROOT}/metadata/load/{domain}/{table}.sl.yml under the table.metadata.dagRef property. This applies to the table only.
table:
metadata:
dagRef: load_bash_domain
#...
dagRef for transformations
Define the configuration file for transforming data at one of two levels:
- Project level in ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.transform property. This applies to all transformations in the project.
application:
dagRef:
transform: norm_cloud_run_domain
#...
- Transformation level in ${SL_ROOT}/metadata/transform/{domain}/{transformation}.sl.yml under the task.dagRef property. This applies to the transformation only.
task:
dagRef: agr_cloud_run_domain
#...
DAG properties
A DAG configuration defines four properties: comment, template, filename and options.
dag:
comment: "dag for transforming tables for domain {{domain}} with cloud run"
template: "custom_scheduled_task_cloud_run.py.j2"
filename: "{{domain}}_norm_cloud_run.py"
options:
sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
#...
Comment
A short description of the generated DAG.
Template
The path to the Jinja2 template that generates the DAG(s):
- An absolute path
- A relative path to ${SL_ROOT}/metadata/dags/template
- A relative path to src/main/templates/dags in the Starlake resource directory
Filename
The path of the generated DAG file(s), relative to the outputDir option (or its default value).
The value may include special variables that control the number of generated DAGs:
- domain: One DAG per domain
dag:
filename: "{{domain}}_norm_cloud_run.py" # one DAG per domain
#...
- table: One DAG per table
dag:
filename: "{{domain}}_{{table}}_norm_cloud_run.py" # one DAG per table
#...
Without special variables, a single DAG is generated for all tables.
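The effect of these variables can be sketched in plain Python. expand_filename below is a hypothetical helper for illustration; the real generator renders the pattern through Jinja2 with the domain and table in scope:

```python
def expand_filename(pattern: str, domain: str, table: str = "") -> str:
    # Hypothetical helper mimicking how the generator expands the
    # filename pattern; the real implementation uses Jinja2 rendering.
    out = pattern.replace("{{domain}}", domain)
    if table:
        out = out.replace("{{table}}", table)
    return out

# One DAG per domain:
print(expand_filename("{{domain}}_norm_cloud_run.py", "sales"))
# -> sales_norm_cloud_run.py

# One DAG per table:
print(expand_filename("{{domain}}_{{table}}_norm_cloud_run.py", "sales", "orders"))
# -> sales_orders_norm_cloud_run.py
```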
Options
A dictionary of key-value pairs passed to the template.
Some options are common to all templates.
Starlake environment variables
sl_env_var defines Starlake environment variables passed as an encoded JSON string:
dag:
options:
sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
#...
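Because the value is an escaped JSON string, a template can decode it with the standard json module. The sketch below uses the documented variable names with illustrative values:

```python
import json
import os

# sl_env_var arrives as an escaped JSON string; a template typically
# decodes it and exposes each entry as an environment variable.
sl_env_var = '{"SL_ROOT": "/opt/starlake", "SL_DATASETS": "/opt/starlake/datasets", "SL_TIMEZONE": "Europe/Paris"}'
env = json.loads(sl_env_var)
os.environ.update(env)
print(os.environ["SL_TIMEZONE"])  # Europe/Paris
```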
Pre-load strategy
pre_load_strategy conditionally loads the tables of a domain within the DAG. Four strategies are available:
NONE
No condition. No pre-load tasks execute. This is the default strategy.
IMPORTED
Requires at least one file in the landing area (${SL_ROOT}/incoming/{domain} by default, unless incoming_path is specified). If files exist, sl_import imports the domain before loading. Otherwise, loading is skipped.
dag:
options:
pre_load_strategy: "imported"
#...
PENDING
Requires at least one file in the pending datasets area (${SL_ROOT}/datasets/pending/{domain} by default, unless pending_path is specified). Otherwise, loading is skipped.
dag:
options:
pre_load_strategy: "pending"
#...
ACK
Requires an ack file at the specified path (${SL_ROOT}/datasets/pending/{domain}/{{ds}}.ack by default, unless global_ack_file_path is specified). Otherwise, loading is skipped.
dag:
options:
pre_load_strategy: "ack"
#...
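The four strategies can be summarized with a small decision function. should_load and its ds default are illustrative assumptions, not part of the Starlake API; the real checks run inside the generated DAG and honor the incoming_path, pending_path, and global_ack_file_path overrides:

```python
import tempfile
from pathlib import Path

def should_load(strategy: str, domain: str, root: str, ds: str = "2024-01-01") -> bool:
    # Illustrative sketch of the pre-load decision; paths mirror the
    # documented defaults.
    def has_files(d: Path) -> bool:
        return d.is_dir() and any(d.iterdir())

    s = strategy.upper()
    if s == "NONE":
        return True  # no condition: always load
    if s == "IMPORTED":
        return has_files(Path(root, "incoming", domain))
    if s == "PENDING":
        return has_files(Path(root, "datasets", "pending", domain))
    if s == "ACK":
        return Path(root, "datasets", "pending", domain, f"{ds}.ack").is_file()
    raise ValueError(f"unknown strategy: {strategy}")

# Demo: a landing area containing one file satisfies IMPORTED,
# while the empty pending area fails PENDING.
root = tempfile.mkdtemp()
Path(root, "incoming", "sales").mkdir(parents=True)
Path(root, "incoming", "sales", "orders.csv").touch()
print(should_load("imported", "sales", root))  # True
print(should_load("pending", "sales", root))   # False
```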
Run dependencies first
run_dependencies_first controls whether the generator recursively includes all dependencies associated with each task in the DAG (False by default).
dag:
options:
run_dependencies_first: True
#...
Additional options
Depending on the chosen template, a specific concrete factory class extending ai.starlake.job.IStarlakeJob is instantiated. Additional options may be required.
The IStarlakeJob interface
ai.starlake.job.IStarlakeJob is the generic factory interface responsible for generating the tasks that run the Starlake stage, load and transform commands.
sl_import generates the task that runs the stage command:
def sl_import(
self,
task_id: str,
domain: str,
**kwargs) -> BaseOperator:
#...
| Name | Type | Description |
|---|---|---|
| task_id | str | The optional task id ({domain}_import by default) |
| domain | str | The required domain to import |
sl_load generates the task that runs the load command:
def sl_load(
self,
task_id: str,
domain: str,
table: str,
spark_config: StarlakeSparkConfig=None,
**kwargs) -> BaseOperator:
#...
| Name | Type | Description |
|---|---|---|
| task_id | str | The optional task id ({domain}_{table}_load by default) |
| domain | str | The required domain of the table to load |
| table | str | The required table to load |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |
sl_transform generates the task that runs the transform command:
def sl_transform(
self,
task_id: str,
transform_name: str,
transform_options: str=None,
spark_config: StarlakeSparkConfig=None, **kwargs) -> BaseOperator:
#...
| Name | Type | Description |
|---|---|---|
| task_id | str | The optional task id ({transform_name} by default) |
| transform_name | str | The transform to run |
| transform_options | str | The optional transform options |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |
All three methods delegate to sl_job, which must be implemented in every concrete factory class:
def sl_job(
self,
task_id: str,
arguments: list,
spark_config: StarlakeSparkConfig=None,
**kwargs) -> BaseOperator:
#...
| Name | Type | Description |
|---|---|---|
| task_id | str | The required task id |
| arguments | list | The required arguments of the starlake command to run |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |
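The delegation pattern can be sketched as follows. JobFactorySketch and EchoJobFactory are illustrative only, and the argument lists passed to sl_job are assumptions for demonstration rather than the exact starlake command flags:

```python
from abc import ABC, abstractmethod

class JobFactorySketch(ABC):
    # Illustrative sketch of the IStarlakeJob delegation pattern. The
    # real interface (ai.starlake.job.IStarlakeJob) returns orchestrator
    # operators; the command arguments below are assumptions.

    def sl_import(self, task_id, domain, **kwargs):
        task_id = task_id or f"{domain}_import"
        return self.sl_job(task_id, ["import", "--domains", domain], **kwargs)

    def sl_load(self, task_id, domain, table, **kwargs):
        task_id = task_id or f"{domain}_{table}_load"
        return self.sl_job(task_id, ["load", "--domains", domain, "--tables", table], **kwargs)

    def sl_transform(self, task_id, transform_name, transform_options=None, **kwargs):
        task_id = task_id or transform_name
        args = ["transform", "--name", transform_name]
        if transform_options:
            args += ["--options", transform_options]
        return self.sl_job(task_id, args, **kwargs)

    @abstractmethod
    def sl_job(self, task_id, arguments, **kwargs):
        ...

class EchoJobFactory(JobFactorySketch):
    # Toy concrete class: returns the assembled command line instead of
    # an orchestrator operator, so the delegation is easy to observe.
    def sl_job(self, task_id, arguments, **kwargs):
        return (task_id, ["starlake"] + arguments)

task_id, cmd = EchoJobFactory().sl_transform(None, "sales.kpi")
print(task_id, cmd)  # sales.kpi ['starlake', 'transform', '--name', 'sales.kpi']
```

Each orchestrator page documents a concrete class that implements sl_job by wrapping this command in the appropriate operator.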
For orchestrator-specific concrete factory classes, templates, dependency handling, and advanced customization, see the orchestrator-specific guides listed above.
Frequently Asked Questions
How does DAG generation work in Starlake?
Starlake uses a framework composed of: the dag-generate command, YAML configuration files in metadata/dags, Jinja2 templates, and the starlake-orchestration framework that dynamically generates the tasks.
Where do you define the dagRef reference for data loading?
At the project level in application.sl.yml (application.dagRef.load property), at the domain level in _config.sl.yml (load.metadata.dagRef property), or at the table level in the table's .sl.yml file.
Where do you define the dagRef reference for transformations?
At the project level in application.sl.yml (application.dagRef.transform property) or at the transformation level in the .sl.yml file (task.dagRef property).
What are the available pre-load strategies?
Four strategies: NONE (no condition), IMPORTED (file present in the landing area), PENDING (file in the pending zone), ACK (acknowledgment file present).
How do you generate a DAG per domain or per table?
The filename property in the YAML configuration controls granularity. Use {{domain}} for one DAG per domain, {{domain}}_{{table}} for one DAG per table.
How do you pass Starlake environment variables to the DAG?
Use the sl_env_var option in the YAML configuration. It accepts an encoded JSON string containing the environment variables.
What is the IStarlakeJob interface?
It is the generic Python factory interface responsible for generating tasks for the stage, load, and transform commands. Each orchestrator has concrete classes that implement the sl_job method.
What options are available for the dag-generate command?
--outputDir (output directory), --clean (remove old files), --domains (generate for schemas), --tasks (generate for tasks), --tags (filter by tags), --withRoles (include roles), --reportFormat (text or html).