August 6, 2024

How to Build Effective Data Pipelines in Snowpark

By Loc Dao

As organizations increasingly rely on data-driven decisions, they need quality data produced by efficient and effective data pipelines. With the growing demand for quick decision-making, timely processing and extraction of insights are just as important. 

To achieve this, organizations must ensure their data pipelines are well designed and implemented, especially as their use of cloud data platforms such as the Snowflake Data Cloud grows. For Snowflake customers, Snowpark is a powerful tool for building effective and scalable data pipelines. 

In this blog post, we’ll dive deep into Snowpark, explore key concepts, and walk through example code along with best practices.

What is Snowpark and Why Use It for Building Data Pipelines?

Snowpark consists of libraries and runtimes that enable data engineers, data scientists, and other developers to use Python, Java, Scala, or other languages to build and run data engineering pipelines, machine learning workflows, and other data applications securely and seamlessly inside the Snowflake Data Cloud. 

On the server side, Snowpark provides two runtimes: Snowflake virtual warehouses (Python, Java, Scala) and Snowpark Container Services (any language), which lets developers register, deploy, and run container images on Snowflake-managed infrastructure. 

Snowpark offers two client-side libraries: the Snowpark API and the Snowpark ML API. The Snowpark API includes the DataFrame API and integrates with other popular open-source APIs that developers can use to complete data engineering tasks. The Snowpark ML API supports end-to-end machine learning workflows, from preprocessing and feature engineering through training and model deployment. 

While this blog focuses on the data engineering capabilities of Snowpark, check out the following blogs for a deep dive into Snowpark ML: How to Do Document Classification on Snowflake and How to Train ML Models Using Snowpark for Python.

The key advantages of using Snowpark to build data pipelines include:

  • Simplified Architecture: Snowpark provides a consistent API across supported languages and allows different users, such as data engineers, data scientists, etc., to collaborate within a single platform.

  • Flexibility and Performance Optimization: By executing code within Snowflake, Snowpark minimizes data movement and brings the flexibility and scalability needed to process complex workloads with large amounts of data in a timely manner.

  • Enhanced Security and Governance: Snowpark runs within Snowflake’s secure environment, which includes encryption at rest and in transit, RBAC, and other features such as data masking or private connectivity to cloud providers.

  • Reduced Operational Overhead: As Snowpark runs on services managed by Snowflake, resources used for infrastructure can be shifted to building pipelines and other areas for innovation.

Getting Started with Snowpark

Snowflake provides many different options for developing with Snowpark. The quickest way to get started if you prefer Python is through a Python Worksheet in Snowsight, an interactive web-based UI that allows you to code and collaborate with others using SQL or Python worksheets. For a thorough example and walkthrough, see this guide for using Python Worksheets to Develop a Snowpark Stored Procedure. Also, Snowflake Notebooks, currently in Private Preview, offers an interactive cell-based programming interface where developers can experiment with Snowpark code and data.

For those who prefer local integrated development environments (IDEs) such as Visual Studio Code, PyCharm, and others, Snowflake provides connectors and extensions that allow users to write and execute queries, debug stored procedures, and manage Snowflake resources and data all from the IDE. 

For a step-by-step guide and video on setting up the Snowflake connection in VS Code, check out this blog. The example in that blog covers the setup for Scala, but if you prefer Python, try the code below to connect to your Snowflake account and list the tables in a specified schema:

Installing Snowpark library for Python in the local environment

pip install "snowflake-snowpark-python[pandas]"

from snowflake.snowpark import Session

# Define your connection parameters
connection_parameters = {
    "account": "<your_account_identifier>",
    "user": "<your_username>",
    "password": "<your_password>",
    "role": "<your_role>",
    "warehouse": "<your_warehouse>",
    "database": "<your_database>",
    "schema": "<your_schema>"
}

# Create a Snowpark session
session = Session.builder.configs(connection_parameters).create()

# Test the connection by listing the tables
tables = session.sql("SHOW TABLES").collect()

# Print table list
for table in tables:
    print(table)

# Close the session
session.close()


In the Python code above, each <variable_name> placeholder must be replaced with your actual Snowflake account details. Not all of the connection parameters listed in the sample are required for a successful connection, and there are additional authentication methods beyond username and password (one option is sketched below). Please refer to the Snowflake documentation for connecting to Snowflake with Python for more information.
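
For example, if your account is set up for single sign-on, you can replace the password with browser-based authentication. The sketch below is illustrative only: it reuses the same placeholder values, and the authenticator option follows the standard Snowflake Python connector parameters.

from snowflake.snowpark import Session

# Browser-based SSO instead of a password (assumes SSO is configured for the account)
sso_connection_parameters = {
    "account": "<your_account_identifier>",
    "user": "<your_username>",
    "authenticator": "externalbrowser",  # opens a browser window for authentication
    "role": "<your_role>",
    "warehouse": "<your_warehouse>",
    "database": "<your_database>",
    "schema": "<your_schema>"
}

session = Session.builder.configs(sso_connection_parameters).create()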

Optimizing Performance in Snowpark Data Pipelines

Data pipelines should be optimized to meet time-sensitive processing and analytical requirements, reduce costs by minimizing compute resource usage, and ensure scalability as data grows in volume and complexity. Some examples and strategies for optimizing Snowpark code are below:

Use DataFrame Operations Effectively

DataFrame operations are executed lazily in Snowpark, so chaining them together allows the query engine to evaluate the full set of transformations and generate a single optimized query. This minimizes the number of actions that move data. The following Snowpark Python code provides a simple example of this:

from snowflake.snowpark.functions import col, sum

# Create a dataframe from a Snowflake table named "sales_data"
sales_df = session.table("sales_data")

# Chain multiple dataframe operations to filter and aggregate data
result_df = (sales_df
             .filter(col("state") == "TX")
             .groupBy(col("product"))
             .agg(sum(col("amount")).alias("total_sales"))
             .orderBy(col("total_sales").desc()))

# Action below submits optimized query with lazily evaluated code above 
result_df.show()


Data shuffling is a common concept in distributed data processing and can become a performance bottleneck if not carefully considered. Operations such as groupBy() and orderBy() trigger data movement across the compute nodes in the Snowflake virtual warehouse, and excessive shuffling can greatly impact query performance, especially on larger datasets. Developers can minimize this impact by placing shuffle operations carefully, selecting only the required columns during loads and transformations, and filtering datasets early, as shown in the sketch below.
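
As a minimal sketch (assuming the same sales_data table and the column names used above), selecting only the needed columns and filtering before the groupBy() reduces how much data has to be shuffled:

from snowflake.snowpark.functions import col, sum

# Select only the columns the aggregation needs and filter early,
# so less data is scanned and shuffled by the later groupBy()
trimmed_sales_df = (session.table("sales_data")
                    .select("state", "product", "amount")   # column pruning
                    .filter(col("state") == "TX"))          # filter before the shuffle

result_df = (trimmed_sales_df
             .groupBy("product")
             .agg(sum(col("amount")).alias("total_sales")))

result_df.show()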

Leverage Snowflake Caching

If you have DataFrames that undergo repeated transformations or are constructed from computationally expensive operations, you can use caching to avoid recomputing the results downstream in your Snowpark job. Although caching can be very useful, use it cautiously, as overuse can lead to performance issues. In the sample code below, cached_categorized_sales_df is only computed once and is then used downstream by three other calculations to compute the total, average, and count of sales.

from snowflake.snowpark.functions import col, when, sum, avg

# Create a dataframe from a Snowflake table named "sales_data"
sales_df = session.table("sales_data")

# Create a new category column which groups sales into a Low, Medium, or High bucket
categorized_sales_df = sales_df.withColumn(
    "sales_category",
    when(col("sales_amount") < 1000, "Low")
    .when((col("sales_amount") >= 1000) & (col("sales_amount") < 10000), "Medium")
    .otherwise("High")
)

# Cache the categorized sales dataframe
cached_categorized_sales_df = categorized_sales_df.cache_result()

# Downstream transformations on cached dataframe
# 1: Total sales by sales category
total_sales_by_category = (cached_categorized_sales_df
                           .groupBy("sales_category")
                           .agg(sum(col("sales_amount")).alias("total_sales"))
                           .orderBy(col("total_sales").desc()))
total_sales_by_category.show()

# 2: Average sales amount for each category
avg_sales_by_category = (cached_categorized_sales_df
                         .groupBy("sales_category")
                         .agg(avg(col("sales_amount")).alias("avg_sales"))
                         .orderBy(col("avg_sales").desc()))
avg_sales_by_category.show()

# 3: Transaction counts per category
count_sales_by_category = (cached_categorized_sales_df
                           .groupBy("sales_category")
                           .count()
                           .orderBy(col("count").desc()))
count_sales_by_category.show()


Optimal File Sizes for Loading/Unloading

This typically only matters when using Snowpark to read from or write to stages. To optimize performance, Snowflake recommends compressed file sizes of roughly 100 to 250 MB, which allows the load process to be parallelized and minimizes time and resource consumption.
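
On the unloading side, Snowpark’s DataFrameWriter passes COPY options such as MAX_FILE_SIZE through to Snowflake. The sketch below is illustrative only: the stage name @unload_stage and the target size are assumptions.

# Unload a dataframe to a stage, targeting files of roughly 250 MB
# so downstream loads can be parallelized
# (@unload_stage is a hypothetical stage; max_file_size is specified in bytes)
sales_df = session.table("sales_data")

sales_df.write.copy_into_location(
    "@unload_stage/sales/",
    file_format_type="parquet",
    overwrite=True,
    max_file_size=250000000
)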

Use Snowpark-Optimized Warehouses

Snowflake launched Snowpark-Optimized Warehouses in 2023 for all Snowflake regions across AWS, Azure, and GCP. This new warehouse type enables customers to efficiently run complex data engineering or data science workloads with larger memory requirements.
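
Creating and using one only requires setting the warehouse type. Below is a minimal sketch; the warehouse name and size are assumptions for illustration.

# Create a Snowpark-optimized warehouse and switch the session to it
session.sql("""
    CREATE WAREHOUSE IF NOT EXISTS snowpark_opt_wh
    WITH WAREHOUSE_SIZE = 'MEDIUM'
    WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
""").collect()

session.use_warehouse("snowpark_opt_wh")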

Utilize User-Defined Functions (UDFs)

User-defined functions (UDFs) can also improve the performance of your Snowpark data pipelines. Snowpark offers the capability of using vectorized UDFs, where computations can be done on entire arrays of data instead of row by row. Furthermore, Snowpark executes the UDFs on the server where the data resides, resulting in reduced data movement and processing time. UDFs also allow developers to implement custom logic or leverage external libraries. 

There are two ways to create UDFs: an anonymous UDF, which assigns the function to a variable, and a named UDF, which can be called by name and even used in a different session. The Python code below provides an example of a named UDF: 

from snowflake.snowpark.functions import udf, col

# Create a named UDF which contains logic to determine high-value customers
@udf(name="is_high_value_customer", is_permanent=True, stage_location="@udf_stage")
def is_high_value_customer(customer_id: str, order_count: int, avg_order_value: float) -> bool:
  min_orders = 10
  min_avg_order = 100.0
  return (order_count >= min_orders) and (avg_order_value >= min_avg_order)

# Identify high-value customers using the UDF on a dataframe containing customer orders
high_value_customers_df = customer_orders_df.withColumn(
    "is_high_value", is_high_value_customer(
col("customer_id"), col("order_count"), col("avg_order_value")
    )
)


In this example, a permanent named UDF called is_high_value_customer takes a customer’s ID, order count, and average order value and returns True or False after comparing the order count and average order value against defined thresholds of 10 orders and a $100 average order value. Note that stage_location is specified, indicating that the Python file for the UDF is uploaded to a stage called udf_stage.
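
The vectorized UDFs mentioned earlier operate on batches of rows as pandas Series rather than one row at a time. A minimal sketch using Snowpark’s pandas_udf is shown below; the function name, discount logic, and batch size are assumptions for illustration.

from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import PandasSeries

# Vectorized UDF: receives a batch of order values as a pandas Series
# and returns a Series of discounted values in a single call
@pandas_udf(max_batch_size=1000)
def apply_discount(order_values: PandasSeries[float]) -> PandasSeries[float]:
    return order_values * 0.9

Once registered, apply_discount can be called on a DataFrame column just like the scalar UDF above.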

Testing and Deployment

To adhere to software engineering best practices and ensure reliability, your Snowpark data pipelines should be thoroughly tested. Tests can be developed and added as steps in your CI/CD pipeline to ensure that only clean, functional code is deployed to production. 

Snowpark supports several popular testing frameworks and tools, such as Pytest and Great Expectations. See the example below, which shows Pytest used to create a unit test for a function that defines the high-value status of a customer:

import pytest
from module_containing_udfs import is_high_value_customer

# Define test cases (the function takes a customer ID, order count, and average order value)
@pytest.mark.parametrize("customer_id, order_count, avg_order_value, expected", [
    ("cust_a", 5, 150.0, False),   # Not enough orders
    ("cust_b", 15, 50.0, False),   # Average order value too low
    ("cust_c", 15, 150.0, True),   # Meets both criteria
    ("cust_d", 10, 100.0, True),   # Equal to threshold values
])
def test_is_high_value_customer(customer_id, order_count, avg_order_value, expected):
    assert is_high_value_customer(customer_id, order_count, avg_order_value) == expected


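Beyond unit tests, pipeline-level data quality checks can run in the same CI/CD flow. The sketch below uses only Snowpark and assumes a pytest fixture named session that returns a Snowpark Session; the table and column names are also assumptions.

from snowflake.snowpark.functions import col

def test_no_null_sales_amounts(session):
    # Integration-style check: the loaded table should not contain null sales amounts
    null_count = (session.table("sales_data")
                  .filter(col("sales_amount").is_null())
                  .count())
    assert null_count == 0
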
Conclusion

Building effective data pipelines is key for organizations to process and extract insights from their data in a scalable, secure, and cost-effective manner. Snowpark provides flexibility and high performance by running directly on Snowflake’s data processing engine. These capabilities can be quite effective, but you may still face challenges such as processing unstructured data, troubleshooting and optimizing slow Snowpark code, or configuring Snowpark Container Services. 

For expert guidance on deploying your data pipelines with best practices, consult with phData, Snowflake’s Partner of the Year for four consecutive years. Our experience deploying modern data solutions across diverse industries brings valuable insights and support to help you optimize your data platform.
