docs/integrations/prefect-snowflake/index.mdx
The prefect-snowflake integration makes it easy to connect to Snowflake in your Prefect flows. You can run queries both synchronously and asynchronously as Prefect flows and tasks.
Install prefect-snowflake as a dependency of Prefect.
If you don't already have Prefect installed, this command also installs the newest version of prefect.
pip install "prefect[snowflake]"
Upgrade to the latest versions of prefect and prefect-snowflake:
pip install -U "prefect[snowflake]"
The prefect-snowflake integration has two blocks: one for storing credentials and one for storing connection information. Register blocks in this module to view and edit them on Prefect Cloud:
prefect block register -m prefect_snowflake
Below is a walkthrough on saving a SnowflakeCredentials block through code. Log into your Snowflake account to find your credentials.
The example below uses a user and password combination. Refer to the SDK documentation for a full list of authentication and connection options.
from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.snowflake
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER",
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
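Hard-coding a password in source is rarely desirable. As a minimal sketch, assuming the values are supplied through environment variables, the same block could be built as follows. The variable names `SNOWFLAKE_ACCOUNT`, `SNOWFLAKE_USER`, and `SNOWFLAKE_PASSWORD`, and both helper functions, are hypothetical and not part of prefect-snowflake.

```python
import os
from typing import Mapping

# Hypothetical environment variable names; adjust them to your own conventions.
ENV_TO_FIELD = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
}


def credential_kwargs(env: Mapping[str, str]) -> dict:
    """Collect SnowflakeCredentials keyword arguments from a mapping.

    Raises KeyError listing every missing variable, not just the first one.
    """
    missing = [name for name in ENV_TO_FIELD if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {field: env[name] for name, field in ENV_TO_FIELD.items()}


def save_credentials_block(block_name: str) -> None:
    """Build and save the block; requires prefect-snowflake to be installed."""
    # Imported here so credential_kwargs stays usable without the integration.
    from prefect_snowflake import SnowflakeCredentials

    credentials = SnowflakeCredentials(**credential_kwargs(os.environ))
    # overwrite=True replaces an existing block that has the same name.
    credentials.save(block_name, overwrite=True)
```

Calling `save_credentials_block("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")` would then store the block without the password ever appearing in the script.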
Then, to create a SnowflakeConnector block:
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector
credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
You can now easily load the saved block, which holds your credentials and connection info:
from prefect_snowflake import SnowflakeConnector

connector = SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
To set up a table, use the execute and execute_many methods. Then, use the fetch_all method to retrieve the rows. If the results are too large to fit into memory, use the fetch_many method to retrieve data in chunks.
By using the SnowflakeConnector as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you're done with them.
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # extend keeps all_rows a flat list of rows rather than a list of chunks
            all_rows.extend(new_rows)
    return all_rows


@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    # The flow requires the name of a saved SnowflakeConnector block
    snowflake_flow("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
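The chunked loop in `fetch_data` can also be written as a generator, so that callers handle one batch at a time instead of collecting everything in a list. The following is a sketch and not part of prefect-snowflake: `iter_batches` is a hypothetical helper, and `FakeConnector` is a stand-in used only to exercise the loop without a Snowflake account. It assumes, as the comment in the example above states, that repeated `fetch_many` calls with the same operation return successive sets of results.

```python
from typing import Any, Iterator, List


def iter_batches(connector: Any, operation: str, size: int) -> Iterator[List[tuple]]:
    """Yield batches from connector.fetch_many until an empty batch comes back."""
    while True:
        batch = connector.fetch_many(operation, size=size)
        if not batch:
            return
        yield batch


class FakeConnector:
    """Stand-in that mimics the repeated-fetch behaviour for one operation."""

    def __init__(self, rows: List[tuple]) -> None:
        self._rows = rows
        self._position = 0

    def fetch_many(self, operation: str, size: int) -> List[tuple]:
        # Return the next `size` rows and advance the internal position.
        start = self._position
        self._position += size
        return self._rows[start:self._position]


rows = [("Ford", "Highway 42"), ("Unknown", "Space"), ("Me", "Myway 88")]
batches = list(iter_batches(FakeConnector(rows), "SELECT * FROM customers", size=2))
flat_rows = [row for batch in batches for row in batch]
```

With a real block, the connector obtained from `with SnowflakeConnector.load(block_name) as connector:` would take the place of `FakeConnector`.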
If the native methods of the block don't meet your requirements, you can access the underlying Snowflake connection and use its built-in methods directly.
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f"CREATE TABLE IF NOT EXISTS {table_name} ({ddl})"
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # case sensitivity matters here!
        df = pd.DataFrame([("Marvin", 42), ("Ford", 88)], columns=["NAME", "NUMBER"])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_,  # note the "_" suffix
        )
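`write_pandas` returns a tuple whose first three items are a success flag, the number of chunks, and the number of rows written. The flow above unpacks these values but does not inspect them. A small check along the following lines could run right after the write; `ensure_write_succeeded` is a hypothetical helper and not part of either library.

```python
def ensure_write_succeeded(success: bool, num_rows: int, expected_rows: int) -> int:
    """Raise if write_pandas reported failure or an unexpected row count."""
    if not success:
        raise RuntimeError("write_pandas reported that the load did not succeed")
    if num_rows != expected_rows:
        raise RuntimeError(
            f"Expected {expected_rows} rows to be written, but {num_rows} were"
        )
    return num_rows
```

Inside the flow, this would be called as `ensure_write_succeeded(success, num_rows, expected_rows=len(df))`.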
Refer to the prefect-snowflake SDK documentation to explore other capabilities of the prefect-snowflake library, such as async methods.
For further assistance using Snowflake, consult the Snowflake documentation or the Snowflake Python Connector documentation.