August 2, 2024

How Do You Call Snowflake Stored Procedures Using dbt Hooks?

By Rajib Prasad

Efficient data transformation and processing are crucial for data analytics and generating insights. The Snowflake AI Data Cloud is one of the most powerful platforms for storing and processing complex data, and integrating Snowflake with dbt adds another layer of automation and control to the data pipeline.

In this blog, we’ll explore:

  • Overview of Snowflake Stored Procedures & dbt Hooks.

  • Importance of Stored Procedures & dbt Hooks.

  • How the on-run-start hook is used in a dbt project to call a stored procedure.

  • Standards and best practices for effectively integrating stored procedures into dbt projects.

 Let’s jump in!

What are Snowflake Stored Procedures & dbt Hooks?

Snowflake stored procedures are programmable routines that allow users to encapsulate and execute complex logic directly in a Snowflake database. These procedures are designed to automate repetitive tasks, implement business logic, and perform complex data transformations, increasing the productivity and efficiency of data processing workflows.

Snowflake stored procedures support multiple programming languages (JavaScript, Python, and SQL) to meet different development needs and preferences.

dbt hooks are snippets of SQL (or macro calls that render to SQL) that dbt executes at defined points in a run. They help integrate dbt with external systems and processes, increasing the automation and scalability of data transformation workflows. dbt provides four main hooks: pre-hook, post-hook, on-run-start, and on-run-end.
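As a quick orientation (a sketch with placeholder statements, separate from the project built later in this post): on-run-start and on-run-end are declared at the project level in dbt_project.yml, while pre-hook and post-hook are attached to individual models, for example in a config block. Here audit, reporter, and my_model are placeholder names.

# dbt_project.yml -- project-level hooks, run once per dbt invocation
on-run-start:
  - "CREATE SCHEMA IF NOT EXISTS audit"
on-run-end:
  - "GRANT USAGE ON SCHEMA audit TO ROLE reporter"

-- models/my_model.sql -- model-level hooks, run around this one model
{{ config(
    pre_hook  = "ALTER SESSION SET TIMEZONE = 'UTC'",
    post_hook = "GRANT SELECT ON TABLE {{ this }} TO ROLE reporter"
) }}
SELECT 1 AS id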

Why Does it Matter?

Snowflake stored procedures and dbt Hooks are essential to modern data engineering and analytics workflows. 

  • Data professionals can improve their ability to build robust, scalable, and automated data pipelines by learning to use Snowflake stored procedures with dbt Hooks.

  • Snowflake stored procedures streamline data processing and encourage code reuse, increasing productivity.

  • Integrating Snowflake stored procedures with dbt Hooks automates complex data workflows and improves pipeline orchestration.

How to Call Stored Procedures Using Hooks?

Imagine a scenario where we have an External Stage set up in our database that houses various Excel files. Our task is to transfer the data from the Excel files to the Snowflake tables. And once the data is imported into our Snowflake tables, we need to proceed with further processing and refining the data using dbt models.

Only files with supported formats can be loaded directly from the External Stage to our Snowflake tables using a copy statement. The Excel .xlsx format is unsupported, so we need a different approach to loading the file into a table. We can achieve this by using the Snowflake Stored Procedure, which will ensure that the contents of the Excel files are loaded into the appropriate Snowflake tables. 

To automate and streamline this process, we can call this Stored Procedure using an on-run-start hook in dbt. This method ensures that fresh data from an external environment is seamlessly integrated into our Snowflake environment. In this way, we maintain an efficient, automated workflow for data retrieval and processing, ultimately increasing the reliability and performance of our data pipeline.

Before diving into the steps, let’s understand the database object structure:

  1. Target Table: The table where the data from the Excel file will be loaded.

  2. External Stage: The stage where the data is stored during the loading process.

  3. Stored Procedure: The procedure used to load the data from the Excel file into the target table.

Following steps 1 to 5, we will manually create a table and a stored procedure to load data from an Excel file into our Snowflake database. After testing and confirming that the data loads correctly, we will incorporate this logic into a dbt project for more efficient and automated data management.

Before we begin, ensure the following:

  • An external stage is set up beforehand.

  • You have access to Snowsight to execute SQL commands.

  • You have a dbt project configured and ready for integration.

To load the External Stage files into the tables and call the stored procedure from dbt, follow the steps below:

Step 1: Select the warehouse, database, and schema you are going to use –

USE WAREHOUSE <warehouse_name>;
USE DATABASE <database_name>;
USE SCHEMA <schema_name>;

Note: This step is particularly important when the target table lives in a schema different from the stored procedure’s.

Step 2: Create a table based on your Excel file’s column names –

CREATE OR REPLACE TABLE <table_name> (
    <column_name> <data_type>,
    ...
);

Note: Keep all column data types as VARCHAR during the initial loading. We can change the data types in the subsequent layers as needed.
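For example, assuming a hypothetical AIRBNB_UNITS worksheet with a handful of columns (the column names below are illustrative), the raw landing table could look like this:

CREATE OR REPLACE TABLE AIRBNB_UNITS (
    UNIT_ID    VARCHAR,
    UNIT_NAME  VARCHAR,
    CITY       VARCHAR,
    PRICE      VARCHAR,   -- kept as VARCHAR for the raw load; cast in a later layer
    UPDATED_AT VARCHAR    -- kept as VARCHAR for the raw load; cast in a later layer
);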

Step 3: Create the Snowflake stored procedure using the query below –

CREATE OR REPLACE PROCEDURE <database_name>.<schema_name>.<stored_procedure_name>()
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','openpyxl','xlrd>=2.0.1','python-dateutil')
HANDLER = 'main'
EXECUTE AS CALLER
AS '
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
import snowflake.connector
from snowflake.snowpark import session
from dateutil import parser

def main(session):
    # Build a scoped URL for the Excel file sitting in the external stage
    scope_query = "SELECT BUILD_SCOPED_FILE_URL(@<EXTERNAL_STAGE_NAME>, ''<excel_file_name>'') AS file_url"
    df = pd.read_sql_query(scope_query, session.connection)

    # Open the staged file and read it into a pandas DataFrame
    with SnowflakeFile.open(df[''FILE_URL''][0], ''rb'') as f:
        df = pd.read_excel(f)

        # Reset the index after loading
        df = df.reset_index(drop=True)

        # Cast each column to the expected type (repeat per column)
        df["<column_name>"] = df["<column_name>"].astype(str)
        ...

        # Rename columns to uppercase to match the Snowflake table
        df = df.rename(columns={col: col.upper() for col in df.columns})

        # Write the DataFrame to the target table
        session.write_pandas(df, table_name="<table_name>", database="<database_name>", schema="<schema_name>")

        return "Successfully loaded into <table_name>"
';

In the stored procedure above, the RETURNS VARCHAR(16777216) clause specifies that the maximum return string length is 16,777,216 characters. The LANGUAGE PYTHON clause indicates that the procedure is written in Python, and RUNTIME_VERSION = '3.8' specifies Python 3.8. The PACKAGES clause lists the required Python packages: snowflake-snowpark-python, openpyxl, xlrd>=2.0.1, and python-dateutil. The HANDLER = 'main' clause sets main as the entry point, and EXECUTE AS CALLER means the procedure runs with the caller’s permissions.

Step 4: Execute the stored procedure and verify its functionality using the following SQL statement:

CALL <database_name>.<schema_name>.<stored_procedure_name>();

Step 5: Check the output. On success, the procedure returns the message “Successfully loaded into <table_name>”.

Additionally, you can run a select query to verify the table data.
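For example (a sketch; substitute your own database, schema, and table names):

SELECT *
FROM <database_name>.<schema_name>.<table_name>
LIMIT 10;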

If the procedure executes successfully without errors, create your dbt macros and hooks to automate the process further. This step ensures that the stored procedure integrates seamlessly into your data transformation workflows managed by dbt.

Step 6: Open your dbt project –

Prerequisite: Ensure you have a dbt environment set up that allows you to run your models and access your Snowflake database.

Step 7: Create a macro file named call_stored_procedure.sql and copy in the contents below –

-- macros/call_stored_procedure.sql

{% macro call_stored_procedure() %}
    {% set query %}
        CALL <database_name>.<schema_name>.<stored_procedure_name>();
    {% endset %}

    {% set result = run_query(query) %}
{% endmacro %}

Step 8: Open your dbt_project.yml file and add the following –

...

clean-targets:      # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

on-run-start:
  - "{{ call_stored_procedure() }}"

models:
  blog:
    cleaned:
      +database: airbnb
...

In this context, the macro is executed at the start of the dbt run as part of the on-run-start hook.

Note: It’s also possible to run a pre-hook at the model level. Say we have a cleaned model (airbnb_units_cleaned) that runs once the data is loaded into the raw table. In that case, we can attach the stored procedure call to the cleaned model’s configuration, so the procedure is called just before airbnb_units_cleaned is built.
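A minimal sketch of that configuration (shown here as a model-level +pre-hook in dbt_project.yml; the same hook can also be set in a config block at the top of the model’s SQL file):

models:
  blog:
    cleaned:
      airbnb_units_cleaned:
        +pre-hook: "{{ call_stored_procedure() }}"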

Step 9: Run the below dbt command to test your dbt hook –
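For an on-run-start hook, a standard invocation is enough to trigger it (dbt build fires project-level hooks as well):

dbt run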

Step 10: Go to the Snowflake Query History section to see the executed queries and select the DBT user from the User filter dropdown.

After running the dbt hook, the Query History should show the CALL statement that the hook issued against the stored procedure.
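If you prefer SQL over the Snowsight UI, a query against the QUERY_HISTORY table function can surface the call (a sketch; run it as the user dbt connects with, or use the QUERY_HISTORY_BY_USER variant, and substitute the procedure name):

SELECT query_text, user_name, start_time
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE query_text ILIKE '%<stored_procedure_name>%'
ORDER BY start_time DESC;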

Best Practices

When integrating Snowflake stored procedures with dbt hooks, following best practices can ensure a smooth and efficient workflow. Here are some expert tips to help you optimize this process:

  1. Use Environment Variables – When working across various environments such as development (dev), testing (test), and production (prod), and when your Snowflake tables, stored procedures, or external stages incorporate environment-specific variables in their naming conventions, leverage dbt to pass these environment variables dynamically. This approach enhances flexibility and adaptability within your data pipelines, ensuring seamless management across different deployment environments.
dbt macro

-- macros/call_stored_procedure.sql

{% macro call_stored_procedure(param) %}
    {% set query %}
        CALL <database_name>.<schema_name>.<stored_procedure_name>( {{ param }} );
    {% endset %}

    {% set result = run_query(query) %}
{% endmacro %}
dbt hook

on-run-start:
  - "{{ call_stored_procedure(env_var('DBT_ENV')) }}"
The newly added hook in the dbt_project.yml file
  2. Create Stored Procedure from dbt – This approach is particularly useful when your database, schema, or stored procedure contains environment-specific names (e.g., dev, tst, prd).

    Instead of manually creating the stored procedure through the Snowflake UI in each environment (test, production, and so on), you can create it directly from dbt. In our case, the schema name contains the environment as a prefix, like <env>_raw or <env>_cleaned.

dbt macro to create stored procedure

{% macro create_stored_procedure(env_param) %}
CREATE PROCEDURE IF NOT EXISTS <database_name>.{{env_param}}_RAW.<stored_procedure_name>(env VARCHAR(4))
RETURNS VARCHAR(16777216)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python','openpyxl','xlrd>=2.0.1','python-dateutil')
HANDLER = 'main'
EXECUTE AS CALLER
AS '
import pandas as pd
from snowflake.snowpark.files import SnowflakeFile
import snowflake.connector
from snowflake.snowpark import session
from dateutil import parser

def main(session, env):
    # Build a scoped URL for the Excel file in the environment-specific stage
    scope_query = f"SELECT BUILD_SCOPED_FILE_URL(@<database_name>.{env}_RAW.<external_stage_name>, ''<EXCEL_FILE_NAME>.xlsx'') AS file_url"
    df = pd.read_sql_query(scope_query, session.connection)

    with SnowflakeFile.open(df[''FILE_URL''][0], ''rb'') as f:
        ...

        # Write the DataFrame to the environment-specific target table
        session.write_pandas(df, table_name="<table_name>", database="<database_name>", schema=f"{env}_RAW")

        return "Successfully loaded into <table_name>"
';
{% endmacro %}

Note: In the CREATE PROCEDURE definition, {{env_param}}_RAW is the schema name, and in the Python function definition, {env}_RAW is the schema name.

dbt hook to call this macro

  - "{{ create_stored_procedure(env_var('DBT_ENV')) }}"
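Putting the two macros together, the on-run-start list in dbt_project.yml might look like this (a sketch; hooks run in list order, so the procedure is created, or confirmed to exist, before it is called):

on-run-start:
  - "{{ create_stored_procedure(env_var('DBT_ENV')) }}"
  - "{{ call_stored_procedure(env_var('DBT_ENV')) }}"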
  3. Documentation and Comments – Clear documentation and comments improve code maintenance and help future developers understand it.

    You can add comments or log messages to make the code’s purpose clearer, and you can log execution steps while the macro runs:

-- macros/call_stored_procedure.sql

{% macro call_stored_procedure() %}
    {# Set the stored procedure call query #}
    {% set query %}
        CALL <database_name>.<schema_name>.<stored_procedure_name>();
    {% endset %}

    {# Execute the stored procedure and log the result #}
    {% set result = run_query(query) %}
    {{ log("Stored procedure executed successfully.", info=True) }}
{% endmacro %}

Closing

This blog covered how to call a stored procedure using a dbt hook. This approach can be particularly effective when you need to incorporate complex logic into your data pipeline that doesn’t fit neatly within the traditional dbt model of transformations and models. Using stored procedures within dbt hooks allows you to leverage the full power of SQL and procedural code to perform operations that might be inefficient in pure SQL.

Snowflake stored procedures are like data processing recipes written in various languages (SQL, Python, and JavaScript): they package the same steps for reuse across different tasks, saving time and effort, and they let you automate complicated operations.

dbt hooks run specific commands at the beginning or end of a data process. This is useful for setting up the environment and ensuring everything runs smoothly. Using these hooks, you can automate the process and make it more efficient; they help with running tasks and managing dependencies.

For any additional assistance or to get expert help with your data projects, feel free to contact the phData team. Our experts are ready to support you with your Snowflake and dbt needs.

FAQs

What are the benefits of using Snowflake stored procedures?

Stored procedures bring several benefits to data management and automation. They promote code reusability and reduce redundant code, allow for atomic operations that protect data integrity, and improve performance by reducing network round-trips. They also bolster data governance by controlling access to sensitive data and operations through role-based permissions.

What role do dbt hooks play in data transformation workflows?

dbt hooks play a crucial role in automating and enhancing data transformation workflows. Hooks such as on-run-start and on-run-end let users execute tasks before or after the main operation, which enables seamless integration of external processes, such as loading data from external sources. They help automate repetitive tasks, ensure consistent data preparation, and enable advanced data processing logic. Hooks also provide flexibility in managing dependencies and orchestrating complex data pipelines, accelerating the delivery of analytics-ready data for downstream applications.

