
Customizing DAG Generation

This reference covers every aspect of Starlake DAG customization: YAML configuration files, dagRef references at project, domain, and table level, Jinja2 templates, pre-load strategies, and the IStarlakeJob Python factory interface. Use it to tailor generated DAGs for Airflow, Dagster, or Snowflake Tasks to your execution and scheduling requirements.

Starlake ships with ready-to-use DAG templates. Customize them to fit your specific needs for any scheduler. You need familiarity with the Jinja2 templating language and Python.

Starlake is not an orchestration tool. It generates DAGs based on templates and runs your transforms in the correct order on the scheduling and monitoring tool of your choice.

Starlake DAG generation relies on:

  • starlake command line tool
  • DAG configuration files and their references within loads and tasks
  • Templates (customizable Jinja2 files)
  • starlake-orchestration framework to dynamically generate runtime tasks
  • Dependency analysis of SQL queries to determine correct execution order

For a hands-on introduction, see the Orchestration Tutorial.

Prerequisites

Before using Starlake DAG generation, install the following minimum version:

  • starlake: 1.0.1 or higher

Additional requirements depend on your orchestrator. See the specific pages below.

Orchestrator-specific guides

Choose your orchestrator for detailed customization instructions:

The dag-generate command

starlake dag-generate [options]

Available options:

| Parameter | Cardinality | Description |
| --- | --- | --- |
| --outputDir <value> | optional | Path for saving the resulting DAG file(s) (${SL_ROOT}/metadata/dags/generated by default). |
| --clean | optional | Remove existing DAG file(s) before generation (false by default). |
| --domains | optional | Generate DAG file(s) to load schemas (true by default if the --tasks option has not been specified). |
| --tasks | optional | Generate DAG file(s) for tasks (true by default if the --domains option has not been specified). |
| --tags <value> | optional | Generate DAG file(s) for the specified tags only (no tags by default). |
| --withRoles | optional | Include roles in the generated DAG file(s) (false by default). |
| --reportFormat <value> | optional | Format of the generated report: text or html (text by default). |
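A typical invocation combines several of these flags. The helper below is a hypothetical illustration (not part of Starlake) that only assembles the documented options into an argument list; running it, for example through subprocess, is left to the caller:

```python
def dag_generate_cmd(output_dir=None, clean=False, domains=False,
                     tasks=False, tags=None, report_format=None):
    """Build a 'starlake dag-generate' command line from the options above."""
    cmd = ["starlake", "dag-generate"]
    if output_dir:
        cmd += ["--outputDir", output_dir]
    if clean:
        cmd.append("--clean")           # remove existing DAG files first
    if domains:
        cmd.append("--domains")         # generate load DAGs
    if tasks:
        cmd.append("--tasks")           # generate transform DAGs
    if tags:
        cmd += ["--tags", ",".join(tags)]
    if report_format:
        cmd += ["--reportFormat", report_format]
    return cmd

print(dag_generate_cmd(clean=True, tags=["daily"]))
```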

Configuration

All DAG configuration files reside in ${SL_ROOT}/metadata/dags. The root element is dag.

dagRef references

Reference a DAG configuration by its file name without extension.

dagRef for loading data


Define the configuration file for loading data at one of three levels:

  • Project level in ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.load property. This sets the default configuration for all tables in the project.

    application:
      dagRef:
        load: load_cloud_run_domain
    #...

  • Domain level in ${SL_ROOT}/metadata/load/{domain}/_config.sl.yml under the load.metadata.dagRef property. This applies to all tables in the domain.

    load:
      metadata:
        dagRef: load_bash_domain
    #...

  • Table level in ${SL_ROOT}/metadata/load/{domain}/{table}.sl.yml under the table.metadata.dagRef property. This applies to the table only.

    table:
      metadata:
        dagRef: load_bash_domain
    #...
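The three levels behave as an override chain: the most specific setting wins, so a table-level dagRef takes precedence over the domain-level one, which takes precedence over the project-level default. A minimal sketch of that resolution (resolve_dag_ref is an illustrative name, not a Starlake API):

```python
def resolve_dag_ref(project=None, domain=None, table=None):
    """Return the most specific dagRef configured, mimicking the
    table > domain > project precedence described above."""
    for ref in (table, domain, project):
        if ref is not None:
            return ref
    return None  # no DAG configuration at any level
```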

dagRef for transformations


Define the configuration file for transforming data at one of two levels:

  • Project level in ${SL_ROOT}/metadata/application.sl.yml under the application.dagRef.transform property. This applies to all transformations in the project.

    application:
      dagRef:
        transform: norm_cloud_run_domain
    #...

  • Transformation level in ${SL_ROOT}/metadata/transform/{domain}/{transformation}.sl.yml under the task.dagRef property. This applies to the transformation only.

    task:
      dagRef: agr_cloud_run_domain
    #...

DAG properties

A DAG configuration defines four properties: comment, template, filename and options.

dag:
  comment: "dag for transforming tables for domain {{domain}} with cloud run"
  template: "custom_scheduled_task_cloud_run.py.j2"
  filename: "{{domain}}_norm_cloud_run.py"
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...

Comment

A short description of the generated DAG.

Template

The path to the Jinja2 template that generates the DAG(s):

  • An absolute path
  • A relative path to ${SL_ROOT}/metadata/dags/template
  • A relative path to src/main/templates/dags in the Starlake resource directory

Filename

The path of the generated DAG file(s), relative to the outputDir option (or its default value).

The value may include special variables that control the number of generated DAGs:

  • domain: One DAG per domain

    dag:
      filename: "{{domain}}_norm_cloud_run.py" # one DAG per domain
    #...

  • table: One DAG per table

    dag:
      filename: "{{domain}}_{{table}}_norm_cloud_run.py" # one DAG per table
    #...

Without special variables, a single DAG is generated for all tables.
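The filename patterns are ordinary Jinja2 expressions, so their expansion can be checked directly with the jinja2 library; the domain and table names below are made up for illustration:

```python
from jinja2 import Template

# Expand the two filename patterns shown above with sample names.
per_domain = Template("{{domain}}_norm_cloud_run.py").render(domain="sales")
per_table = Template("{{domain}}_{{table}}_norm_cloud_run.py").render(
    domain="sales", table="orders"
)
print(per_domain)  # sales_norm_cloud_run.py
print(per_table)   # sales_orders_norm_cloud_run.py
```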

Options

A dictionary of key-value pairs passed to the template.

Some options are common to all templates.

Starlake environment variables

sl_env_var defines Starlake environment variables passed as an encoded JSON string:

dag:
  options:
    sl_env_var: "{\"SL_ROOT\": \"${root_path}\", \"SL_DATASETS\": \"${root_path}/datasets\", \"SL_TIMEZONE\": \"Europe/Paris\"}"
  #...

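Because the value is a JSON string embedded in YAML, it is easy to get the quoting wrong. One way to produce a correctly encoded value is json.dumps; build_sl_env_var is an illustrative helper, not part of Starlake:

```python
import json

def build_sl_env_var(root_path, timezone="Europe/Paris"):
    """Encode Starlake environment variables as the JSON string
    expected by the sl_env_var option."""
    return json.dumps({
        "SL_ROOT": root_path,
        "SL_DATASETS": f"{root_path}/datasets",
        "SL_TIMEZONE": timezone,
    })

print(build_sl_env_var("${root_path}"))
```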
Pre-load strategy

pre_load_strategy conditionally loads the tables of a domain within the DAG. Four strategies are available:

NONE

No condition. No pre-load tasks execute. This is the default strategy.

IMPORTED

Requires at least one file in the landing area (${SL_ROOT}/incoming/{domain} by default, unless incoming_path is specified). If files exist, sl_import imports the domain before loading. Otherwise, loading is skipped.

dag:
  options:
    pre_load_strategy: "imported"
  #...

PENDING

Requires at least one file in the pending datasets area (${SL_ROOT}/datasets/pending/{domain} by default, unless pending_path is specified). Otherwise, loading is skipped.

dag:
  options:
    pre_load_strategy: "pending"
  #...

ACK

Requires an ack file at the specified path (${SL_ROOT}/datasets/pending/{domain}/{{ds}}.ack by default, unless global_ack_file_path is specified). Otherwise, loading is skipped.

dag:
  options:
    pre_load_strategy: "ack"
  #...

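All four strategies boil down to a file-existence check before loading. The sketch below illustrates that logic under the default paths described above; it is not Starlake's actual implementation:

```python
from pathlib import Path

def should_load(strategy, sl_root, domain, ds=""):
    """Illustrative pre-load check: return True when the domain's
    tables should be loaded under the given strategy."""
    root = Path(sl_root)
    if strategy == "none":
        return True  # no condition: always load
    if strategy == "imported":
        landing = root / "incoming" / domain        # landing area
        return landing.exists() and any(landing.iterdir())
    if strategy == "pending":
        pending = root / "datasets" / "pending" / domain
        return pending.exists() and any(pending.iterdir())
    if strategy == "ack":
        ack = root / "datasets" / "pending" / domain / f"{ds}.ack"
        return ack.exists()
    raise ValueError(f"unknown pre-load strategy: {strategy}")
```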
Run dependencies first

run_dependencies_first controls whether the generator recursively includes all dependencies associated with each task in the DAG (False by default).

dag:
  options:
    run_dependencies_first: True
  #...

Additional options

Depending on the chosen template, a specific concrete factory class extending ai.starlake.job.IStarlakeJob is instantiated. Additional options may be required.

The IStarlakeJob interface

ai.starlake.job.IStarlakeJob is the generic factory interface responsible for generating the tasks that run the Starlake stage, load and transform commands.

sl_import generates the task that runs the stage command:

def sl_import(
    self,
    task_id: str,
    domain: str,
    **kwargs) -> BaseOperator:
    #...

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The optional task id ({domain}_import by default) |
| domain | str | The required domain to import |

sl_load generates the task that runs the load command:

def sl_load(
    self,
    task_id: str,
    domain: str,
    table: str,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The optional task id ({domain}_{table}_load by default) |
| domain | str | The required domain of the table to load |
| table | str | The required table to load |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |

sl_transform generates the task that runs the transform command:

def sl_transform(
    self,
    task_id: str,
    transform_name: str,
    transform_options: str=None,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The optional task id ({transform_name} by default) |
| transform_name | str | The required transform to run |
| transform_options | str | The optional transform options |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |

All three methods delegate to sl_job, which must be implemented in every concrete factory class:

def sl_job(
    self,
    task_id: str,
    arguments: list,
    spark_config: StarlakeSparkConfig=None,
    **kwargs) -> BaseOperator:
    #...

| Name | Type | Description |
| --- | --- | --- |
| task_id | str | The required task id |
| arguments | list | The required arguments of the starlake command to run |
| spark_config | StarlakeSparkConfig | The optional ai.starlake.job.StarlakeSparkConfig |
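To make the delegation pattern concrete, here is a toy factory in the spirit of IStarlakeJob. Everything in it is a simplified stand-in: BaseOperator is stubbed, spark_config is dropped, and the starlake command arguments are illustrative rather than the real CLI flags.

```python
class BaseOperator:
    """Minimal stand-in for the orchestrator's operator class."""
    def __init__(self, task_id, command):
        self.task_id = task_id
        self.command = command

class EchoStarlakeJob:
    """Toy concrete factory: every method funnels into sl_job."""

    def sl_import(self, task_id, domain, **kwargs):
        task_id = task_id or f"{domain}_import"          # default task id
        return self.sl_job(task_id, ["stage", domain], **kwargs)

    def sl_load(self, task_id, domain, table, **kwargs):
        task_id = task_id or f"{domain}_{table}_load"    # default task id
        return self.sl_job(task_id, ["load", domain, table], **kwargs)

    def sl_transform(self, task_id, transform_name, transform_options=None, **kwargs):
        task_id = task_id or transform_name              # default task id
        args = ["transform", transform_name]
        if transform_options:
            args.append(transform_options)
        return self.sl_job(task_id, args, **kwargs)

    def sl_job(self, task_id, arguments, **kwargs):
        # A real factory would build an Airflow/Dagster/Snowflake operator here.
        return BaseOperator(task_id, ["starlake"] + arguments)
```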

For orchestrator-specific concrete factory classes, templates, dependency handling, and advanced customization, see the dedicated pages:

Frequently Asked Questions

How does DAG generation work in Starlake?

Starlake uses a framework composed of: the dag-generate command, YAML configuration files in metadata/dags, Jinja2 templates, and the starlake-orchestration framework that dynamically generates the tasks.

Where to define the dagRef reference for data loading?

At the project level in application.sl.yml (application.dagRef.load property), at the domain level in _config.sl.yml (load.metadata.dagRef property), or at the table level in the table's .sl.yml file.

Where to define the dagRef reference for transformations?

At the project level in application.sl.yml (application.dagRef.transform property) or at the transformation level in the .sl.yml file (task.dagRef property).

What are the available pre-load strategies?

Four strategies: NONE (no condition), IMPORTED (file present in the landing area), PENDING (file in the pending zone), ACK (acknowledgment file present).

How to generate a DAG per domain or per table?

The filename property in the YAML configuration controls granularity. Use {{domain}} for one DAG per domain, {{domain}}_{{table}} for one DAG per table.

How to pass Starlake environment variables to the DAG?

Use the sl_env_var option in the YAML configuration. It accepts an encoded JSON string containing the environment variables.

What is the IStarlakeJob interface?

It is the generic Python factory interface responsible for generating tasks for the stage, load, and transform commands. Each orchestrator has concrete classes that implement the sl_job method.

What options are available for the dag-generate command?

--outputDir (output directory), --clean (remove old files), --domains (generate for schemas), --tasks (generate for tasks), --tags (filter by tags), --withRoles (include roles), --reportFormat (text or html).