Next up in our blog series on Snowpark, we’ll discuss machine learning basics and K-Means clustering in Snowpark with an example.
What is Machine Learning?
Machine learning (ML) grew out of the study of pattern recognition and computational learning theory in artificial intelligence. ML uses algorithms that can learn from and make predictions on data. These algorithms operate by building a model based on patterns in existing/historical data. The model is then used to make data-driven predictions or decisions, rather than following rule-based instructions.
In general, ML can be grouped into three main categories depending on the nature of the learning “signal” or “feedback” available to a learning system. Listed below is a brief explanation of each of the main ML categories:
- Supervised Learning - Also known as predictive modeling, supervised learning is built on labeled data (data that we can use as examples of correct or incorrect answers). The goal is to learn a general rule that maps inputs to outputs in order to perform predictions or classifications.
- Unsupervised Learning - Finds hidden patterns in unlabeled data (data without defined categories or groups). It deals with two types of problems: clustering and dimensionality reduction (summarizing a large amount of information into a smaller amount while retaining the ability to analyze the data).
- Reinforcement Learning - Trains an agent to learn in an interactive environment by trial and error, using feedback (rewards and penalties) from its own actions and experiences.
For the rest of this post, we’re going to cover a popular unsupervised learning algorithm: K-Means clustering, which we will implement with scikit-learn (sklearn) in Python and then execute on Snowflake using Snowpark.
What is K-Means Clustering?
K-Means is a clustering algorithm, which means that it separates a dataset into distinct clusters (groups) of similar examples. The number of clusters is chosen by the user of the algorithm and is represented by the variable K. The K-Means algorithm assigns each example in the training dataset to one of the K clusters. It also calculates the average example (centroid) of each cluster so that new examples can be assigned to their most similar (closest) cluster.
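To make the assignment step concrete, here is a minimal sketch (illustrative only, with made-up centroids; not the code we use below) of how a new example is matched to its closest centroid:
import numpy as np

# Two hypothetical centroids (K = 2) in (latitude, longitude) space.
centroids = np.array([[40.75, -73.99],
                      [40.70, -73.97]])

# A new example to assign.
point = np.array([40.74, -73.98])

# Euclidean distance from the point to each centroid; the example
# belongs to the cluster with the smallest distance.
distances = np.linalg.norm(centroids - point, axis=1)
print(int(np.argmin(distances)))  # index of the closest cluster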
Building the Clustering Model
The dataset used in this example is an Uber dataset containing a sample of 652,435 pickups in New York City (more on the dataset here). The K-Means model clusters the Uber trip data based on the latitude and longitude of each trip. This model can then be used to do real-time analysis of new Uber trips.
Our goal in this example is to highlight the use of machine learning with Snowpark. We will apply the K-Means algorithm to a dataset using sklearn in Python and export the model to an open format called Predictive Model Markup Language (PMML). We will then use Snowpark to assign data in Snowflake to the clusters learned by the K-Means model.
Model Training
To build the K-Means model for the Uber dataset, we first need to import some Python libraries:
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from nyoka import skl_to_pmml
from sklearn.model_selection import train_test_split
Next, we read the Uber dataset as a pandas DataFrame; the trained model will later be used to predict the clusters of new Uber trips. To train and test the K-Means model, the dataset needs to be split into a training dataset and a test dataset: 70 percent of the data is used to train the model, and 30 percent is held out for validation.
df = pd.read_csv('uber.csv')
predictors = ['Lat', 'Lon']
train, test = train_test_split(df[['Lat', 'Lon']], test_size=0.3)
Building the K-Means Model
To build a sklearn K-Means model, we create a Pipeline object and populate it with any pre-processing steps and the model object. We also need to set K, the number of clusters, before calling the pipe.fit(train) method to train the model.
pipe = Pipeline([('model', KMeans(n_clusters=8, random_state=0))])
pipe.fit(train)
We can then call the predict method of the trained model on the test set.
pipe.predict(test)
# The output:
array([5, 3, 0, ..., 0, 1, 0], dtype=int32)
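As a quick, optional sanity check on what the model learned, the fitted KMeans object inside the pipeline exposes its centroids via the standard scikit-learn cluster_centers_ attribute (the shape shown in the comment is what we would expect, not output copied from a run):
# Optional: inspect the learned centroids, one (Lat, Lon) pair per cluster.
centroids = pipe.named_steps['model'].cluster_centers_
print(centroids.shape)  # expected: (8, 2) for 8 clusters over 2 predictors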
Finally, we convert the model to a PMML file. Why use PMML? PMML enables researchers to quickly deploy models built with different programming languages such as R, Python, and C++ in a real-time environment with a more robust stack like Java. We have multiple options to convert the sklearn model to PMML. One popular library is Nyoka.
Nyoka is a Python library with comprehensive support for the latest PMML 4.4 standard. Nyoka also has great support for other types of models, like Keras (deep learning models) and ARIMA (forecasting models). A complete list of supported models can be found here. We can simply call the skl_to_pmml method to save the PMML model to a file named kmean_model.pmml.
skl_to_pmml(pipeline=pipe,
            col_names=predictors,
            pmml_f_name="kmean_model.pmml")
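Before moving to Snowflake, it can be worth verifying that the exported file round-trips. One optional way to do this (using the pypmml library, which is separate from Nyoka and not required for the rest of this post) is:
# Optional sanity check: load the exported PMML file back and score one trip.
from pypmml import Model

pmml_model = Model.fromFile('kmean_model.pmml')
print(pmml_model.predict({'Lat': 40.7521, 'Lon': -73.9914}))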
Executing the ML Model on Snowflake with Snowpark
The first step is to establish a session with the Snowflake database. Before executing the sample code, replace all the <placeholders> with your Snowflake connection information (more information on the Creating a Session process can be found here).
import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Replace the <placeholders> below with your connection information.
    val configs = Map(
      "URL" -> "https://<account_identifier>.snowflakecomputing.com:443",
      "USER" -> "<user_name>",
      "PASSWORD" -> "<password>",
      "ROLE" -> "<role_name>",
      "WAREHOUSE" -> "<warehouse_name>",
      "DB" -> "<database_name>",
      "SCHEMA" -> "<schema_name>"
    )
    val session = Session.builder.configs(configs).create
  }
}
The next step is to create a database and upload the Uber dataset to your database via the web UI (a detailed step-by-step tutorial on creating a database and schema and uploading data can be found here). As noted earlier, the K-Means model is based on the trip attributes/features (lat, lon). The following is the structure/schema for the Uber trip record:
import com.snowflake.snowpark.types._

val schema = StructType(
  StructField("time", TimestampType, nullable = true) ::
  StructField("lat", DoubleType, nullable = true) ::
  StructField("lon", DoubleType, nullable = true) ::
  StructField("base", StringType, nullable = true) ::
  Nil
)
val uberDf = session.read.schema(schema).table("UBER_TABLE")
uberDf.show()
And this is the sample output from the Uber table.
------------------------------------------------------
|"TIME" |"LAT" |"LON" |"BASE" |
------------------------------------------------------
|"5/1/2014 0:02:00" |40.7521 |-73.9914 |"B02512" |
|"5/1/2014 0:06:00" |40.6965 |-73.9715 |"B02512" |
|"5/1/2014 0:15:00" |40.7464 |-73.9838 |"B02512" |
|"5/1/2014 0:17:00" |40.7463 |-74.0011 |"B02512" |
|"5/1/2014 0:17:00" |40.7594 |-73.9734 |"B02512" |
|"5/1/2014 0:20:00" |40.7685 |-73.8625 |"B02512" |
|"5/1/2014 0:21:00" |40.7637 |-73.9962 |"B02512" |
|"5/1/2014 0:21:00" |40.7252 |-74.0023 |"B02512" |
|"5/1/2014 0:25:00" |40.7607 |-73.9625 |"B02512" |
|"5/1/2014 0:25:00" |40.7212 |-73.9879 |"B02512" |
------------------------------------------------------
Once we have the dataset uploaded into the database, we can start by creating a Scala class to load the saved PMML model. The sample code below takes the path to the PMML file as input and returns the model.
package algorithms

import org.pmml4s.model.Model

class KmeanPMML(modelPath: String) {
  def getModel(): Model = {
    val kmean: Model = Model.fromFile(s"$modelPath/lib/pmml/kmean_model.pmml")
    kmean
  }
}
Next, we need to add the pmml4s_2.12-0.9.11.jar and spray-json_2.12-1.3.5.jar files as dependencies for Snowpark so that they get uploaded to Snowflake when the model runs. The code sample below shows how to register these dependencies with the Snowpark session.
val libPath = new java.io.File("").getAbsolutePath
println(libPath)
session.addDependency(s"$libPath/lib/pmml4s_2.12-0.9.11.jar")
session.addDependency(s"$libPath/lib/spray-json_2.12-1.3.5.jar")
Finally, the K-Means model can be used to detect the cluster/category of new data (for example, real-time Uber trip data). The following example creates a Snowpark user-defined function (UDF) named myTransformationUDF, which calls the PMML model's predict method; the predicted category is added to the Snowpark DataFrame as a new column called label.
val model = new KmeanPMML(libPath).getModel()

// Score each (lat, lon) pair with the PMML model and return its cluster label.
val myTransformationUDF = udf((lat: Double, lon: Double) => {
  val v = Array[Double](lat, lon)
  model.predict(v).last.asInstanceOf[String]
})

uberDf.withColumn("label", myTransformationUDF(col("lat"), col("lon"))).show()
Below is the sample output from the Snowpark DataFrame. It shows the column with the predicted label.
----------------------------------------------------------------
|"TIME" |"LAT" |"LON" |"BASE" |"LABEL" |
----------------------------------------------------------------
|"5/1/2014 0:02:00" |40.7521 |-73.9914 |"B02512" |0 |
|"5/1/2014 0:06:00" |40.6965 |-73.9715 |"B02512" |3 |
|"5/1/2014 0:15:00" |40.7464 |-73.9838 |"B02512" |0 |
|"5/1/2014 0:17:00" |40.7463 |-74.0011 |"B02512" |5 |
|"5/1/2014 0:17:00" |40.7594 |-73.9734 |"B02512" |0 |
|"5/1/2014 0:20:00" |40.7685 |-73.8625 |"B02512" |4 |
|"5/1/2014 0:21:00" |40.7637 |-73.9962 |"B02512" |0 |
|"5/1/2014 0:21:00" |40.7252 |-74.0023 |"B02512" |5 |
|"5/1/2014 0:25:00" |40.7607 |-73.9625 |"B02512" |0 |
|"5/1/2014 0:25:00" |40.7212 |-73.9879 |"B02512" |5 |
----------------------------------------------------------------
Conclusion
In this post, we’ve demonstrated how a Snowpark UDF can be used to perform complex tasks like the assignment of records into the clusters learned by the K-Means ML algorithm.
Snowpark allows these operations to execute within the Snowflake data cloud without the need for external infrastructure. This is truly a game-changer for data operations. Data scientists can now streamline model development with Snowpark by eliminating the need to move data outside the data warehouse. This also provides a significant security benefit.
If your team is interested in learning more about ML models running in Snowflake, please reach out to us at phData; we would love to help you kick off your project!