Integrations resources
If we think about the state_population_database asset, we want to make sure it can execute a query and return the results, but we are not necessarily concerned with which database is used to execute the query. Instead of insisting that the database resource must be Snowflake, we can change the annotation so that it can take in any resource:
import dagster as dg


@dg.asset
def state_population_database(database: dg.ConfigurableResource) -> list[tuple]:
    query = """
        SELECT
            city_name,
            population
        FROM data.city_population
        WHERE state_name = 'NY';
    """
    # Works with any resource whose get_connection() yields a cursor-style connection.
    with database.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(query)
        return cur.fetchall()
Now, in order to execute the asset, we only need to provide a resource that has a get_connection context manager. The connection it yields must have a cursor method, and that cursor must support execute and fetchall. From Dagster's perspective, this can be anything. It does not even need to be a database, as long as it meets those criteria.
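To make that contract concrete, here is a minimal sketch that uses only the Python standard library. InMemoryDatabase is a hypothetical stand-in, not a Dagster resource, and query_state_population is a plain function that mirrors the asset body without the decorator. The table contents are placeholder values, and the ORDER BY is added only to make the output deterministic.

```python
import sqlite3
from contextlib import contextmanager


class InMemoryDatabase:
    """Hypothetical stand-in: any object with a get_connection() context manager."""

    def __init__(self) -> None:
        self._conn = sqlite3.connect(":memory:")
        # Attach a second in-memory database named "data" so that the
        # schema-qualified name data.city_population resolves.
        self._conn.execute("ATTACH DATABASE ':memory:' AS data")
        self._conn.execute(
            "CREATE TABLE data.city_population "
            "(state_name TEXT, city_name TEXT, population INTEGER)"
        )
        # Placeholder rows for illustration only, not real population figures.
        self._conn.executemany(
            "INSERT INTO data.city_population VALUES (?, ?, ?)",
            [("NY", "City A", 100), ("NY", "City B", 20), ("CA", "City C", 30)],
        )

    @contextmanager
    def get_connection(self):
        # Yield the shared connection; the caller only needs cursor() on it.
        yield self._conn


def query_state_population(database) -> list[tuple]:
    # Same shape as the asset body: get_connection -> cursor -> execute -> fetchall.
    query = """
        SELECT city_name, population
        FROM data.city_population
        WHERE state_name = 'NY'
        ORDER BY city_name;
    """
    with database.get_connection() as conn:
        cur = conn.cursor()
        cur.execute(query)
        return cur.fetchall()


rows = query_state_population(InMemoryDatabase())
print(rows)
```

The function never checks what kind of object it was given. It only relies on get_connection, cursor, execute, and fetchall being present, which is what makes swapping the backend possible.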
For our purposes, we will create a resource that provides similar functionality but uses a different database. We will use Postgres which, unlike Snowflake, is open source, free to use, and can run on our local machine.
Another benefit of using Postgres is that its Python client is a lot like the Snowflake Python client. Here is a resource that allows us to execute queries in Postgres:
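from contextlib import contextmanager

import dagster as dg
import psycopg2


class PostgresResource(dg.ConfigurableResource):
    user: str
    password: str
    host: str
    database: str

    def _connection(self):
        # Open a new psycopg2 connection from the configured fields.
        return psycopg2.connect(
            user=self.user,
            password=self.password,
            host=self.host,
            database=self.database,
        )

    @contextmanager
    def get_connection(self):
        conn = self._connection()
        try:
            yield conn
        finally:
            # Close the connection when the with block exits.
            conn.close()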
This resource provides everything we need and can be used in place of the Snowflake resource for the purposes of integration testing to ensure that our asset works while still connecting to a database.
💡 SQL Syntax: One thing to be aware of when using a different database backend for integration testing is any difference in SQL syntax. Snowflake and Postgres overlap on most standard SQL, but Snowflake includes some syntax and functions that Postgres does not support (for example, the QUALIFY clause).
Similar resources
We defined our own Postgres resource to take the place of Snowflake, but there are other options. One popular tool is DuckDB, an OLAP database that can run locally. This can serve as a great substitute for a data warehouse like Snowflake. Just remember that there are some subtle differences between the Snowflake and DuckDB Python clients. For example, consider this asset:
@dg.asset
def truncate_table(database: dg.ConfigurableResource) -> None:
    query = "TRUNCATE TABLE data.city_population"
    with database.get_connection() as conn:
        conn.execute(query)
This asset would not work with Snowflake, because the Snowflake client requires a cursor to execute a query while DuckDB does not. For the same code to work with both resources, it needs to go through a cursor:
@dg.asset
def truncate_table(database: dg.ConfigurableResource) -> None:
    query = "TRUNCATE TABLE data.city_population"
    with database.get_connection() as conn:
        cur = conn.cursor()  # Cursor is required
        cur.execute(query)
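The difference between the two styles can be sketched without either client installed. In the example below, sqlite3 from the standard library plays the role of a client that accepts execute directly on the connection, and CursorOnlyConnection is a hypothetical wrapper that imitates a client exposing only cursor. Neither is the real DuckDB or Snowflake client, and the table row is a placeholder.

```python
import sqlite3


class CursorOnlyConnection:
    """Hypothetical wrapper that exposes cursor() but no execute()."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn

    def cursor(self) -> sqlite3.Cursor:
        return self._conn.cursor()


def count_via_connection(conn) -> int:
    # Shortcut style: only works when the connection itself has execute().
    return conn.execute("SELECT COUNT(*) FROM city_population").fetchone()[0]


def count_via_cursor(conn) -> int:
    # Portable style: works with any connection that provides cursor().
    cur = conn.cursor()
    cur.execute("SELECT COUNT(*) FROM city_population")
    return cur.fetchone()[0]


raw = sqlite3.connect(":memory:")
raw.execute("CREATE TABLE city_population (city_name TEXT, population INTEGER)")
raw.execute("INSERT INTO city_population VALUES ('Placeholder City', 1)")

strict = CursorOnlyConnection(raw)

# The cursor style succeeds against both kinds of connection.
cursor_results = (count_via_cursor(raw), count_via_cursor(strict))

# The shortcut style succeeds on the permissive connection only.
shortcut_on_raw = count_via_connection(raw)
try:
    count_via_connection(strict)
    shortcut_failed = False
except AttributeError:
    shortcut_failed = True
```

Writing assets against the cursor style keeps them usable with whichever resource is supplied at run time.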
The DuckDB resource may be a better fit for your use case, but we will use the custom Postgres resource for the purposes of demonstration.