May 3, 2024

How to Unlock Real-Time Analytics with Snowflake?

By Ankit Kaushal

Leveraging real-time analytics to make informed decisions is the gold standard for virtually every business that collects data. From manufacturing to finance, healthcare to retail, and everything in between, most of today’s enterprises list achieving ‘real-time analytics’ as a primary objective, yet few can actually perform real-time analytics.

If you have the Snowflake Data Cloud (or are considering migrating to Snowflake), you’re one blog post away from taking a step closer to real-time analytics. 

In this blog, we’ll show you step-by-step how to achieve real-time analytics with Snowflake via the Kafka Connector and Snowpipe. 

Why Pursue Real-Time Analytics for Your Organization?

Nowadays, every industry is using (or hopes to use) real-time analytics to improve its business, and real-time analytics has real-time benefits. In the retail industry, for example, real-time analytics is relied on to keep shelves stocked, understand customer demand, and maximize product sales, just to name a few uses.

In the stock market, real-time data helps traders make better-informed trading decisions by reducing risk and increasing accuracy. 

How Snowflake Helps Achieve Real-Time Analytics

Snowflake is the ideal platform to achieve real-time analytics for several reasons, but two of the biggest are its ability to handle concurrency through its multi-cluster warehouse architecture and its robust connectors to third-party tools like Kafka.
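
As a quick illustration, concurrency scaling in Snowflake is configured per warehouse. Below is a minimal sketch of a multi-cluster warehouse (the warehouse name is arbitrary, and multi-cluster warehouses require Enterprise edition or higher):

CREATE WAREHOUSE ANALYTICS_WH
  WAREHOUSE_SIZE = 'MEDIUM'
  MIN_CLUSTER_COUNT = 1   -- scale in to a single cluster when load is low
  MAX_CLUSTER_COUNT = 3   -- add clusters automatically as concurrent queries queue up
  SCALING_POLICY = 'STANDARD'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;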

What is Apache Kafka, and How is it Used in Building Real-time Data Pipelines?

Apache Kafka is an open-source distributed event streaming platform capable of handling high-volume, high-velocity data. It is highly scalable, highly available, and can deliver large volumes of data with latency as low as two milliseconds.

It is heavily used in industries like finance, retail, healthcare, and social media, with use cases ranging from real-time analytics and fraud detection to messaging and ETL pipelines. It follows a publish-subscribe model in which producers publish data to a topic and consumers subscribe to one or more topics to read that data.
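
For example, with the console tools that ship with Apache Kafka, the subscribe side looks like this (the topic name matches the one used later in this post, and the broker is assumed to be running locally on the default port):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic snowflake_in_topic --from-beginning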

Ideal Architecture to Achieve Real-Time Analytics with Snowflake and Kafka

How to Set Up Snowflake for Real-Time Analytics

This blog explains the real-time analytics process in Snowflake in two parts: first, how to get data from an external source and ingest it into Snowflake in real time; and second, how to transfer data out of Snowflake through Kafka so it can be consumed by a third-party tool like Tableau.

Part 1: Setting up Real-time Streaming from Kafka to Snowflake

Step 1: Installing and Setting up Kafka

Before proceeding with the installation, it is important to verify that your Kafka Connect and Kafka versions are compatible.
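
For example, with a standard Apache Kafka installation, you can print the installed version from the Kafka directory and compare it against the versions listed in the Snowflake Kafka connector documentation:

bin/kafka-topics.sh --version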

Note that ZooKeeper also needs to be installed and running for older Kafka versions (newer versions can run in KRaft mode without it). Once Kafka is ready, create a topic and a producer.
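
For reference, with a ZooKeeper-based Apache Kafka tarball, starting the services locally looks roughly like this (run from the Kafka installation directory, each command in its own terminal):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties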

Step 2: Set Up the Snowflake Connector

This blog shows an example using the open-source version of Kafka, but the Confluent version can also be used. Start by downloading the Snowflake Kafka Connector.
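
The connector jar is published to Maven Central under the com.snowflake group. One way to fetch it and make it visible to Kafka Connect is sketched below; the version is a placeholder (use the latest release), and dropping the jar into the Kafka libs directory is one of several valid plugin locations:

wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/<version>/snowflake-kafka-connector-<version>.jar
cp snowflake-kafka-connector-<version>.jar <kafka_dir>/libs/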

After that, key pair authentication needs to be set up with a public and a private key, as the Snowflake connector only accepts key pair authentication. 
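
Once both keys have been generated (commands below), the public key also needs to be registered on the Snowflake user that the connector will authenticate as. A minimal sketch, assuming a user named KAFKA_USER and omitting the key's BEGIN/END header and footer lines:

ALTER USER KAFKA_USER SET RSA_PUBLIC_KEY='<contents of new_rsa_key_v1.pub without the header and footer lines>';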

Example command to generate a private key:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out C:\tmp\new_rsa_key_v1.p8

This private key will be used to generate the public key.

Example:

openssl rsa -in C:\tmp\new_rsa_key_v1.p8 -pubout -out C:\tmp\new_rsa_key_v1.pub

Create a file named SF_connect.properties inside the Kafka config/ directory and fill in all of the values:

				
name=mykafkaconnectsnowflake
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=snowflake_in_topic
snowflake.topic2table.map=snowflake_in_topic:KAFKA_TABLE_IN
buffer.count.records=1
buffer.flush.time=10
buffer.size.bytes=5
snowflake.url.name=<YOUR SNOWFLAKE INSTANCE ID>
snowflake.user.name=<YOUR SNOWFLAKE INSTANCE USERNAME>
# paste the private key contents here on a single line, without the BEGIN/END header and footer lines
snowflake.private.key=
snowflake.private.key.passphrase=changeitnow
snowflake.database.name=KAFKA_CONNECT_DB
snowflake.schema.name=KAFKA_CONNECT_SCHEMA
value.converter.schema.registry.url=
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter

Step 3: Set Up Snowflake to Connect with the Kafka Connector

Create a database, schema, table, warehouse, and role in Snowflake to handle the streaming data arriving from the Kafka producer. (The connector creates its own internal stage and pipe objects automatically, which is why the role is granted the CREATE STAGE and CREATE PIPE privileges below.)

Example of creating database objects for the Kafka connector:

CREATE DATABASE KAFKA_CONNECT_DB;
CREATE ROLE KAFKA_CONNECT_ROLE;
CREATE SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA;
CREATE TABLE KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA.KAFKA_TABLE_IN (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);
CREATE WAREHOUSE KAFKA_WAREHOUSE WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;

-- create the necessary grants
GRANT USAGE ON DATABASE KAFKA_CONNECT_DB TO ROLE KAFKA_CONNECT_ROLE;
GRANT USAGE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT OWNERSHIP ON TABLE KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA.KAFKA_TABLE_IN TO ROLE KAFKA_CONNECT_ROLE;

-- grant the role to the user the connector authenticates as (user name is a placeholder, matching the properties file)
GRANT ROLE KAFKA_CONNECT_ROLE TO USER <your_snowflake_username>;
ALTER USER <your_snowflake_username> SET DEFAULT_ROLE = KAFKA_CONNECT_ROLE;

Command to start the Kafka connector in standalone mode:

<kafka_dir>/bin/connect-standalone.sh \
  <kafka_dir>/config/connect-standalone.properties \
  <kafka_dir>/config/SF_connect.properties

Step 4: Test Kafka Connector by Sending a Message

Start by creating a topic and starting a console producer, using the sample commands below.

To create the topic:

bin/kafka-topics.sh --create --topic snowflake_in_topic --bootstrap-server localhost:9092

To start the producer:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic snowflake_in_topic

Send a sample message:

{"order_id": {"int": 1212}, "customer_id": {"string": "abcd"}, "order_ts": {"int": 333333}, "order_total_usd": {"double": 3.89}, "item": {"string": "Wainwright"}}

If the connection is successful, the sample real-time data should be available in Snowflake. 
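
A quick way to confirm ingestion is to query the target table directly (assuming the database objects created earlier):

SELECT RECORD_METADATA, RECORD_CONTENT
FROM KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA.KAFKA_TABLE_IN
LIMIT 10;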

Part 2: Setting up Real-time Data from Snowflake to Another Platform

In this part, we’ll cover how to transfer data out from Snowflake so that it can be consumed by any third-party tool used for visualization in real time.

High-level diagram of Sending Real-time Data from Snowflake to Kafka

Step 1: Setting up Snowflake and Kafka for Real-Time Data Flow

				
CREATE OR REPLACE TABLE "snowflake2kafka"
(
    "id"          integer   not null,
    "created_at"  timestamp not null,
    "updated_at"  timestamp not null,
    "first_name"  varchar(255),
    "address"     varchar(255)
);

-- Add some data to this table
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (11, '2023-11-20', '2023-11-20', 'Ankit', 'Chandigarh');
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (12, '2023-11-29', '2023-11-29', 'Ankit', 'New York');

-- Verify your data
SELECT * FROM "snowflake2kafka";

For the Kafka setup in this part, this blog uses a Docker image of Kafka. This distribution bundles Kafka, Kafka Connect, ZooKeeper, and the Confluent tooling together nicely. 

Use this command to pull the Docker image of Kafka:

docker pull landoop/fast-data-dev

The Docker desktop should look like this:

In the optional settings of the Docker image, in the Volumes section, specify the exact path on your local machine where the real-time data files will be stored:

				
volumes:
  # Any path on your local machine where data files will be stored, mapped into the container.
  # Example: C:\Users\ankit\Downloads\Projects\snowflake2kafka:/my_real_data

Step 2: Setting up Confluent Kafka Connect

In the Kafka UI, click on KAFKA CONNECT UI, then click New connector, and select the JDBC source connector: io.confluent.connect.jdbc.JdbcSourceConnector
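
If you prefer the Kafka Connect REST API over the UI, the same connector can be created with a call along these lines (port 8083 is the Kafka Connect default, and the file name is a placeholder for a JSON file that wraps the properties shown below under a "config" key):

curl -X POST -H "Content-Type: application/json" \
  --data @snowflake-to-kafka-connector.json \
  http://localhost:8083/connectors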

In the Properties section of the JDBC-Kafka tab, add the Snowflake properties that were just created and the Kafka Connector properties: 

				
name=snowflake.to.kafka.connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

# Snowflake JDBC connection details
connection.url=jdbc:snowflake://ACCOUNT.snowflakecomputing.com/?warehouse=warehousename&db=test&schema=public&role=WHATEVEROLE
connection.user=<your_snowflake_username>
connection.password=<ADD_YOUR_PASSWORD>
connection.attempts=100
connection.backoff.ms=300000

# What to pull and how to detect new rows
query=select * from "snowflake2kafka"
table.types=table
mode=incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
poll.interval.ms=1500
numeric.mapping=best_fit
topic.prefix=snowflake-kafka-etl

# Single message transforms (the schema names below are illustrative)
transforms=AddNamespace,createKey,AddKeyNamespace
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key

Step 3: Configure and Set Up the Paths and Libraries

Now, download the latest version of the Snowflake JDBC driver. Then look for the \kafka-connect-jdbc\ directory in your Kafka setup. If you are unable to find it, look in docker-desktop-data and search for kafka-connect-jdbc inside that folder.

Once this directory has been found, put the Snowflake JDBC Jar file in it.
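
For a Docker-based setup like the one above, one way to do this is to copy the jar into the running container. A rough sketch, where the container name, driver version, and exact kafka-connect-jdbc path are placeholders that vary by image:

docker cp snowflake-jdbc-<version>.jar <container_name>:<path_to>/kafka-connect-jdbc/

Restart the Kafka Connect worker (or the container) afterward so the new driver is picked up.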

Now, start the container by simply clicking Start. The UI should look like this afterward:

Step 4: Test the Connection

The last step is to test the connection by adding some records in the snowflake2kafka table. If everything is set up correctly, real-time data should be available in Kafka UI once it is entered in the Snowflake tables.
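
For example, inserting a new row like the one below (the values are arbitrary) should appear on the Kafka topic within a few seconds, based on the incrementing id column and the poll interval configured earlier:

INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address")
VALUES (13, '2023-12-01', '2023-12-01', 'Test', 'Chicago');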

Closing

Building a real-time analytics pipeline in Snowflake is very achievable with Apache Kafka. Getting data out of Snowflake and ingesting data into Snowflake in real time require different steps, but Apache Kafka can help with both operations. 

Looking for additional help?

The experts at phData can guide you! As Snowflake’s Partner of the Year, we can help your organization achieve real-time analytics with Snowflake. Contact us today!

FAQs

What is the difference between Confluent Kafka and Apache Kafka?

Confluent provides a truly cloud-native experience, with a holistic set of enterprise-grade features that meet a wide range of architectural needs. Confluent Kafka is also powered by a user-friendly interface that enables the development of event-driven microservices and other real-time use cases. Confluent Kafka provides all of the features of Apache Kafka, plus additional features such as an interactive UI and pre-installed libraries, whereas with Apache Kafka, one has to set everything up on their own.
