Leveraging real-time analytics to make informed decisions is the gold standard for virtually every business that collects data. From manufacturing to finance, healthcare to retail, and everything in between, most of today’s enterprises list achieving ‘real-time analytics’ as a primary objective, yet few can actually perform it.
If you have the Snowflake Data Cloud (or are considering migrating to Snowflake), you’re a blog away from taking a step closer to real-time analytics.
In this blog, we’ll show you step-by-step how to achieve real-time analytics with Snowflake via the Kafka Connector and Snowpipe.
Why Pursue Real-Time Analytics for Your Organization?
Nowadays, every industry is using (or hopes to use) real-time analytics to improve its business. Real-time analytics has real-time benefits. In the retail industry, for example, retailers rely on real-time analytics to keep shelves stocked, understand customer demand, and maximize product sales (just to name a few uses).
In the stock market, real-time data helps traders make better-informed decisions, reducing risk and increasing accuracy.
How Snowflake Helps Achieve Real-Time Analytics
Snowflake is the ideal platform for achieving real-time analytics for several reasons, but two of the biggest are its ability to handle concurrency, thanks to its multi-cluster warehouse architecture, and its robust connectivity to third-party tools like Kafka.
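Concurrency handling is largely a matter of warehouse configuration. As a minimal sketch (the warehouse name and cluster counts here are illustrative, and multi-cluster warehouses require Snowflake Enterprise Edition or higher), a warehouse can be set up to scale out automatically under concurrent query load:
CREATE WAREHOUSE ANALYTICS_WH
  WAREHOUSE_SIZE = 'XSMALL'
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3      -- adds clusters when queries start to queue
  SCALING_POLICY = 'STANDARD'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;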
What is Apache Kafka, and How is it Used in Building Real-time Data Pipelines?
Apache Kafka is an open-source distributed event streaming platform capable of handling high-volume, high-velocity data. It is highly scalable and highly available, and it can deliver large volumes of data with latency as low as two milliseconds.
It is heavily used in industries like finance, retail, healthcare, and social media, with use cases ranging from real-time analytics and fraud detection to messaging and ETL pipelines. It follows a publish-subscribe model in which producers publish data to a topic and consumers subscribe to one or more topics to consume it.
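As a quick illustration of the subscribe side, a console consumer reads everything that producers publish to a topic. This sketch assumes a local broker on the default port 9092 and the snowflake_in_topic topic used later in this blog:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic snowflake_in_topic --from-beginning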
How to Set Up Snowflake for Real-Time Analytics
This blog explains the real-time analytics process in Snowflake in two parts: first, how to get data from an external source and ingest it into Snowflake in real time, and second, how to stream data out of Snowflake through Kafka to a third-party tool like Tableau.
Part 1: Setting up Real-time Streaming from Kafka to Snowflake
Step 1: Installing and Setting up Kafka
Before proceeding with the installation, verify that your Kafka Connect and Kafka versions are compatible.
Note that ZooKeeper also needs to be installed and set up for older Kafka versions. Once Kafka is ready, create a topic and a producer.
Step 2: Set Up the Snowflake Connector
This blog shows an example using the open-source version of Kafka, but the Confluent distribution can also be used. Start by downloading the Snowflake Kafka Connector.
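The connector JAR is published to Maven Central under the com.snowflake group. As an illustrative sketch (replace <version> with the latest release listed in the repository), the JAR can be downloaded and placed in the libs/ folder of your Kafka installation:
# Illustrative only: substitute the actual latest version number for <version>
curl -O https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/<version>/snowflake-kafka-connector-<version>.jar
# Copy the downloaded JAR into the libs/ folder of your Kafka installation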
After that, key pair authentication needs to be set up with a public and a private key, as the Snowflake connector only accepts key pair authentication.
Example command to generate a private key:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out C:\tmp\new_rsa_key_v1.p8
This private key will be used to generate the public key.
Example:
openssl rsa -in C:\tmp\new_rsa_key_v1.p8 -pubout -out C:\tmp\new_rsa_key_v1.pub
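The generated public key then needs to be assigned to the Snowflake user the connector will authenticate as. A minimal sketch, assuming a hypothetical user named KAFKA_CONNECT_USER (use your own user name, and paste the key contents without the PEM header/footer lines or line breaks):
-- KAFKA_CONNECT_USER is a hypothetical user name for illustration
ALTER USER KAFKA_CONNECT_USER SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';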
Create a file named SF_connect.properties inside the Kafka /config/ directory and fill in all the values.
name=mykafkaconnectsnowflake
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=snowflake_in_topic
snowflake.topic2table.map=snowflake_in_topic:KAFKA_TABLE_IN
buffer.count.records=1
buffer.flush.time=10
buffer.size.bytes=5
snowflake.url.name=
snowflake.user.name=
snowflake.private.key=
snowflake.private.key.passphrase=changeitnow
snowflake.database.name=KAFKA_CONNECT_DB
snowflake.schema.name=KAFKA_CONNECT_SCHEMA
value.converter.schema.registry.url=
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
Step 3: Set Up Snowflake to Connect with the Kafka Connector
Create a database, schema, table, warehouse, and role in Snowflake to handle the streaming data arriving from the Kafka producer. The connector creates its own internal stage and pipe at runtime, which is why the role is granted the CREATE STAGE and CREATE PIPE privileges below.
CREATE DATABASE KAFKA_CONNECT_DB;
CREATE ROLE KAFKA_CONNECT_ROLE;
CREATE SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA;
CREATE TABLE KAFKA_TABLE_IN (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);
CREATE WAREHOUSE KAFKA_WAREHOUSE WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;
-- create necessary grants
GRANT USAGE ON DATABASE KAFKA_CONNECT_DB TO ROLE KAFKA_CONNECT_ROLE;
GRANT USAGE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA TO ROLE KAFKA_CONNECT_ROLE;
GRANT OWNERSHIP ON TABLE KAFKA_TABLE_IN TO ROLE KAFKA_CONNECT_ROLE;
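To complete the setup, the role also needs usage on the warehouse and must be granted to the user the connector authenticates as. A sketch, again assuming the hypothetical KAFKA_CONNECT_USER:
GRANT USAGE ON WAREHOUSE KAFKA_WAREHOUSE TO ROLE KAFKA_CONNECT_ROLE;
-- KAFKA_CONNECT_USER is a hypothetical user name for illustration
GRANT ROLE KAFKA_CONNECT_ROLE TO USER KAFKA_CONNECT_USER;
ALTER USER KAFKA_CONNECT_USER SET DEFAULT_ROLE = KAFKA_CONNECT_ROLE;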
With the grants in place, start Kafka Connect in standalone mode from the Kafka installation directory, passing the worker configuration and the connector configuration created earlier:
bin/connect-standalone.sh config/connect-standalone.properties config/SF_connect.properties
Step 4: Test Kafka Connector by Sending a Message
Start by creating a topic and a console producer in Kafka. Use the sample commands below to create the topic and start the producer:
bin/kafka-topics.sh --create --topic snowflake_in_topic --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic snowflake_in_topic
{"order_id": {"int": 1212}, "customer_id": {"string": abcd}, "order_ts": {"int": 333333}, "order_total_usd": {"double": 3.8900000000000001}, "item": {"string": "Wainwright"}}
If the connection is successful, the sample real-time data should be available in Snowflake.Â
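To confirm on the Snowflake side, you can query the target table directly (RECORD_METADATA and RECORD_CONTENT are the two VARIANT columns created earlier):
SELECT RECORD_METADATA, RECORD_CONTENT
FROM KAFKA_CONNECT_DB.KAFKA_CONNECT_SCHEMA.KAFKA_TABLE_IN;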
Part 2: Setting up Real-time Data from Snowflake to Another Platform
In this part, we’ll cover how to transfer data out of Snowflake so that it can be consumed in real time by any third-party visualization tool.
Step 1: Setting up Snowflake and Kafka for Real-Time Data Flow
First, create a source table in Snowflake and load some sample data:
CREATE OR REPLACE TABLE "snowflake2kafka"
(
"id" integer not null,
"created_at" timestamp not null,
"updated_at" timestamp not null,
"first_name" varchar(255),
"address" varchar(255)
);
-- Add some data to this table
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (11, '2023-11-20', '2023-11-20', 'Ankit', 'Chandigarh');
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (12, '2023-11-29', '2023-11-29', 'Ankit', 'New York');
-- Verify your data
SELECT * FROM "snowflake2kafka";
For the Kafka setup in this part, this blog uses a Docker image that bundles Kafka, Kafka Connect, ZooKeeper, and the Confluent tooling together nicely.
docker pull landoop/fast-data-dev
The Docker desktop should look like this:
In the optional settings of the Docker image, under the Volumes section, specify the exact path on your local machine where the real-time data files will be stored:
volumes:
# Host path : container path. Use any path on your system where data files will be stored.
# Example: C:\Users\ankit\Downloads\Projects\snowflake2kafka:/my_real_data
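If you prefer the command line to Docker Desktop, the container can also be started directly with docker run. The port mappings below follow the image’s documentation, and the -v flag carries the same host-to-container volume mapping shown above (adjust the host path for your machine):
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -e ADV_HOST=127.0.0.1 -v C:\Users\ankit\Downloads\Projects\snowflake2kafka:/my_real_data landoop/fast-data-dev:latest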
Step 2: Setting up Confluent Kafka Connect
In the Kafka UI, click on KAFKA CONNECT UI, then click New connector, and select the JDBC source connector: io.confluent.connect.jdbc.JdbcSourceConnector
In the Properties section of the JDBC-Kafka tab, add the Snowflake properties that were just created and the Kafka Connector properties:
name=snowflake.to.kafka.connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=updated_at
incrementing.column.name=id
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
connection.password=
query=select * from "snowflake2kafka"
connection.attempts=100
transforms=AddNamespace,createKey,AddKeyNamespace
connection.backoff.ms=300000
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
timestamp.delay.interval.ms=3000
table.types=table
mode=incrementing
topic.prefix=snowflake-kafka-etl
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
connection.user=
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.createKey.fields=id
poll.interval.ms=1500
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
numeric.mapping=best_fit
connection.url=jdbc:snowflake://ACCOUNT.snowflakecomputing.com/?warehouse=warehousename&db=test&schema=public&role=WHATEVEROLE
Step 3: Configure and Set Up All the Paths and Libraries
Now, download the latest version of the Snowflake JDBC driver. Then look for the \kafka-connect-jdbc\ directory in your Kafka setup. If you can’t find it, look inside docker-desktop-data and search for kafka-connect-jdbc there.
Once this directory has been found, place the Snowflake JDBC JAR file in it.
Now, start the container by clicking Start. Afterward, the UI should look like this:
Step 4: Test the Connection
The last step is to test the connection by adding some records to the snowflake2kafka table. If everything is set up correctly, the new data should appear in the Kafka UI in real time as soon as it is entered in the Snowflake table.
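For example, insert a couple more rows (the values here are purely illustrative; with mode=incrementing, any row whose id is higher than the last one seen will be picked up):
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (13, '2023-12-01', '2023-12-01', 'Ankit', 'Chicago');
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (14, '2023-12-02', '2023-12-02', 'Ankit', 'Denver');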
Closing
Building a real-time analytics pipeline in Snowflake is very achievable with Apache Kafka. Ingesting data into Snowflake and streaming data out of it in real time require different steps, but Apache Kafka can help with both operations.
Looking for additional help?
The experts at phData can guide you! As Snowflake’s Partner of the Year, we can help your organization achieve real-time analytics with Snowflake. Contact us today!
FAQs
What is the difference between Apache Kafka and Confluent Kafka?
Confluent provides a truly cloud-native experience with a holistic set of enterprise-grade features designed to meet a wide range of architectural needs. Confluent Kafka is also powered by a user-friendly interface that enables the development of event-driven microservices and other real-time use cases. Confluent Kafka provides all the features that Apache Kafka has, plus additional features like an interactive UI and pre-installed libraries, whereas with Apache Kafka, one has to set everything up on their own.