Coalesce is a fantastic transformation solution built specifically to run on the Snowflake AI Data Cloud. Because it generates Snowflake SQL with an easy-to-use, code-first GUI interface, it can take advantage of everything Snowflake offers, even if the feature is brand new.Â
This blog will cover creating custom nodes in Coalesce, what new advanced features can already be used as nodes, and how to create them as part of your data pipeline.Â
Custom Nodes in Coalesce
Although Coalesce has some predefined nodes for commonly used objects such as tables or views, it also lets you create custom nodes for more advanced features in Snowflake. To create a UDN, we’ll need a node definition that defines how the node should function and templates for how the object will be created and run.Â
Node Definition
The Node Definition defines the UI elements and other shared attributes available to that Node Type. The data input into the UI through text boxes or buttons is then available for use by the create-and-run templates to help determine how they’ll be run.Â
Here is a simple example of a node definition that displays a text box for input of SQL that is required by the node to run:
capitalized: CREATE SQL
short: SQL
plural: SQLs
tagColor: '#2EB67D'
config:
- groupName: Options
items:
- displayName: SQL
attributeName: SQL1
type: textBox
syntax: sql
isRequired: true
Create Template
The Create Template defines the creation of the object’s structure, equivalent to using Data Definition Language (DDL). Unlike DDL, Create Template uses the templating language Jinja to allow for the dynamic creation of objects based on the node definition and input from the user.Â
Here is a simple example of a create template for a stage table:
{{ stage('Create Stage Table') }}
CREATE OR REPLACE TABLE {{ ref_no_link(node.location.name, node.name) }}
(
{% for col in columns %}
"{{ col.name }}" {{ col.dataType }}
{%- if not col.nullable %} NOT NULL
{%- if col.defaultValue | length > 0 %} DEFAULT {{ col.defaultValue }}{% endif %}
{% endif %}
{%- if col.description | length > 0 %} COMMENT '{{ col.description | escape }}'{% endif %}
{%- if not loop.last -%}, {% endif %}
{% endfor %}
)
{%- if node.description | length > 0 %} COMMENT = '{{ node.description | escape }}'
Run Template
The Run Template is similar to the Create Template in using Jinja. Still, instead of templating the object’s creation with DDL, it templates the object’s running using Data Manipulation Language (DML).Â
Here’s a portion of the run template for a stage table in which it looks for sources to join or union them together and inserts them into the table:
{% if config.insertStrategy == 'INSERT' %}
{{ stage('Insert ' + source.name | string ) }}
INSERT INTO {{ ref_no_link(node.location.name, node.name) }}
(
{% for col in source.columns %}
"{{ col.name }}"
{%- if not loop.last -%},{% endif %}
{% endfor %}
)
{% endif %}
SELECT
{% for col in source.columns %}
{{ get_source_transform(col) }} AS "{{ col.name }}"
{%- if not loop.last -%}, {% endif %}
{% endfor %}
{{ source.join }}
{% if config.insertStrategy in ['UNION', 'UNION ALL'] and not loop.last %}
{{config.insertStrategy}}
{% endif %}
Using Advanced Snowflake Features in Coalesce
Now that we know how custom nodes are defined, we can use that knowledge to create more advanced objects that don’t come out of the box with Coalesce.
Dynamic Tables
Dynamic tables, a recent feature in Snowflake, are a game changer for data engineering. They allow you to define your pipeline outcomes using declarative SQL without worrying about the steps to achieve them. Dynamic tables are automatically refreshed as the underlying data changes, only operating on new changes since the last refresh. Snowflake even handles the orchestration and scheduling of the refresh. They’re essentially an entire data pipeline within itself.Â
With Coalesce, you can take advantage of all the features of Dynamic tables directly within their larger data pipeline.
Start by creating a node for Dynamic tables using the code Coalesce provides. Because Dynamic tables run independently, we only need a node definition and a template for the node.
Now that you have your Dynamic table node select your source(s) and either choose to create the table from Add Nodes for one source or Join Nodes for more:
The Dynamic table node comes with all the options required by Snowflake, including:
Text box to choose which warehouse the table will execute on
Toggle for whether downstream tables will determine the refresh of this table
Lag specification if downstream is not chosen to select how often the table is refreshed
Options include seconds, minutes, hours, and days
Refresh mode sets how the table will be refreshed
Options here are Auto, Incremental, and Full
Initialize determines when the table will start being refreshed, either as soon as it’s created or when you schedule it to start refreshing.
Once the create action is used, the node will create your Dynamic table with all necessary options, and it will be ready to use within your pipeline like any other table:
CREATE OR REPLACE DYNAMIC TABLE "DELISI"."WORKDEV"."DT_WRK_PART_SUPPLIER"
TARGET_LAG = '60 Seconds'
WAREHOUSE = SANDBOX_WH
REFRESH_MODE = INCREMENTAL
INITIALIZE = ON_SCHEDULE
(
"P_PARTKEY",
"P_NAME",
"P_MFGR",
"P_BRAND",
"P_TYPE",
"P_SIZE",
"P_CONTAINER",
"P_RETAILPRICE",
"P_COMMENT",
"S_SUPPKEY",
"S_NAME",
"S_ADDRESS",
"S_NATIONKEY",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT"
)
AS
SELECT
"PART"."P_PARTKEY" AS "P_PARTKEY",
"PART"."P_NAME" AS "P_NAME",
"PART"."P_MFGR" AS "P_MFGR",
"PART"."P_BRAND" AS "P_BRAND",
"PART"."P_TYPE" AS "P_TYPE",
"PART"."P_SIZE" AS "P_SIZE",
"PART"."P_CONTAINER" AS "P_CONTAINER",
"PART"."P_RETAILPRICE" AS "P_RETAILPRICE",
"PART"."P_COMMENT" AS "P_COMMENT",
"SUPPLIER"."S_SUPPKEY" AS "S_SUPPKEY",
"SUPPLIER"."S_NAME" AS "S_NAME",
"SUPPLIER"."S_ADDRESS" AS "S_ADDRESS",
"SUPPLIER"."S_NATIONKEY" AS "S_NATIONKEY",
"SUPPLIER"."S_PHONE" AS "S_PHONE",
"SUPPLIER"."S_ACCTBAL" AS "S_ACCTBAL",
"SUPPLIER"."S_COMMENT" AS "S_COMMENT"
FROM "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF1"."PART" "PART"
INNER JOIN "SNOWFLAKE_SAMPLE_DATA"."TPCH_SF1"."SUPPLIER" "SUPPLIER"
ON "PART"."P_PARTKEY" = "SUPPLIER"."P_PARTKEY"
Cortex Forecast
Snowflake has created a fully managed service to easily perform machine learning and artificial intelligence operations with simple SQL commands directly in your Snowflake account called Cortex. Cortex has a function called Forecasting that allows you to predict future values of a target dataset based on influence factors. Using a custom node in Coalesce, data engineers can create and query these forecasts within their data pipeline.
To use this feature, create a node with the code Coalesce provides for Cortex Forecasts.
Once your node is created, you can create a new CORTEX FORECAST node based on the source(s) of the data relevant to the data you’d like to predict.Â
Within the CORTEX node, a data engineer simply sets the options for a Forecast within Snowflake, including:
A toggle for creating multi/single seriesÂ
When multi-series is selected, the node will prompt for a series column
A dropdown for the timestamp column to be used
A dropdown for the target column to be predicted
A toggle for whether or not to include exogenous variables
When left unchecked, a day-to-forecast text box is included in the input the days to forecast out to
The node will create a table to hold the predicted data and, when run, create the forecasting model in Snowflake. Note that creating the forecasting model triggers its training and incurs compute costs.
Now that the predicted data is in a table in Snowflake, it can be used like any other within your Coalesce pipeline.
Stored Procedures
Stored Procedures may not seem like an advanced database feature, but they really can be in Snowflake. Stored procedures can be written in several languages in Snowflake, including SQL, JavaScript, Python, Java, and Scala. With all these options and Snowpark’s use, Snowflake’s procedures open up many more options than simple transformations. Because of this, you might find running these procedures within your Coalesce pipeline advantageous, which is made easy with a stored procedure node.
Creating this node is very simple since we want it to run a SQL statement.Â
The node definition creates two text boxes, one for creating a procedure (if needed) and one for calling a procedure on a run where the user can input the desired SQL statements. You do not have to use Coalesce to create the procedure if it’s already in Snowflake; you can only use the RUN_PROCEDURE
attribute.
capitalized: CREATE SQL
short: SQL
plural: SQLs
tagColor: '#2EB67D'
config:
- groupName: Options
items:
- displayName: CREATE_PROCEDURE
attributeName: SQL1
type: textBox
syntax: sql
isRequired: false
- displayName: RUN_PROCEDURE
attributeName: SQL2
type: textBox
syntax: sql
isRequired: false
systemColumns:
- displayName: SQL_SEQ
transform: ''
dataType: NUMBER IDENTITY
placement: beginning
attributeName: isSurrogateKey
The create and run templates are similar as they simply look for the proper SQL inputs and run them accordingly:
{% if config.SQL1 %}
{{ stage('SQL1') }}
{{ config.SQL1 }}
{% endif %}
{% if config.SQL2 %}
{{ stage('SQL2') }}
{{ config.SQL2 }}
{% endif %}
That’s it! Now, we create a new node, either on its own or based on another node, to run after that node. Note: If you would like it to run after a specific node, you’ll need to remove the columns from the previous node from your stored procedure node, as they’re not needed.Â
Here’s an example that creates and runs a Snowpark procedure named checkStatus()
:
Closing
As you can see, with the customizable nature of Coalesce, there’s no waiting for Snowflake’s new features to be released within the transformation tool. You can create a custom node to do almost anything that you can do within the Snowflake UI. This opens up so many options within your data pipeline and cements Coalesce as one of the premier transformation tools for Snowflake.Â
For further assistance or to learn more about maximizing your use of Coalesce and Snowflake, contact phData. Our experts are ready to help you leverage these powerful tools to their fullest potential. Contact us today to get started.