Modern low-code/no-code ETL tools allow data engineers and analysts to build pipelines seamlessly using a drag-and-drop, configuration-driven approach with minimal coding. However, if the tool also offers an option to write custom code for features that cannot be achieved with the drag-and-drop components, it broadens the horizon of what we can do with our data pipelines. One such option is the Python component in Matillion ETL, which allows us to run Python code inside the Matillion instance.
In this blog, we will describe 10 such Python scripts that can provide a blueprint for using the Python component efficiently in Matillion ETL for the Snowflake AI Data Cloud.
Understanding Matillion and Snowflake, the Python Component, and Why it is Used
Matillion is a SaaS-based data integration platform that can be hosted in AWS, Azure, or GCP and supports multiple cloud data warehouses. Matillion ETL for Snowflake is an ELT/ETL tool that allows for the ingestion, transformation, and building of analytics for data in the Snowflake AI Data Cloud.
The Python component in Matillion is a GUI-based script component that allows us to write Python code to interact with services inside the Matillion instance and Snowflake, as well as with systems outside of them, depending on the security configuration of the instance.
We have to configure the following two parameters in the Python component before we define the code:
Interpreter: The available interpreters are Jython, Python 2, and Python 3; Jython should be used only for JDBC database connectivity. The default value is Python 3. A quick way to verify the active interpreter is shown after this list.
Timeout: The component also requires a timeout value in seconds, after which the script is terminated automatically. The default value is 360 seconds.
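A quick way to confirm which interpreter a component is actually using is to print the version from inside the script and check the task output:

import sys

# Prints the interpreter details (e.g., Python 3.x or Jython 2.7) so the component's
# Interpreter setting can be verified from the task output
print(sys.version)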
Given the diverse packages available in Python, writing custom code via Python scripts can be a powerful way to enhance data pipelines and perform tasks that are not available out of the box in the Matillion component library.
This flexibility allows multiple systems to be integrated with Snowflake, expanding Matillion ETL’s capabilities through user-defined code and making it a powerful ETL/ELT tool for processing data in Snowflake.
Top 10 Python Scripts for use in Matillion for Snowflake
1. Check Input Variables Required for the Pipelines (check_input_var.py)
Purpose: Many of our pipelines are dynamic, requiring input variables from the upstream processes to run successfully. For example, we can pass the Snowflake landing and staging table names as input variables to a pipeline whose function is to append and load raw data into the landing table from the staging table. If the variables from upstream processes/events are unavailable, the pipeline will abort, throwing an error.
Benefits: This helps make dynamic Matillion ETL pipelines resilient to upstream issues and determine the point of failure at the source. It also allows us to assign default values if the variables are empty from the source.
class CustomException(Exception):
    pass

# JV_LANDING_TBL is the name of the Landing table which the job expects
if not JV_LANDING_TBL:
    raise CustomException('Input Variable - JV_LANDING_TBL is empty and is mandatory for this job')

# JV_STAGING_TBL is the name of the Staging table which the job expects
if not JV_STAGING_TBL:
    raise CustomException('Input Variable - JV_STAGING_TBL is empty and is mandatory for this job')

# JV_LANDING_SCHEMA is the name of the Landing schema which the job expects; it defaults to 'LND'
if not JV_LANDING_SCHEMA:
    context.updateVariable('JV_LANDING_SCHEMA', 'LND')

# JV_STAGING_SCHEMA is the name of the Staging schema which the job expects; it defaults to 'STG'
if not JV_STAGING_SCHEMA:
    context.updateVariable('JV_STAGING_SCHEMA', 'STG')

print('Landing Schema - ' + JV_LANDING_SCHEMA)
print('Landing Table - ' + JV_LANDING_TBL)
print('Staging Schema - ' + JV_STAGING_SCHEMA)
print('Staging Table - ' + JV_STAGING_TBL)
This component can then be followed by a Snowflake SQL component, which can have the following code:
INSERT INTO ${JV_LANDING_SCHEMA}.${JV_LANDING_TBL}
SELECT * FROM ${JV_STAGING_SCHEMA}.${JV_STAGING_TBL}
Here is what the outline of the pipeline looks like.
2. Update Grid Variables for Dynamic Loading of Files From S3 to Snowflake (set_s3_to_sfk_load_grid_var.py)
Purpose: This script enables us to read a list of folders in S3 containing files to be loaded into Snowflake and select only the applicable folders.
Benefits: This allows metadata to be prepared for iterative loading (sequential/parallel) of S3 files into Snowflake using the Grid iterator component of Matillion.
import boto3

# Initialize the S3 client
s3 = boto3.client('s3')

# Specify your bucket name
bucket_name = EV_CALL_LOGS_BKT  # environment variable storing the bucket name

# List to store the S3 location details
s3_details = []

# Use the list_objects_v2 method to get a list of objects
response = s3.list_objects_v2(Bucket=bucket_name)

# Check if the bucket contains any objects
if 'Contents' in response:
    # Iterate over the objects and store the keys
    for obj in response['Contents']:
        # Only pick those objects which a) are folders and b) are not the base transcript folder
        if obj['Key'].endswith("/") and obj['Key'] != 'transcripts/':
            # For those objects, build a list of lists, each holding the bucket name, the key as the input folder,
            # the processed folder, and the rejected folder name
            s3_details.append([bucket_name, obj['Key'], obj['Key'] + 'processed/', obj['Key'] + 'rejected/'])

# Update the grid variable with these S3 details so that a grid iterator can process the files iteratively
context.updateGridVariable('S3_LOCATION_DETAILS', s3_details)
print(context.getGridVariable('S3_LOCATION_DETAILS'))
It is then combined with a grid iterator to iterate the SQL queries to perform the necessary ELT, as shown below.
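For context, the grid iterator maps each row of S3_LOCATION_DETAILS onto scalar job variables that the nested job can reference. The sketch below shows how a single iteration could consume those values from a Python component; the job variable name (JV_INPUT_FOLDER), the stage (MY_S3_STAGE), and the target table are illustrative assumptions, not part of the original pipeline:

# Value mapped by the grid iterator for the current iteration (hypothetical job variable name)
input_folder = JV_INPUT_FOLDER  # e.g. 'transcripts/2024-09-05/'

# Load the files under this folder into Snowflake.
# MY_S3_STAGE is assumed to point at the call-logs bucket; the target table is also an assumption.
cursor = context.cursor()
cursor.execute(
    "COPY INTO STG.CALL_LOGS_RAW "
    "FROM @MY_S3_STAGE/" + input_folder + " "
    "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
)
print(cursor.fetchone())

In most pipelines, the built-in S3 Load or SQL Script component would do this step instead; the sketch is only meant to show how the grid values flow through each iteration.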
3. Implement Look Back Logic on Dates (lookback_dates.py)
Purpose: This script allows us to reprocess missing/failed runs based on dates. We will show how to implement this for loading missing/failed S3 files to Snowflake, but the same approach can be extended to calculate lookback dates for other data sources.
Benefits: This makes the ingestion pipeline idempotent and maintains the integrity of the data loading process.
Python code example: We will add this lookback logic to the script from the previous example (#2), as shown below.
import boto3
from datetime import datetime, timedelta

# Get the current date
current_date = datetime.now()

# Define the lookback limit (e.g., 3 days)
lookback_limit = 3

# Generate the list of lookback dates
lookback_date_list = [(current_date - timedelta(days=i)).strftime("%Y-%m-%d") for i in range(lookback_limit)]

# Print the list of dates
print(lookback_date_list)

# Initialize the S3 client
s3 = boto3.client('s3')

# Specify your bucket name
bucket_name = EV_CALL_LOGS_BKT  # environment variable storing the bucket name

# List to store the S3 location details
s3_details = []

# Use the list_objects_v2 method to get a list of objects
response = s3.list_objects_v2(Bucket=bucket_name)

# Check if the bucket contains any objects
if 'Contents' in response:
    # Iterate over the objects and store the keys
    for obj in response['Contents']:
        # Only pick those objects which a) are folders and b) are not the base transcript folder
        if obj['Key'].endswith("/") and obj['Key'] != 'transcripts/':
            # Only keep folders whose last path segment is a date in the lookback list
            if list(filter(None, obj['Key'].split("/")))[-1] in lookback_date_list:
                # For those objects, build a list of lists, each holding the bucket name, the key as the input folder,
                # the processed folder, and the rejected folder name
                s3_details.append([bucket_name, obj['Key'], obj['Key'] + 'processed/', obj['Key'] + 'rejected/'])

# Update the grid variable with these S3 details so that a grid iterator can process the files iteratively
context.updateGridVariable('S3_LOCATION_DETAILS', s3_details)
print(context.getGridVariable('S3_LOCATION_DETAILS'))
4. Logging Matillion Job Errors in a Snowflake Table (log_job_error.py)
Purpose: This script will capture the errors from the job run and persist that log data in Snowflake. As a prerequisite, all the components that have the Export tab need to be edited to map the Message and Component values into job variables, for example, JV_ERROR_MSG and JV_ERROR_COMPONENT, respectively. These values will then be written to a Snowflake table.
Benefits: This persists Matillion job error logs inside Snowflake, giving users detailed insight into the errors, including historical runs, without accessing the Matillion backend RDS.
if JV_ERROR_MSG:
    print(JV_ERROR_MSG)
    cursor = context.cursor()
    cursor.execute("INSERT INTO DEMO.PUBLIC.MATILLION_ERROR_LOGS SELECT '" + str(task_id) + "', '" + job_name + "', '" + JV_ERROR_COMPONENT + "', '" + JV_ERROR_MSG + "', CURRENT_TIMESTAMP::TIMESTAMP_NTZ")
    rowcount = cursor.fetchone()[0]
    print(rowcount)
This is how the error logs would look in Snowflake.
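The target table must exist in Snowflake before the job runs. Here is a one-time setup sketch, with column names and types inferred from the INSERT statement above (adjust them to your environment):

# One-time setup for the error log table used by log_job_error.py.
# Column names and types are assumptions inferred from the INSERT statement.
cursor = context.cursor()
cursor.execute(
    "CREATE TABLE IF NOT EXISTS DEMO.PUBLIC.MATILLION_ERROR_LOGS ("
    " TASK_ID VARCHAR,"
    " JOB_NAME VARCHAR,"
    " ERROR_COMPONENT VARCHAR,"
    " ERROR_MESSAGE VARCHAR,"
    " ERROR_TIMESTAMP TIMESTAMP_NTZ)"
)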
5. Fetch Secrets From Secret Manager (get_secret.py)
Purpose: This script will connect to AWS Secrets Manager using the boto3 API and fetch the value of one of the secrets. In this example, the secret is an API key, which will be used later in the pipeline.
Benefits: This ensures sensitive data is not exposed in the code but rather stored in secure repositories and only fetched and used during the execution of the pipeline. Thus, it strengthens the security of an organization.
import boto3
from botocore.exceptions import ClientError
import json

def get_secret(secret_name):
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

# For this example, the name of our secret in AWS Secrets Manager is Environment name + '/dbt', i.e., dev/dbt
API_KEY = json.loads(get_secret(EV_DOMAIN.lower() + '/dbt'))['dbtpersonal']
6. Trigger External API (trigger_dbt_job.py)
Purpose: This script will show how to trigger a dbt job in DBT Cloud from Matillion. Although Matillion has a Run DBT Command component, it only works with the dbt Core version installed on the Matillion instance and cannot communicate with DBT Cloud.
Benefits: This will allow Matillion to communicate with any external services/applications triggered via an API, expanding Matillion’s integration with diverse services, such as DBT Cloud.
import boto3
from botocore.exceptions import ClientError
import json
import requests

def get_secret(secret_name):
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

# DBT Cloud configuration
ACCOUNT_ID = JV_DBT_ACCOUNT_ID  # Job variable holding your DBT Cloud account ID
JOB_ID = JV_DBT_JOB_ID  # Job variable holding your DBT Cloud job ID

# For this example, the name of our secret in AWS Secrets Manager is Environment name + '/dbt', i.e., dev/dbt
API_KEY = json.loads(get_secret(EV_DOMAIN.lower() + '/dbt'))['dbtpersonal']

# DBT Cloud API URL to trigger a job
url = f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/"

# Headers with the authorization token
headers = {
    "Authorization": f"Token {API_KEY}",
    "Content-Type": "application/json"
}

# Optional payload to configure the job run (you can add parameters if necessary)
payload = {
    "cause": "Triggered by API call from Matillion"  # You can specify a custom message here
}

# Make a POST request to trigger the job
response = requests.post(url, headers=headers, data=json.dumps(payload))

# Check the response
if response.status_code == 200:
    print("DBT Cloud job triggered successfully.")
    print("Response:", response.json())
else:
    print(f"Failed to trigger the job. Status Code: {response.status_code}")
    print("Response:", response.json())
7. Implement S3 File Trigger-based Handshake Between Upstream and Matillion (check_file_trigger.py)
Purpose: This script checks for the presence of a file in S3. If the file is present, the component exits successfully. If not, it retries at a set interval (e.g., every 5 minutes) and fails after a set duration (e.g., 30 minutes). This is a custom filewatcher built using Python and AWS S3.
Benefits: This allows upstream jobs to execute asynchronously and supports event-driven triggering of Matillion jobs from upstream.
import boto3
import time
import sys

# Initialize the S3 client
s3 = boto3.client('s3')

# Input parameters
bucket_name = EV_CALL_LOGS_BKT  # environment variable containing the bucket name
file_key = 'triggers/start_job.txt'
polling_interval = 5 * 60  # 5 minutes in seconds
polling_duration = 30 * 60  # 30 minutes in seconds
elapsed_time = 0  # To track the elapsed time

def check_file_exists(bucket, key):
    try:
        # Check if the file exists
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except s3.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return False
        else:
            # Raise other exceptions if encountered
            raise e

# Main logic
while elapsed_time < polling_duration:
    if check_file_exists(bucket_name, file_key):
        print("File " + file_key + " found in bucket " + bucket_name + ". Exiting with success.")
        sys.exit(0)  # Success
    time.sleep(polling_interval)
    elapsed_time += polling_interval

# If the loop exits, the file wasn't found within 30 minutes
print("File " + file_key + " not found in bucket " + bucket_name + " after 30 minutes. Exiting with error.")
sys.exit(1)  # Error
This is how we can implement the pipeline in Matillion.
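On the upstream side, the handshake is completed by the producing process dropping the trigger file once its data is ready. A minimal sketch of that step, assuming the same bucket and key polled above (the file content itself is irrelevant):

import boto3

# Upstream process: signal Matillion that the data is ready by writing the trigger file
s3 = boto3.client('s3')
s3.put_object(Bucket='call-center-logs-dev', Key='triggers/start_job.txt', Body=b'ready')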
8. Sending Slack Notifications (send_slack_notification.py)
Purpose: This script will send a notification message to any Slack Channel from an existing Slack App.
Benefits: This is useful for sending notifications to Slack at the end of the Matillion Job to notify about the state of the job.
Python code example: Note that since the Slack URL contains the secret keys to the Slack account, it must be stored securely in a secrets repository and fetched during execution. We will leverage the same logic from our earlier example (get_secret.py) for this.
import requests
import json
import boto3
from botocore.exceptions import ClientError

def get_secret(secret_name):
    region_name = "us-east-1"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        raise e

    secret = get_secret_value_response['SecretString']
    return secret

# Function to send a Slack notification via an incoming webhook
def send_slack_notification(message: str, slack_webhook_url: str, channel: str = '#de-notifications', username: str = 'DE Automated Notifications'):
    payload = {
        "channel": channel,
        "username": username,
        "text": message
    }

    # Send the POST request to the Slack Webhook URL
    response = requests.post(
        slack_webhook_url,
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'}
    )

    # Check the response status
    if response.status_code == 200:
        print("Message sent successfully to " + channel)
    else:
        print("Failed to send message. Response: " + str(response.status_code) + ", " + str(response.text))

# Slack webhook URL (contains the secret keys, so it is fetched from AWS Secrets Manager rather than hard-coded)
slack_webhook_url = json.loads(get_secret(EV_DOMAIN.lower() + '/slack_url'))['url']
message = "The Matillion pipeline - 01_load_s3_to_Snowflake has failed. Please check the logs and fix the issue."

# Send the notification
send_slack_notification(message, slack_webhook_url, channel='#de-notifications', username='DE Automated Notifications')
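In practice, the message is usually built from Matillion's automatic and job variables rather than hard-coded. A brief sketch, reusing the variable names from example #4 (it assumes those job variables exist in this job as well):

# Build the notification text from the Matillion variables captured in example #4
message = ("The Matillion pipeline '" + job_name + "' failed in component '" + JV_ERROR_COMPONENT +
           "' with error: " + JV_ERROR_MSG + ". Please check the logs and fix the issue.")
send_slack_notification(message, slack_webhook_url)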
This can then be integrated with the main pipeline and also with the previous example #7, as shown below.
9. Call an External Database Stored Procedure (cleanup_s3_files.py)
Purpose: This script can invoke a stored procedure or any query in an external database, e.g., Oracle. In this example, the procedure loads a file from S3 into the database; the file is a copy of the data already processed in Snowflake.
However, for this connectivity to work, the Oracle JDBC driver must be downloaded and placed in the /usr/share/java/ folder on the Matillion instance, along with any other security configuration required for connecting to the database.
Benefits: This allows any reverse ETL activity, such as loading processed data back to the application database without overloading the Matillion instance.
Python code example: Note that the interpreter must be Jython, which supports JDBC connectivity to external databases via the java.sql package.
import java.sql.DriverManager as DriverManager
import java.sql.Connection as Connection
import java.sql.SQLException as SQLException
import oracle.jdbc.driver.OracleDriver as OracleDriver

# Oracle database connection parameters
oracle_jdbc_url = "jdbc:oracle:thin:@//" + JV_HOST + ":1521/" + JV_SERVICE_NAME
oracle_user = JV_USERNAME
oracle_password = JV_PASSWORD
# Populate the job variable JV_PASSWORD in a preceding Python component using example #5

bucket_name = EV_CALL_LOGS_BKT  # environment variable storing the bucket name
file_key = "transcripts/2024-09-05/processed/call110312587.csv"

try:
    # Register the Oracle JDBC driver
    # (make sure the driver jar is present on the Matillion instance)
    DriverManager.registerDriver(OracleDriver())

    # Establish the connection
    connection = DriverManager.getConnection(oracle_jdbc_url, oracle_user, oracle_password)
    print("Connection to Oracle database established successfully!")

    # Call the stored procedure that loads the S3 file into the database
    query = "{call DEMO_DB.PUBLIC.LOAD_DATA_FROM_s3_to_DB(?, ?)}"
    stmt = connection.prepareCall(query)
    stmt.setString(1, bucket_name)
    stmt.setString(2, file_key)
    stmt.execute()

    # Close the connection
    connection.close()
except SQLException as e:
    print("Error occurred while processing in Oracle DB: " + str(e))
10. Trigger AWS Lambda Asynchronously (trigger_lambda.py)
Purpose: This script will allow us to trigger an AWS Lambda function asynchronously, i.e., the Matillion component returns immediately while the function runs entirely on the AWS side. The Lambda’s purpose here is to read a chunk of the S3 file to determine the file’s encoding, as we receive input files with different encodings. Then, using the S3 file trigger-based handshake explained in example #7, we can trigger the Matillion pipelines to load the data into Snowflake based on the encoding identified.
Benefits: This allows an in-memory read operation using an AWS Lambda outside the Matillion instance, thereby reducing the load on the instance.
import boto3
import json

# Initialize the boto3 Lambda client
lambda_client = boto3.client('lambda', region_name='us-east-1')

# Function to invoke an AWS Lambda function asynchronously
def trigger_lambda_async(lambda_function_name, payload):
    try:
        response = lambda_client.invoke(
            FunctionName=lambda_function_name,
            InvocationType='Event',  # This ensures async invocation
            Payload=json.dumps(payload),
        )

        # Check the response for any issues
        if response['StatusCode'] == 202:
            print("Lambda function " + lambda_function_name + " invoked successfully.")
        else:
            print("Failed to invoke Lambda function. StatusCode: " + str(response['StatusCode']))
    except Exception as e:
        print("Error invoking Lambda: " + str(e))

# Replace with your Lambda function name
lambda_function_name = 'arn:aws:lambda:us-east-1::function:CheckFileEncoding'

# Payload to send to the Lambda function
payload = {
    "s3_bucket": "call-center-logs-dev",
    "s3_file": "transcripts/2024-09-05/call110312587.txt"
}

# Trigger the Lambda asynchronously
trigger_lambda_async(lambda_function_name, payload)
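For completeness, here is a rough sketch of what the CheckFileEncoding Lambda handler could look like. It assumes the chardet package is bundled with the function and that the result is written back to S3 as a small file that the filewatcher from example #7 can poll for; the exact output convention is an assumption:

import boto3
import json
import chardet  # assumed to be packaged with the Lambda function

s3 = boto3.client('s3')

def lambda_handler(event, context):
    bucket = event['s3_bucket']
    key = event['s3_file']

    # Read only the first 64 KB of the file to detect its encoding in memory
    obj = s3.get_object(Bucket=bucket, Key=key, Range='bytes=0-65535')
    sample = obj['Body'].read()
    encoding = chardet.detect(sample)['encoding']

    # Write a small result file that a Matillion filewatcher (example #7) can poll for
    result_key = key + '.encoding.json'
    s3.put_object(Bucket=bucket, Key=result_key,
                  Body=json.dumps({'file': key, 'encoding': encoding}).encode('utf-8'))

    return {'statusCode': 200, 'encoding': encoding}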
Best Practices
Along with following the general best practices for developing pipelines in Matillion, we should keep the following points in mind when using Python inside Matillion:
Do not use the Python component for memory-intensive processing, such as running PySpark code or high-memory algorithms, since Matillion runs on servers rather than a Spark cluster. Such large data processing tasks should be pushed down to Snowflake: first ingest the required data via the Matillion components for the relevant source, then use the Matillion SQL component to run the memory-intensive logic as SQL or a stored procedure in Snowflake.
Use the latest version of Python wherever possible.
Use Jython as an interpreter only when JDBC connectivity to an external service is required.
If a feature can be achieved using built-in Matillion components, we should prioritize it over doing it via Python.
Closing
This blog has covered different scenarios for leveraging Python to extend Matillion’s capabilities, and there are many other use cases where Python code can be applied in Matillion. You can start writing similar Python code in your Matillion pipelines to achieve your own results. Happy Coding!
If you have any additional questions or need assistance with implementing Python in Matillion, please contact our team at phData.
FAQs
What language does Matillion use?
Matillion supports writing code in Python, Bash Script, and native ANSI SQL commands.