Cosmos API is a great tool for orchestrating dbt tasks through Airflow. It renders a DAG based on the dbt code, so each dbt model appears as a single task in the Airflow DAG, making your workflow easier to manage.
We have used the dbt core + Astronomer Cosmos API integration to perform data transformation on Snowflake. However, we came across a few scenarios that are easy to achieve with dbt core/cloud alone but need additional work when integrating dbt core with Cosmos API.
In this blog, we will show you how we did this work and cover some tips for deploying and integrating dbt with Cosmos API.
Scenarios
Run the dbt source freshness command as an Airflow task.
Run full refresh on the selected incremental model.
Run the View task only if it is modified.
Tech Stack
dbt core.
Astronomer Managed Airflow.
Cosmos API with the default Execution Mode (LOCAL).
GitHub Actions for Airflow deployment.
Astronomer Docker image with dbt installed in a virtual environment.
Where to Start?
Let’s understand some basics before diving into each scenario.
How does dbt work with Cosmos API? We already have a great blog that gives an overview of Cosmos API + dbt core integration.
Basically, the dbt code is part of the Airflow DAGs project (dags directory). We have used the DbtTaskGroup operator to manage dbt resources in separate logical groups, as shown in the code snippet below.
transform_model = DbtTaskGroup(
    group_id="transform_model",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=RenderConfig(
        select=["path:models"],
        load_method=LoadMode.DBT_MANIFEST
    )
)
transform_snapshot = DbtTaskGroup(
    group_id="transform_snapshot",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=RenderConfig(
        select=["path:snapshots"],
        load_method=LoadMode.DBT_MANIFEST
    )
)
During the execution of each task, the dbt command is prepared and executed internally by the Cosmos API. Note that the dbt command is prepared based on the resource type, like model, seed, snapshot, etc.
As examples:
['/usr/local/airflow/dbt_venv/bin/dbt', 'run', '--models', '', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
['/usr/local/airflow/dbt_venv/bin/dbt', 'snapshot', '--models', '', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
['/usr/local/airflow/dbt_venv/bin/dbt', 'test', '--models', '', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
Scenario #1: Source Freshness Check
In dbt, a source can be configured for a freshness check. We want to ensure the DAG won’t run on stale data and that the sources are receiving fresh data at specified intervals. If not, the task fails and sends a notification. Learn more details about source freshness.
dbt source freshness
How to achieve this with Cosmos API?
Unfortunately, there is no direct option to run a source freshness check in Cosmos API. DbtDag and DbtTaskGroup are commonly used to generate DAGs from dbt projects. If you look deeper, these operators are wrapped around the operators below (similar operators are available for virtual environments).
DbtLocalBaseOperator – base operator
DbtBuildLocalOperator – dbt build
DbtLSLocalOperator – dbt ls
DbtSeedLocalOperator – dbt seed
DbtSnapshotLocalOperator – dbt snapshot
DbtRunLocalOperator – dbt run
DbtTestLocalOperator – dbt test
DbtRunOperationLocalOperator – dbt run-operation
As you can see, the above list doesn’t include an operator for executing the dbt source command.
How to execute the dbt source freshness command?
There are two options: BashOperator and DbtLocalBaseOperator.
Option #1a: Bash Operator
As mentioned in Astronomer documentation here, BashOperator can be used to execute dbt commands. This does require the profiles.yml (containing database configuration) in the dbt project directory. This can be further improved by passing the Airflow connection as an environment variable in the DAG, as mentioned here.
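As a rough sketch of this option (not our final setup), the task could look like the snippet below. The connection id, environment variable names, and the exact bash command are assumptions; they depend on how your profiles.yml is written.
from airflow.operators.bash import BashOperator

# Sketch only: run dbt source freshness with BashOperator, passing credentials
# from an assumed Airflow connection (snowflake_default) into environment
# variables that profiles.yml is expected to reference.
src_fresh_check = BashOperator(
    task_id="src_fresh_check_task",
    bash_command=(
        "/usr/local/airflow/dbt_venv/bin/dbt source freshness "
        "--profiles-dir /usr/local/airflow/dags/dbtlearn"
    ),
    cwd="/usr/local/airflow/dags/dbtlearn",
    env={
        "SNOWFLAKE_USER": "{{ conn.snowflake_default.login }}",
        "SNOWFLAKE_PASSWORD": "{{ conn.snowflake_default.password }}",
    },
    append_env=True,  # keep the rest of the worker's environment available
)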
Option #1b: Dbt Local Base Operator
Instead of dbt’s profiles.yml, we want to utilize the database connection defined in Airflow, so we went with DbtLocalBaseOperator. Like DbtTaskGroup, DbtLocalBaseOperator uses database credentials from Airflow to build the dbt profiles.yml.
src_fresh_check = DbtLocalBaseOperator(
    profile_config=profile_config,
    task_id="src_fresh_check_task",
    project_dir=DBT_PROJECT_PATH,
    dbt_executable_path=DBT_EXECUTABLE_PATH,
    base_cmd=["source", "freshness"],
)
This can be accomplished by simply overriding the base command of the Python class. In fact, any dbt command can be added here if there is a need for more customization.
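For example, following the same pattern, a hypothetical task that builds the dbt documentation could override the base command like this (the task name and the docs command choice are our own illustration, not part of the original setup):
docs_generate = DbtLocalBaseOperator(
    profile_config=profile_config,
    task_id="docs_generate_task",
    project_dir=DBT_PROJECT_PATH,
    dbt_executable_path=DBT_EXECUTABLE_PATH,
    base_cmd=["docs", "generate"],  # any dbt subcommand can be plugged in here
)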
Scenario #2: Full Refresh of Incremental Models
As we understand, a full refresh on incremental models and seeds is needed for a number of different reasons:
Schema change on an incremental model.
A new column is added and needs to be backfilled.
Transformation logic changed on an incremental model.
Schema change of seeds.
Note that if there is no requirement to backfill newly added columns, dbt’s on_schema_change configuration parameter can adapt to schema changes without refreshing/rebuilding entire models. Learn more about changing columns of an incremental model.
How to apply full refresh on the Incremental model?
Cosmos API has a provision to supply a full_refresh flag to dbt models.
transform_model = DbtTaskGroup(
    group_id="transform_model",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        "full_refresh": True,
    },
    render_config=RenderConfig(
        select=["path:models"],
        load_method=LoadMode.DBT_MANIFEST
    )
)
During the execution of each task, the dbt command (for dbt run) looks like this:
['/usr/local/airflow/dbt_venv/bin/dbt', 'run', '--models', '', '--full-refresh', '--profiles-dir', '/usr/local/airflow/dags/dbtlearn', '--profile', 'dbtlearn', '--target', 'dev']
However, there are a few issues with this approach:
It applies the full refresh flag to all incremental models and seeds, which rebuilds all of those models during every DAG run irrespective of whether the schema changed, defeating the purpose of defining models as incremental.
Cosmos API does not have a provision to refer to the previous state file of the models (the dbt metadata file called manifest.json), so there is no way for the DAG to know whether there is a schema change.
Even if we were able to solve the above issue, it is hard to decide at runtime, within a single DAG, whether to run the task/dbt command with a full refresh or not.
From the above points, it is clear that a single DAG is not enough to perform both a normal run and a full refresh on selected incremental models. So, we have introduced another DAG to fully refresh the incremental models.
Here is the functionality of both DAGs (a rough wiring sketch for DAG #1 follows the lists below):
DAG #1: Transformation DAG
Runs every hour and performs transformation on new and/or modified data.
Run dbt source freshness check.
Run and test all models, snapshots, and seeds.
Run incremental model but no full refresh.
DAG #2: Full refresh DAG
This DAG is not scheduled and runs manually.
Run full refresh only on modified incremental models and their dependents.
Run full refresh on seed if the schema changes.
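As a minimal sketch, DAG #1 could be wired together roughly like this. The DAG id and scheduling arguments are assumptions; the tasks are the ones from the earlier snippets.
from datetime import datetime
from airflow import DAG

with DAG(
    dag_id="transformation_dag",  # DAG id is an assumption
    schedule_interval="@hourly",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # src_fresh_check, transform_model, and transform_snapshot are assumed to be
    # created inside this DAG context, as shown in the earlier snippets.
    src_fresh_check >> transform_model >> transform_snapshot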
Let’s jump to the next challenge: state file (manifest.json) management.
dbt generates a metadata file called manifest.json that represents the current state of all models.
Example:
With dbt core, the command below runs all models that have been modified compared to the previous state.
dbt run --select "state:modified" --state
Learn more about the stateful selection.
Learn more about manifest.json.
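To get a feel for what this state file contains, here is a small illustrative snippet (not part of our pipeline) that lists the model nodes recorded in a manifest.json; the file path is an assumption.
import json

# Peek at the model nodes recorded in a dbt manifest.json (path assumed).
with open("target/manifest.json") as f:
    manifest = json.load(f)

models = [
    node_id
    for node_id, node in manifest["nodes"].items()
    if node["resource_type"] == "model"
]
print(f"{len(models)} models in this state, e.g. {models[:3]}")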
As we understand, Airflow’s DAG code needs to be deployed every time there is a change in the dbt models. So, we have utilized the CI/CD pipeline to generate and store manifest.json.
During deployment:
Download the manifest.json of the previous deployment from storage (AWS or Azure) and save it under the Airflow dags directory.
Generate the manifest.json of the current state and upload it to storage (AWS or Azure).
jobs:
  deploy:  # job id not shown in the original snippet; name it to suit your workflow
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Install dbt
        run: |
          python -m pip install -r /requirements.txt
          dbt deps
      - name: Generate manifest file for current state
        run: |
          cd
          dbt ls
      - name: Download dbt manifest of previous state
        run: |
          az storage fs file download --path manifest.json \
            --destination dags//
      - name: Upload dbt manifest of current state
        run: |
          az storage fs file upload --path manifest.json \
            --source /target/manifest.json \
            --overwrite true
      - name: Upload same dbt manifest into archive
        run: |
          az storage fs file upload --path /manifest.json \
            --source /target/manifest.json
      - name: Deploy to Astro
        uses: astronomer/deploy-action@v0.3
Steps:
Checkout the repository.
Install dbt and its dependencies.
Run the dbt ls command to generate manifest.json. It will be generated in the target directory.
Download the manifest.json of the previous deployment from Azure storage.
Upload (overwrite) the manifest.json of the current deployment into Azure storage.
Upload the manifest.json of the current deployment into a date-time archive directory in Azure storage.
Once deployed, manifest.json is part of the DAG code and will be referenced while rendering DAG #2.
full_refresh = DbtTaskGroup(
    group_id="full_refresh_group",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        "full_refresh": True
    },
    render_config=RenderConfig(
        load_method=LoadMode.DBT_LS,
        select=["state:modified+"],
        env_vars={
            "DBT_STATE": DBT_METADATA_PATH  # dir path of previous state manifest.json under dags directory
        },
    )
)
Let’s review RenderConfig:
LoadMode.DBT_LS: env_vars is not supported in other parsing methods, such as LoadMode.DBT_MANIFEST or LoadMode.AUTOMATIC (the default). Note that LoadMode.DBT_LS is slower than LoadMode.DBT_MANIFEST and should not be used if the DAG is going to run often, such as every hour.
env_vars: Passes environment variables to dbt. Cosmos API does not provide a parameter to define the path of the previous manifest.json, so we have used the dbt environment variable DBT_STATE to define the manifest.json path.
select: Used to select modified nodes/models based on manifest.json. Note that this selection is applied while rendering the DAG, not at execution time.
Once deployed, the manifest.json files are compared, and DAG #2 renders only the modified models as tasks. If the Airflow project (with dbt code) is deployed multiple times with no changes to models, DAG #2 will not have any tasks.
Scenario #3: View Recreation
DAG #1 is scheduled every hour and runs all models, including views. If there is no change in a view’s definition, recreating the view is an unnecessary consumption of compute (though very small per run, it can become significant over time). Because DAG #2 can now deal with modified models (including views), we can skip the view-related tasks from DAG #1 entirely.
Use the exclude parameter of RenderConfig to filter views from rendering in DAG #1.
transform_model = DbtTaskGroup(
    group_id="transform_model",
    project_config=project_config,
    profile_config=profile_config,
    execution_config=execution_config,
    render_config=RenderConfig(
        select=["path:models"],
        exclude=["config.materialized:view"],
        load_method=LoadMode.DBT_MANIFEST
    )
)
No changes are required on DAG #2. Remember, DAG #2 renders models based on manifest.json. If there is a change in a view definition, it will be rendered into DAG #2.
Miscellaneous Configuration Definition
profile_config = ProfileConfig(
    profile_name="dbtlearn",
    target_name="dev",
    profile_mapping=""  # map an Airflow connection to the dbt profile here
)

project_config = ProjectConfig(
    dbt_project_path="",  # path to the dbt project under the dags directory
    manifest_path="",     # path to the manifest.json used for rendering
)

execution_config = ExecutionConfig(
    dbt_executable_path="",  # dbt binary, e.g. inside the virtual environment
)
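For completeness, the DBT_PROJECT_PATH, DBT_EXECUTABLE_PATH, and DBT_METADATA_PATH names used in the earlier snippets could be defined along these lines; the exact paths are assumptions based on the example dbt commands shown earlier, so adjust them to your project layout.
import os

# Assumed locations, mirroring the example dbt commands earlier in this post.
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", "/usr/local/airflow")
DBT_PROJECT_PATH = os.path.join(AIRFLOW_HOME, "dags", "dbtlearn")
DBT_EXECUTABLE_PATH = os.path.join(AIRFLOW_HOME, "dbt_venv", "bin", "dbt")
# Directory under dags/ holding the previous-state manifest.json (name assumed)
DBT_METADATA_PATH = os.path.join(AIRFLOW_HOME, "dags", "dbt_metadata")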
Deployment Steps
Note that both DAGs should not run at the same time. Before every deployment, the scheduled DAG should be paused. If there is a need for a full refresh run or view recreation, first run DAG #2 manually and then unpause DAG #1.
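If you prefer to script the pause/unpause step, Airflow’s stable REST API can be used. The sketch below uses placeholder DAG ids, host, and basic-auth credentials; your deployment’s authentication setup may differ.
import requests

AIRFLOW_API = "https://<your-airflow-host>/api/v1"  # placeholder
AUTH = ("<username>", "<password>")                 # placeholder credentials

def set_paused(dag_id: str, paused: bool) -> None:
    # Stable REST API: PATCH /dags/{dag_id} with the is_paused field.
    response = requests.patch(
        f"{AIRFLOW_API}/dags/{dag_id}",
        json={"is_paused": paused},
        auth=AUTH,
    )
    response.raise_for_status()

# Pause the scheduled DAG (#1), run the full-refresh DAG (#2) manually,
# then unpause DAG #1 once the refresh has finished.
set_paused("transformation_dag", True)   # DAG id assumed
# ... trigger DAG #2 and wait for it to complete ...
set_paused("transformation_dag", False)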
Conclusion
Overall, we have built an out-of-the-box solution to add more flexibility to DAG task/dbt model execution. Incremental full refreshes and view recreation of specific resources can now run in a controlled fashion without impacting regular runs. With the help of the dbt metadata files, DAG #2 is smart enough to generate tasks only for modified models; if there is no change, no tasks will be rendered in DAG #2, avoiding any side effects if it is executed accidentally.
Because the dbt metadata files are now available on the server, a more customized solution is possible, though it will be more static in nature (because the metadata files are generated during deployment).
Need additional support to optimize dbt core functionality with Cosmos API?