Testing dependencies
Most Dagster asset graphs contain multiple assets that depend on the output of other assets. Linking one asset to another is how you define the structure of your graph. When your assets take in inputs outside of their control, it is important to still configure deterministic tests that will always execute as expected.
We will add an additional asset downstream of state_population_file that takes in its output:
# /dagster_testing/assets/lesson_3.py
@dg.asset
def total_population(state_population_file: list[dict]) -> int:
    return sum([int(x["Population"]) for x in state_population_file])
The code above defines an asset called total_population, which:
- Takes in the list of dicts output by the state_population_file asset.
- Returns the total population of all cities in the list.
The relationship between state_population_file and total_population is defined by the input parameter on total_population: the parameter name matches the name of the upstream asset. What would a test look like for this asset?
def test_total_population():
    state_populations = [
        {
            "City": "New York",
            "Population": "8804190",
        },
        {
            "City": "Buffalo",
            "Population": "278349",
        },
        {
            "City": "Yonkers",
            "Population": "211569",
        },
    ]
    assert lesson_3.total_population(state_populations) == 9294108
Because we set the input parameter for total_population ourselves, we know exactly what the expected outcome will be. In this case the sum of the three cities is 9,294,108.
Like the other asset test, this test looks like standard Python. Again, we can run this test with pytest:
> pytest dagster_testing_tests/test_lesson_3.py::test_total_population
...
dagster_testing_tests/test_lesson_3.py . [100%]
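Because the input is fully under the test's control, the same pattern can cover edge cases such as an empty upstream output or a malformed value. The following is a minimal sketch, not part of the lesson code: sum_population is a hypothetical plain-Python stand-in for the body of total_population (no Dagster decorator), and the "Albany" row is made-up illustrative data.

```python
def sum_population(rows: list[dict]) -> int:
    """Hypothetical stand-in for the body of total_population."""
    return sum(int(row["Population"]) for row in rows)


def test_sum_population_empty():
    # An upstream asset that yields no rows should produce a total of zero.
    assert sum_population([]) == 0


def test_sum_population_bad_value():
    # A non-numeric population string makes int() raise ValueError.
    try:
        sum_population([{"City": "Albany", "Population": "n/a"}])
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError for a non-numeric population")
```

Tests like these document how the asset behaves when the upstream data is not what you expect, without needing the upstream asset to run at all.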
Testing materializations
As your pipelines become more complex, it's a good idea to test materializing your Dagster assets. dg.materialize() executes a set of assets in process and returns a dagster.ExecuteInProcessResult. We can then check the success property of that result to ensure that everything materialized as expected.
Since the total_population asset only relies on the output of the state_population_file asset, we can materialize both of those assets together to ensure they are working as expected:
def test_assets():
    _assets = [
        lesson_3.state_population_file,
        lesson_3.total_population,
    ]
    result = dg.materialize(_assets)
    assert result.success
> pytest dagster_testing_tests/test_lesson_3.py::test_assets
...
dagster_testing_tests/test_lesson_3.py . [100%]
Confirming that the assets materialize without issue is a great start, but we still want to check the return values of each asset. Luckily, the result returned by materialize() lets us access each asset's output value with the output_for_node() method:
def test_assets():
    _assets = [
        lesson_3.state_population_file,
        lesson_3.total_population,
    ]
    result = dg.materialize(_assets)
    assert result.success

    # Expected output of the upstream asset
    state_populations = [
        {
            "City": "New York",
            "Population": "8804190",
        },
        {
            "City": "Buffalo",
            "Population": "278349",
        },
        {
            "City": "Yonkers",
            "Population": "211569",
        },
    ]
    assert result.output_for_node("state_population_file") == state_populations
    assert result.output_for_node("total_population") == 9294108
Using output_for_node(), we access the output of each individual asset and ensure that it still matches what we expect.
Logging
Since we are using Dagster to execute the assets, when we run materialize(), the default Dagster execution events will be logged. If you rerun pytest with the -s flag set to view the logs, all of the Dagster logs will be included alongside the test output:
> pytest dagster_testing_tests/test_lesson_3.py::test_assets -s
...
dagster_testing_tests/test_lesson_3.py 2025-02-14 09:30:11 -0600 - dagster - DEBUG - __ephemeral_asset_job__ - 7924f6b8-72c6-4789-b34a-d25b144f7f66 - 62349 - RUN_START - Started execution of run for "__ephemeral_asset_job__".
2025-02-14 09:30:11 -0600 - dagster - DEBUG - __ephemeral_asset_job__ - 7924f6b8-72c6-4789-b34a-d25b144f7f66 - 62349 - ENGINE_EVENT - Executing steps in process (pid: 62349)
2025-02-14 09:30:11 -0600 - dagster - DEBUG - __ephemeral_asset_job__ - 7924f6b8-72c6-4789-b34a-d25b144f7f66 - 62349 - RESOURCE_INIT_STARTED - Starting initialization of resources [io_manager].
...
Including the logging can be useful when debugging a broken test and can help you isolate where you are encountering an error.