Loading dbt models into Dagster as assets
Now is the moment that we’ve been building up to since the beginning of this module. Let’s see your dbt models in a Dagster asset graph!
Turning dbt models into assets with @dbt_assets
The star of the show here is the @dbt_assets decorator. This is a specialized asset decorator that wraps around a dbt project to tell Dagster what dbt models exist. In the body of the @dbt_assets definition, you write exactly how you want Dagster to run your dbt models.
Many Dagster projects may only need one @dbt_assets-decorated function that manages the entire dbt project. However, you may need to create multiple definitions for various reasons, such as:
- You have multiple dbt projects
- You want to exclude certain dbt models
- You want to only execute dbt run and not dbt build on specific models
- You want to customize what happens after certain models finish, such as sending a notification
- You need to configure some sets of models differently
We’ll create only one @dbt_assets definition for now. In a later lesson, we’ll encounter a use case that needs a second @dbt_assets definition.
Loading the models as assets
Create a new file, dbt.py, in the defs/assets/ directory. You can create this file manually or use dg:

    dg scaffold defs dagster.asset assets/dbt.py

⚠️ Note: The scaffold command creates a basic @dg.asset template as a starting point. We'll be replacing the scaffolded content entirely with dbt-specific code below, since we need to use the specialized @dbt_assets decorator instead of the standard @dg.asset decorator.

Replace the contents of dbt.py with the following imports:

    # src/dagster_and_dbt/defs/assets/dbt.py
    import dagster as dg
    from dagster_dbt import dbt_assets, DbtCliResource

    from dagster_and_dbt.defs.project import dbt_project

Next, we'll use the @dbt_assets decorator to create a new asset function and provide it with a reference to the project's manifest file:

    @dbt_assets(
        manifest=dbt_project.manifest_path,
    )
    def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):

Here, we used dbt_project.manifest_path to provide the reference to the project's manifest file. This is possible because the dbt_project representation we created earlier contains the manifest path, accessible by using the manifest_path attribute.

Notice we provided two arguments here. The first argument is the context, which indicates which dbt models to run and any related configurations. The second refers to the dbt resource you’ll be using to run dbt.

Finally, add the following to the body of the dbt_analytics function:

    yield from dbt.cli(["run"], context=context).stream()

Let’s review what’s happening in this line in a bit more detail:
- We use the dbt resource (defined in the asset's second argument, which is a DbtCliResource) to execute a dbt command through its .cli method.
- The .stream() method fetches the events and results of this dbt execution.
  - This is one of multiple ways to get the Dagster events, such as what models materialized or tests passed. We recommend starting with this and exploring other methods in the future as your use cases grow (such as fetching the run artifacts after a run). In this case, the above line will execute dbt run.
- The results of the stream are a Python generator of what Dagster events happened. We used yield from (not just yield!) to have Dagster track asset materializations.
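To make the yield from point concrete, here is a minimal plain-Python sketch that does not use Dagster or dbt at all. The stream_events function is a hypothetical stand-in for the generator returned by .stream(), and the model names in it are made up for illustration. It only shows the language behavior: yield from passes each event through one by one, while a bare yield hands back the generator object itself as a single item.

```python
from typing import Iterator


def stream_events() -> Iterator[str]:
    """Hypothetical stand-in for a stream of events, one per dbt model."""
    yield "materialized: stg_trips"
    yield "materialized: stg_zones"


def with_yield_from() -> Iterator[str]:
    # Delegates to the inner generator: the caller receives each event.
    yield from stream_events()


def with_plain_yield():
    # Yields the generator object itself: the caller receives one item,
    # and the individual events are never surfaced.
    yield stream_events()


delegated = list(with_yield_from())
wrapped = list(with_plain_yield())

print(delegated)     # the two event strings
print(len(wrapped))  # a single item: the un-consumed generator
```

In the asset function, the caller is Dagster, so a bare yield would hand it one opaque generator instead of the individual events it needs in order to record materializations.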
At this point, defs/assets/dbt.py should look like this:
    # src/dagster_and_dbt/defs/assets/dbt.py
    import dagster as dg
    from dagster_dbt import DbtCliResource, dbt_assets

    from dagster_and_dbt.defs.project import dbt_project


    @dbt_assets(
        manifest=dbt_project.manifest_path,
    )
    def dbt_analytics(context: dg.AssetExecutionContext, dbt: DbtCliResource):
        yield from dbt.cli(["run"], context=context).stream()
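If you are curious what kind of information the manifest gives Dagster, the sketch below reads a tiny, hand-written manifest and lists each model with its upstream dependencies. It is an illustration only, not how dagster-dbt is implemented. The sample manifest, its project name (analytics), and the model names are hypothetical, and a real manifest.json contains many more keys than the three used here (nodes, resource_type, and depends_on).

```python
import json
from typing import Dict, List

# Hypothetical, heavily trimmed manifest. A real manifest.json is generated
# by dbt and is much larger; only the fields used below are shown.
SAMPLE_MANIFEST = json.loads(
    """
    {
      "nodes": {
        "model.analytics.stg_trips": {
          "resource_type": "model",
          "name": "stg_trips",
          "depends_on": {"nodes": ["source.analytics.raw_taxis.trips"]}
        },
        "model.analytics.daily_metrics": {
          "resource_type": "model",
          "name": "daily_metrics",
          "depends_on": {"nodes": ["model.analytics.stg_trips"]}
        },
        "test.analytics.not_null_stg_trips_id": {
          "resource_type": "test",
          "name": "not_null_stg_trips_id",
          "depends_on": {"nodes": ["model.analytics.stg_trips"]}
        }
      }
    }
    """
)


def model_dependencies(manifest: dict) -> Dict[str, List[str]]:
    """Map each model name to the unique IDs of the nodes it depends on."""
    models: Dict[str, List[str]] = {}
    for node in manifest.get("nodes", {}).values():
        # Skip tests, seeds, snapshots, and anything else that is not a model.
        if node.get("resource_type") != "model":
            continue
        models[node["name"]] = node.get("depends_on", {}).get("nodes", [])
    return models


deps = model_dependencies(SAMPLE_MANIFEST)
for name, upstream in deps.items():
    print(name, "<-", upstream)
```

The names and dependency lists are what let an asset graph draw one node per model with edges between them.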