This blog was originally written by Travis Hegner and updated for 2024 by Vinicius Olivera.
Snowpark ML is transforming the way that organizations implement AI solutions. Snowpark allows ML models and code to run on Snowflake warehouses. By “bringing the code to the data,” we’ve seen ML applications run anywhere from 4-100x faster than other architectures.
Vector embeddings are a popular technique for working with unstructured data for Generative AI use cases. We won’t go fully down the road of Large Language Models in this post, but we will show:
- How documents can be embedded as vectors
- How to visualize those vectors
- How to apply ML classification using those embedding vectors as features for documents
Join us on this technical walkthrough as we evaluate the practicality of the Snowflake Data Cloud, Snowpark, and Snowflake ML for machine learning use cases.
Document Vectors
With the success of word embeddings, it’s understood that entire documents can be represented in a similar way. In this case study, we will represent each document with a vector derived from an IDF-weighted average of the embeddings of the words it contains.
This form of unsupervised machine learning should put documents that use many similar words near each other in the resulting vector space, which should allow us to try some interesting classification and visualization tasks.
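Conceptually, the per-document computation we will later implement inside Snowflake UDFs looks roughly like the sketch below, where word_vectors and idf are hypothetical in-memory lookups standing in for the Snowflake tables we build in this post:

# Illustrative sketch only: `word_vectors` and `idf` are hypothetical lookups,
# not the Snowflake tables built later in this post.
import numpy as np

def document_vector(words, word_vectors, idf):
    # Scale each word's embedding by that word's IDF score, then average the scaled vectors.
    scaled = [np.asarray(word_vectors[w]) * idf[w] for w in words if w in word_vectors]
    return np.mean(scaled, axis=0)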
Preparing the Data
We will use a BBC news articles dataset found on Kaggle. This dataset consists of 2,225 news articles in five different categories: business, entertainment, politics, sports, and technology.
We will use several combinations of tools to create a vector that attempts to represent each article in Euclidean space. We will create more Snowflake tables to act as parts of a feature store, explore those features for value, and try our hand at a couple of different models to classify a hold-out set.
The companion notebook has scripts to support the loading of this data on Snowflake and was used as the entry point to the Snowflake instance.
The class column is either “test” or “train,” and represents to which partition the article belongs. The ID column is the unique ID of the article within a category, and the category column is the category to which the article belongs. Take note that only the category/ID combination is unique in this dataset, not the ID alone.
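For reference, the articles table has roughly the following shape (a sketch only; the actual DDL and load scripts live in the companion notebook, and the session object is created in the Connections Setup section below):

# Approximate shape of the articles table; the companion notebook has the real DDL and load scripts.
session.sql("""
    CREATE OR REPLACE TABLE BBC_ARTICLES (
        CLASS VARCHAR,     -- 'train' or 'test'
        CATEGORY VARCHAR,  -- business, entertainment, politics, sport, tech
        ID BIGINT,         -- unique only within a category
        WORDS VARCHAR      -- the full article text
    )
""").collect()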
Snowflake Connection
First of all, in order to interact with Snowflake remotely, we’re going to need to:
- Install Snowflake and Snowpark connector packages.
- Provide Snowpark Library clients with pertinent information for authentication.
Anaconda
Snowflake’s Anaconda channel is the primary mechanism for installing packages in Snowflake. Installing Anaconda locally is a good way to match your local environment to Snowflake’s when using Snowpark’s Python API. Even though we’re working remotely, we need dependency parity between the local and remote environments.
Here is a guide to help navigate the installation process using Anaconda.
Connections Setup
For this article, we’re going to use and explore two ways of connecting to Snowflake: the Snowpark Session object and the Snowflake connector’s connection object.
import os
from dotenv import load_dotenv
from pathlib import Path
dotenv_path = Path('vars.env')
load_dotenv(dotenv_path=dotenv_path)
USER = os.getenv("SF_USER")
PASSWORD = os.getenv("SF_PASSWORD")
ACCOUNT = os.getenv("SF_ACCOUNT")
WAREHOUSE = os.getenv("SF_WAREHOUSE")
DATABASE = os.getenv("SF_DATABASE")
SCHEMA = os.getenv("SF_DATABASE_SCHEMA")
Connection object:
import snowflake.connector
conn = snowflake.connector.connect(
user=USER,
password=PASSWORD,
account=ACCOUNT,
warehouse=WAREHOUSE,
database=DATABASE,
schema=SCHEMA
)
Session object:
from snowflake.snowpark import Session
connection_parameters = {
"account": ACCOUNT,
"user": USER,
"password": PASSWORD,
"warehouse": WAREHOUSE,
"database": DATABASE,
"schema": SCHEMA
}
session = Session.builder.configs(connection_parameters).create()
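To verify that the connection works before moving on, a quick query is enough (a minimal check):

# Quick sanity check that the session is pointed at the expected context.
print(session.sql("SELECT CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA()").collect())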
Training the Word Vectors
Extracting Tokens
First, we create a UDF that takes in a string and breaks it into individual words, excluding punctuation. A UDF is a function that operates on a single row in a Snowflake table at a time to produce a value. Snowpark DataFrames, on the other hand, allow us to apply operations (including UDFs) across all of the rows in a table.
## Exploring with the anonymous interface
import re

from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import ArrayType, StringType

# Define the UDF (User-Defined Function)
extract_words = udf(
    lambda article: [
        x.strip()
        for x in re.compile(r'\b\w+\b', flags=re.IGNORECASE).findall(article.lower())
    ],
    return_type=ArrayType(StringType()),
    input_types=[StringType()],
    name="my_udf",
    replace=True,
    session=session
)
This UDF can be applied to a text column in Snowflake, and it returns an array of strings that match any sequence of word characters as defined by Python’s regular expression engine. We will apply this UDF to each article and write the resulting tokens to a local file to feed a gensim Word2Vec model later.
df = session.table("BBC_ARTICLES").filter(col("CLASS") == "train")
words = df.select(extract_words(col("WORDS")).alias("ARR")).collect()
del df
# Each collected ARR value comes back as a JSON-formatted string; the replacements below
# strip the brackets, quotes, and newlines so that only comma-separated words remain.
contents = []
for word in words:
contents.append(
word[0].replace(
"[\n ", ""
).replace(
"\n ", ""
).replace(
'"', ""
).replace(
"\n]", ""
)
)
## Writing the words.txt file
with open('words.txt', 'w') as file:
    for content in contents:
        file.write(content + "\n")
This code writes a text file where each line contains the words of one article, in order, separated by commas.
Training Embeddings
With our text file full of words, we can train a simple gensim Word2Vec model.
import os
import logging
from gensim.utils import tokenize
from gensim.models import Word2Vec
logging.basicConfig(
format='%(asctime)s : %(levelname)s : %(message)s',
level=logging.INFO
)
with open("words.txt") as file:
lines = file.readlines()
docs = []
for line in lines:
docs.append(line.strip().split(","))
model = Word2Vec(sentences=docs, vector_size=300, workers=4, epochs=1000)
model.save('w2v.model')
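Before uploading anything, it can be worth spot-checking the embeddings locally; for example (assuming a token such as "market" exists in the corpus vocabulary):

# Spot-check the trained embeddings; "market" is just an example token assumed to be in the vocabulary.
print(model.wv.most_similar("market", topn=5))
print(len(model.wv["market"]))  # should be 300, matching vector_size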
Uploading Word Vectors
To use our word vectors in Snowflake, we have to create a table containing each word and its associated vector. This will allow us to join against this table and work with the vectors. Unfortunately, snowflake.connector.pandas_tools does not seem to natively support correctly uploading a dataframe where a column has an embedded array. Our code will upload the vector formatted as a JSON array string.
session.sql('CREATE OR REPLACE TABLE bbc_wordvecs (word varchar, vecstr varchar, vector array)').collect()
With our table created, let’s upload our words and vector strings.
import os
import sys
import pandas as pd
from gensim.models import Word2Vec
import snowflake.connector as sc
import snowflake.connector.pandas_tools as pt
model = Word2Vec.load("w2v.model")
words = []
for key, val in [(key, model.wv[key]) for key in model.wv.index_to_key]:
words.append({
"WORD": key,
"VECSTR": '['+','.join([str(v) for v in list(val)])+']',
})
df = pd.DataFrame(words)
success, num_chunks, num_rows, output = pt.write_pandas(conn=conn, df=df, table_name='BBC_WORDVECS')
if not success:
print(f"Error writing data to snowflake table BBC_WORDVECS")
print(output)
sys.exit(1)
print("Successfully wrote data to snowflake:")
print(f"Num Chunks: {num_chunks}")
print(f"num_rows: {num_rows}")
print(output)
A second step is required to parse that vector string and store our vector as an array of floats.
session.sql('UPDATE BBC_WORDVECS SET VECTOR=parse_json(VECSTR)').collect()
This results in a table that contains a vector for every word in our corpus. A vector is stored as a simple Array of floating point numbers.
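A quick query (illustrative) confirms the shape of the table:

# Peek at a few rows; ARRAY_SIZE should return 300 for each vector.
session.sql("SELECT WORD, ARRAY_SIZE(VECTOR) AS DIMS FROM BBC_WORDVECS LIMIT 5").show()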
Now that our word vectors are ready, we can get to work creating our document vectors.
Generating Document Vectors
Scaling and Averaging Word Vectors
When using the Python API, keep in mind that UDFs must be registered before the datasets that use them are referenced. Because of that, we will assemble and register the necessary UDFs first and then reference the needed datasets as proper snowpark DataFrames.
Once we have a flattened DataFrame of the words in each article, a DataFrame of each word and its vector, and a DataFrame of each word and its IDF score, we can join them together and do some scaling and aggregation.
First, we’ll need to create a couple more UDFs. This is where we start to run into some of the limitations of the current version of Snowpark. Passing an “Array[Float]” or “Array[Double]” is not currently supported, so we are forced to pass our floating point numbers as “Array[String].” At least when we do this, Snowflake implicitly coerces the floats to strings when we call the UDF during the DataFrame transformation.
import numpy as np

from snowflake.snowpark.types import DoubleType

session.add_packages("numpy")

@udf(name="scale_vector", input_types=[DoubleType(), ArrayType()], return_type=StringType(), replace=True, session=session)
def scale_vectors(idf, vector):
    # Coerce the array elements to floats, scale them by the word's IDF score,
    # and hand the result back to Snowflake as a JSON array string.
    scaled_vector = np.multiply([float(v) for v in vector], idf).tolist()
    return str(scaled_vector)
As you can see, we have to do some ugly workarounds to convert our vector elements to actual “Double” values and then pass the resulting vector back to Snowflake as a JSON array string. This is preferred over passing back “Array[String]” because we can then simply call “parse_json()” on the returned value, and Snowflake will treat our new vector as an “Array” of “Float” internally. This helps prevent Snowflake from hitting some serialization limits during DataFrame transformations.
Another wrench in our plan is that we don’t have a way to create user-defined aggregate functions.
User-defined functions are great for operating on one record at a time, but we don’t have a way to operate on a grouped set of records. To work around this, we do a “.groupBy().agg(array_agg())” on our DataFrame, which collects values into an array column per record. This works as an alternative, except that in our case we need to aggregate vectors that are themselves stored as arrays.
This means we would need to pass an “Array[Array[Float]],” “Array[Array[Double]],” or even “Array[Array[String]],” but as you may have guessed, these are all unsupported types for UDFs. Unfortunately, we have to do even uglier string-based conversions. Similar to above, Snowflake automatically coerces the Array[Array[Double]] into an Array[String] for us to operate on.
@udf(name="average_vectors", input_types=[ArrayType()], return_type=StringType(), replace=True, session=session)
def average_vectors(vecs):
l = len(vecs[0])
res = np.zeros(l, dtype=float)
for v in vecs:
a = list(map(float, v))
res = res + a
res = np.multiply(res, 1.0/len(vecs)).tolist()
return str(res)
As you can see in this UDF, we have to do some less-than-ideal string manipulation to calculate our desired result. On the plus side, our workarounds work well enough, and we can compute our document vectors and store them back into a new Snowflake table.
Setup Variables
Now that the UDFs are registered, we need to set up the DataFrames we want to work with, as well as a few other variables we’ll need.
articles = session.table("BBC_ARTICLES")
vectors = session.table("BBC_WORDVECS").select(col("WORD"), col("VECTOR"))
corpus = articles.filter(col("CLASS") == "train").count()
Extracting Words
Next, we break the articles into separate words (tokens), flatten on the word array, and select only the needed columns.
exp = articles.with_column("ARR", extract_words(col("WORDS"))).flatten(col("ARR"))
words = exp.select(
exp["CLASS"],
exp["CATEGORY"],
exp["ID"],
exp["INDEX"],
exp["VALUE"].alias("WORD")
)
Calculating IDF Scores
The well-known TF*IDF score is a way to measure a word’s importance across a corpus of documents. We will calculate the IDF of every word across our training corpus in order to weight each word’s vector by how useful that word is for distinguishing one document from the rest.
The TF portion of that formula is implicitly included when we average a document’s words together.
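Concretely, for a training corpus of N documents, each word w gets IDF(w) = log2(N / number of training documents containing w), which is what the Snowpark expression below computes. Here is the same formula in plain Python for reference:

import math

def idf(total_docs, docs_containing_word):
    # Matches the Snowpark expression below: log base 2 of (corpus size / document frequency).
    return math.log(total_docs / docs_containing_word, 2)

# A rarer word gets a larger weight: idf(1000, 10) > idf(1000, 500).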
idfs = words\
.filter(col("CLASS")== "train")\
.select("CATEGORY", "ID", "WORD")\
.distinct()\
.groupBy("WORD").count()\
.withColumn("IDF", log(lit(2), lit(corpus)/col("COUNT")))\
.select("WORD", "IDF")
Creating a Table for the Document Vectors
Let’s create a table to hold our document vectors.
session.sql("CREATE OR REPLACE TABLE BBC_DOCVECS (CLASS VARCHAR, CATEGORY VARCHAR, ID BIGINT, DOCVEC ARRAY)").collect()
Loading Document Vectors
With our UDFs created and our table ready, let’s get those document vectors calculated!
words.join(idfs, "WORD")\
.join(vectors, "WORD")\
.withColumn("SCALEDVEC",
parse_json(scale_vectors(
col("IDF"),
col("VECTOR")
))).groupBy(
"CLASS",
"CATEGORY",
"ID")\
.agg(
array_agg((
col("SCALEDVEC")
)).alias("SVECTORS")).select(
col('CLASS'),
col('CATEGORY'),
col('ID'),
parse_json(average_vectors(col('SVECTORS'))).alias("DOCVEC")
).write.mode("overwrite").save_as_table("BBC_DOCVECS")
This process may take some time. Keep this in mind while running it!
After this code runs, our table will be populated with the document vectors for our news articles.
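As a quick sanity check (illustrative), we can count how many document vectors landed in each partition and category:

# Confirm that document vectors were written for both the train and test partitions.
session.table("BBC_DOCVECS").groupBy("CLASS", "CATEGORY").count().show()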
Visualizing our Document Vectors with T-SNE
Overview
For those unfamiliar, t-SNE (t-distributed Stochastic Neighbor Embedding) is a technique for reducing high-dimensional data to two or three dimensions for visualization. If our document vectors carry any meaning, we would expect articles with similar content to be near each other in vector space. The t-SNE plot effectively lets us check that hypothesis visually.
Generate the Plot
We will leave out the Snowflake connection code for brevity since it was provided above. For this snippet, use the same pattern as in the “Uploading Word Vectors” section so that the “conn” (Snowflake connector) variable is already populated.
import numpy as np
import pandas as pd
from sklearn.manifold import TSNE
import plotly.express as px

#previous connection code here
cur = conn.cursor()
q = cur.execute(
    "select a.class, a.category, a.id, a.words, d.docvec::string as dv from bbc_articles a inner join bbc_docvecs d on a.category=d.category and a.id=d.id"
)
df = q.fetch_pandas_all()
# The DV column holds each document vector as a JSON array string; parse it back into floats.
vecs = []
for label, content in df.items():
if label == "DV":
for v in content:
vecs.append([float(f) for f in v[1:len(v)-1].split(",")])
break
arr = np.array(vecs)
emb = TSNE().fit_transform(arr)
df['emb_x'] = pd.Series(emb[:, 0])
df['emb_y'] = pd.Series(emb[:, 1])
df['title'] = df['WORDS'].str.split('\n\n').str[0]
fig = px.scatter(
df, x="emb_x", y="emb_y",
color="CATEGORY", symbol=df["CLASS"],
symbol_sequence=["cross", "circle"], hover_data=['ID', 'title']
)
fig.write_html("plot.html")
The plot can be viewed by opening the plot.html file in a browser.
Again, we must treat our array of floating point numbers as a string to prevent conversion issues when reading the table into a pandas DataFrame.
As you can see, the document vectors cluster together very nicely within each category, giving us an excellent foundation for doing downstream machine learning tasks.
Classifying our Test Partition
Preparing our Python Environment
For a more rigorous test of our document vectors, let’s train a Random Forest Classifier. We’re going to leverage a Snowflake warehouse by using the snowflake.ml library.
Creating a Snowflake Stage for the Model Asset
#create the stage for storing the ML models
session.sql('CREATE OR REPLACE STAGE ML_MODELS').show()
This stage will be used to store the serialized version of the trained model obtained from the snowflake.ml library.
Training the Model
We start by importing the RandomForestClassifier object from the snowflake.ml library, which is largely modeled on scikit-learn and provides the same interfaces.
That is to say, when in doubt about its underlying behavior, scikit-learn’s documentation can be of great help!
from snowflake.ml.modeling.ensemble import RandomForestClassifier
vecs = []
for label, content in df.items():
if label == "DV":
for v in content:
vecs.append([float(f) for f in v[1:len(v)-1].split(",")])
break
df['vector'] = pd.Series(vecs)
x_train = [np.array(l) for l in df[df['CLASS']=="train"]['vector']]
y_train = [np.array(l) for l in df[df['CLASS']=="train"]['CATEGORY']]
train_df = pd.DataFrame([np.array(l) for l in df[df['CLASS']=="train"]['vector']])
train_df = train_df.add_prefix("X_")
feature_cols = list(train_df.columns)
train_df["Y"] = df["CATEGORY"].copy()
## Appending the label column ("Y") to the schema definition
schema_definition = feature_cols.copy()
schema_definition.append("Y")
After assembling the DataFrame, we need to make it available in Snowflake (as a temporary table in this case) so that it becomes a Snowpark DataFrame, accessible from a warehouse, which means the model can be trained on Snowflake’s compute.
sf_training_dataset = session.create_dataframe(train_df, schema=schema_definition)
model = RandomForestClassifier(label_cols=["Y"]).fit(sf_training_dataset)
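As a quick sanity check (a sketch; the exact names of the appended prediction columns depend on snowflake.ml defaults), the fitted model can score a Snowpark DataFrame directly inside the warehouse:

# Score the training DataFrame in-warehouse; prediction columns are appended to the output.
predictions = model.predict(sf_training_dataset)
predictions.show(5)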
Model Deployment
Model Registry vs. UDF Based Deployment of Models
At the time of writing, Snowflake’s Model Registry is in private preview and is not recommended for production usage, even though its capabilities look very promising.
We’re very excited to be using it in the near future!
With that in mind, this article sticks to a UDF-based deployment, while still leaning on Snowflake’s platform and its versatile compute resources for training performance.
model_asset = model.to_sklearn()
This call returns a plain scikit-learn object that can be serialized, carrying the fitted parameters of the trained model.
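For example (a sketch reusing train_df and feature_cols from above), the exported object behaves like any other scikit-learn estimator:

# The exported object is a regular scikit-learn estimator and can predict locally.
local_preds = model_asset.predict(train_df[feature_cols].head())
print(local_preds)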
UDF Assembling
First, we need to upload the newly trained model to the stage we created earlier for this purpose.
#save the model
import joblib
joblib.dump(model_asset, 'classify_docs.joblib')
#upload into the ML_MODELS Snowflake Internal Stage
session.file.put(
"classify_docs.joblib", "@ML_MODELS", auto_compress=False, overwrite=True
)
session.clear_imports()
session.clear_packages()
#Register the uploaded model above as an import of the UDF
session.add_import("@ML_MODELS/classify_docs.joblib")
session.add_packages("joblib")
session.add_packages("snowflake-snowpark-python")
session.add_packages("snowflake-ml-python")
Next, let’s add the necessary code to do inference right inside of a Snowflake UDF.
def read_file(filename):
import joblib
import sys
import os
#directory where all UDF imports are located
import_dir = sys._xoptions.get("snowflake_import_directory")
if import_dir:
with open(os.path.join(import_dir, filename), 'rb') as file:
m = joblib.load(file)
return m
@udf(name="classify_docs", input_types=[ArrayType()], return_type=StringType(), replace=True, stage_location = '@ML_MODELS', session=session)
def classify_docs(vecs):
test_df = pd.DataFrame(
np.array(vecs).reshape(1, -1),
columns=[f'X_{idx}' for idx, vec in enumerate(vecs)]
)
pipeline = read_file('classify_docs.joblib')
return pipeline.predict(test_df)[0]
Classify Document Vectors
With our “classify” UDF created, we can pass in a “DOCVEC” and get back a prediction that represents the most likely category for that document.
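For example (illustrative), we can spot-check the UDF straight from SQL on a few test documents:

# Spot-check the UDF on a handful of test documents.
session.sql("""
    SELECT CATEGORY, ID, classify_docs(DOCVEC) AS PREDICT
    FROM BBC_DOCVECS
    WHERE CLASS = 'test'
    LIMIT 5
""").show()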
Let’s run a batch inference on all of the documents in our test partition right inside of our Snowflake warehouse and then use the Snowpark DataFrame API to calculate and print a confusion matrix.
test.withColumn("PREDICT", classify_docs(col("DOCVEC")))\
.select(col("CATEGORY"), col("PREDICT"), lit(1).alias("NUM")).pivot(
"PREDICT",
["business", "entertainment", "politics", "sport", "tech"]
).agg(
sum(col("NUM"))
).select(
col("CATEGORY"),
coalesce(col("'business'"), lit(0)).alias("business"),
coalesce(col("'entertainment'"), lit(0)).alias("entertainment"),
coalesce(col("'politics'"), lit(0)).alias("politics"),
coalesce(col("'sport'"), lit(0)).alias("sport"),
coalesce(col("'tech'"), lit(0)).alias("tech")
).show()
When we execute the above, we get a pretty good-looking confusion matrix showing that our Random Forest model actually learned some useful features from our document vectors.
Analyzing our Results
There you have it! We’ve successfully created a document classifier that uses Snowflake as a feature store and Snowpark as our primary compute resource for inference. We had a couple of hurdles to jump over, but we were able to complete the task with the tools available.
If you have your own data hurdles to jump over and would like some help or advice in implementing a machine learning pipeline on Snowflake, please contact us!
Free Generative AI Workshop
Additionally, we’re running a series of free Generative AI workshops that have helped guide many readers forward in their data journeys. Sign up today for unbiased AI/ML advice!