Consider a data pipeline that detects its own failures, diagnoses the issue, and recommends the fix—all automatically. This is the potential of self-healing pipelines, and this blog explores how to implement them using dbt, Snowflake Cortex, and GitHub Actions.
By automating the debugging process in dbt pipelines, developers can save time, reduce errors, and enhance the reliability of their data workflows.
Solution Overview
While considering how we could leverage GenAI for self-healing capabilities, I thought it would be interesting to experiment with applying GenAI to a dbt pipeline. My idea was to run a dbt transformation pipeline, identify any failed models or tests, and then use GenAI to diagnose the error and suggest a fix.
The AI’s feedback would then be pushed to a pull request to kickstart the debugging process.
With this rough concept in mind, I needed to decide which GenAI model to use. Snowflake Cortex stood out as the ideal choice for powering the model due to its direct access to data, intuitive functionality, and exceptional performance in handling SQL tasks.
To supply the necessary context for the failed tests and models, I planned to query dbt Cloud’s Discovery API to retrieve the artifacts related to those failures. With this information, I could then craft a prompt for Cortex, enabling it to generate feedback for the pull request.
Introduce Intentional Errors
To start testing this process, we needed to intentionally introduce errors into the dbt pipeline. I used a demo project that I frequently work with and introduced syntax errors and data quality problems.
Test Case 1 - Introduce Contract-Breaking Column and Data Duplication
First, I removed a WHERE clause from a model that contained a large number of null key values, and then I added a column that would break the model’s contract.
SELECT DISTINCT
pit.customer_hk AS customer_pk,
cst.customer_name,
cst.customer_last_name,
cst.customer_first_name,
cst.sales_rep_employee_number,
cst.credit_limit,
phn.phone,
addr.address_line1,
addr.address_line2,
addr.city,
addr.state,
addr.postal_code,
addr.country,
'1' AS contract_break -- intentionally added to break the model's contract
FROM src_pit_customer pit
LEFT JOIN src_customer cst
ON pit.sat_customer_details_pk = cst.customer_hk AND
pit.sat_customer_details_ldts = cst.load_date
LEFT JOIN src_customer_phone phn
ON pit.sat_customer_phone_details_pk = phn.customer_hk AND
pit.sat_customer_phone_details_ldts = phn.load_date
LEFT JOIN src_customer_address addr
ON pit.sat_customer_address_details_pk = addr.customer_hk AND
pit.sat_customer_address_details_ldts = addr.load_date
In dbt, this model leads to only one failure even though there are two problems: contract checks run before a model is executed, so dbt reports the contract violation and never surfaces the data quality issue.
Test Case 2 - Introduce Data Quality Issue with Duplicates
The next issue I wanted to introduce was intentional data duplication using a UNION ALL, resulting in a model like this:
SELECT
order_hk AS order_pk,
required_date,
shipped_date,
status,
comments
FROM src_order_details
UNION ALL
SELECT
order_hk AS order_pk,
required_date,
shipped_date,
status,
comments
FROM src_order_details
Because of the UNION ALL statement, this model will contain duplicate data. While it won’t violate any contract and will run without errors, it will fail the uniqueness tests.
Test Case 3 - Introduce SQL Syntax Error
The final model was created with a WHERE clause that contains no logic, causing it to fail when executed in the data warehouse. The resulting SQL looks like this:
SELECT
product_hk AS product_pk,
product_name,
product_line,
product_scale,
product_vendor,
product_description
FROM src_products
WHERE
Verify Errors
These three errors make suitable test cases, and when the pipeline is executed in dbt, all three models fail as expected.
Create CI/CD Pipeline
With errors and issues now present in our codebase, it’s time to develop the Python code to manage the CI/CD process. This code will handle executing our dbt pipeline, retrieving the necessary artifacts, sending prompts to Snowflake Cortex, and then writing the results to the pull request.
Execute dbt Cloud Job
The first step is to execute the dbt Cloud job and monitor its completion. This involves using the dbt Cloud API endpoints to start the job and then polling its status. We need to do this so we can use the generated run_id in the subsequent step when querying the dbt Cloud Discovery API. Here’s how this process looks:
def run_and_poll() -> tuple[int, str, str]:
    print('Beginning request for job run...')
    # run job
    run_id = None
    try:
        run_id = run_job(cred.req_job_url, cred.req_auth_header, cred.job_cause, cred.git_branch, cred.schema_override)
    except Exception as e:
        print(f'ERROR! - Could not trigger job:\n {e}')
        raise
    # build status check url and run status link
    req_status_url = f'https://{cred.api_base}/api/v2/accounts/{cred.account_id}/runs/{run_id}/'
    run_status_link = f'https://{cred.api_base}/#/accounts/{cred.account_id}/projects/{cred.project_id}/runs/{run_id}/'
    # update user with status link
    print(f'Job running! See job status at {run_status_link}')
    # check status indefinitely with an initial wait period
    time.sleep(30)
    while True:
        status = get_run_status(req_status_url, cred.req_auth_header)
        print(f'Run status -> {status}')
        if status in ['Error', 'Cancelled', 'Success']:
            return run_id, status, run_status_link
        time.sleep(10)
For brevity, I haven’t included all the helper functions (this is a streamlined version of the code we shared at dbt Coalesce in 2023). At its core, this is the entry point where the code interacts with the dbt Cloud API to start the dbt Cloud Job and then polls for its status, returning the information to the CI/CD pipeline.
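For illustration, here is a minimal sketch of what those helpers might look like, assuming the dbt Cloud v2 API and the same cred module used above. The exact payload fields are assumptions, though the numeric run status codes are dbt Cloud’s documented run states:

import requests

# dbt Cloud run status codes mapped to human-readable labels
STATUS_MAP = {1: 'Queued', 2: 'Starting', 3: 'Running', 10: 'Success', 20: 'Error', 30: 'Cancelled'}

def run_job(url, headers, cause, git_branch, schema_override) -> int:
    # trigger the dbt Cloud job; the branch and schema overrides are optional
    payload = {'cause': cause, 'git_branch': git_branch, 'schema_override': schema_override}
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()['data']['id']

def get_run_status(url, headers) -> str:
    # fetch the run and translate its numeric status code to a label
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return STATUS_MAP[response.json()['data']['status']]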
Fetch Artifacts from dbt Cloud Discovery API
Context: GraphQL is a query language for APIs that lets you request exactly the data you need, no more and no less. Unlike traditional REST APIs, where fetching related data might require multiple endpoints, GraphQL can retrieve all the necessary information in a single request, making API calls more efficient and giving you finer control over the data you work with.
With the run status in hand, the next step is to query the GraphQL API for job details and generate a result set that will be useful for the Snowflake Cortex prompt. We’ll start by crafting a GraphQL query, which essentially involves creating a JSON query payload and a set of variables.
This payload will be sent to the API using the requests library, resulting in a query like:
def graphql_query(run_id) -> tuple[str, dict]:
    query = '''
    query ExampleQuery($jobId: BigInt!, $runId: BigInt) {
      job(id: $jobId, runId: $runId) {
        models {
          description
          error
          executionTime
          compiledSql
          rawSql
          status
          tests {
            columnName
            error
            fail
            name
            skip
            status
          }
        }
      }
    }'''
    variables = {
        "jobId": cred.job_id,
        "runId": run_id,
    }
    return query, variables
This function generates the query and the variables used for substitutions to send to the API. These are posted to the GraphQL endpoint of your dbt Cloud instance, returning the models and tests executed for that run.
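As a rough sketch, the request might be posted with a helper along these lines. The endpoint shown is the multi-tenant dbt Cloud Discovery API; post_graphql and cred.api_token are illustrative assumptions, not the exact code:

import requests

def post_graphql(query: str, variables: dict) -> dict:
    # multi-tenant dbt Cloud exposes the Discovery API at this endpoint
    url = 'https://metadata.cloud.getdbt.com/graphql'
    headers = {
        'Authorization': f'Bearer {cred.api_token}',
        'Content-Type': 'application/json',
    }
    response = requests.post(url, headers=headers, json={'query': query, 'variables': variables})
    response.raise_for_status()
    # the models and their tests live under data -> job in the response
    return response.json()['data']['job']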
Once we have the results, we need to construct a dictionary that represents the context of our model. For this experiment, I used a dictionary formatted as follows:
dict_model = {
"description": model['description'],
"error": model['error'],
"executionTime": model['executionTime'],
"compiledSql": model['compiledSql'],
"rawSql": model['rawSql'],
"status": 'failed',
"tests": failed_tests
}
This dictionary is then added to a list, which is used to create the prompt sent to Snowflake Cortex.
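To give a sense of how that list might be assembled, here is a hypothetical sketch; the filtering rules are assumptions based on the statuses returned by the Discovery API, not the exact code:

def collect_error_results(job_data: dict) -> list:
    error_results = []
    for model in job_data['models']:
        # keep only tests that did not pass
        failed_tests = [t for t in (model['tests'] or []) if t['status'] in ('fail', 'error')]
        # a model is interesting if it errored or any of its tests failed
        if model['error'] or failed_tests:
            dict_model = {
                "description": model['description'],
                "error": model['error'],
                "executionTime": model['executionTime'],
                "compiledSql": model['compiledSql'],
                "rawSql": model['rawSql'],
                "status": 'failed',
                "tests": failed_tests,
            }
            error_results.append(dict_model)
    return error_results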
Push Prompt to Snowflake Cortex
With our list of errored models and the dictionary providing additional context for Snowflake Cortex, the next step is to craft a prompt for handling the error messages. The prompt is designed to instruct the LLM to approach the problem as if it were an analytics engineer and to summarize the data from the dictionary as feedback for a pull request. This leads to a prompt like:
final_question = " ".join([
    f"As an analytics engineer you were asked this question \"{original_question}\"."
    , "Please summarize the data below for that question in the format of feedback for a pull request."
    , "Do NOT return a table as part of your answer."
    , f"\n\n Data:\n{error_result}"
])
In this prompt, our original_question can be either of two questions depending on the type of error or failure: “What is wrong with the model below?” or “Why did the model below fail the data tests?” The specific question is chosen based on the type of error, while error_result is the dictionary previously extracted from the dbt Cloud Discovery API. This message is then sent to Snowflake using a query like:
def query_cortex_complete(question, model):
    # replace single quotes with double single quotes to pass through to snowflake
    question = question.replace("'", "''")
    sql = f"SELECT SNOWFLAKE.CORTEX.COMPLETE('{model}', '{question}');"
    # `session` is a Snowpark session created earlier from our Snowflake credentials
    res = session.sql(sql)
    query_results = res.collect()
    str_results = list(query_results[0].asDict().values())[0]
    return str_results
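To make the question selection concrete, a hypothetical helper might look like this; the selection rule is an assumption based on the two questions described above:

def choose_question(error_result: dict) -> str:
    # a model-level error means the SQL itself failed to run;
    # otherwise only the data tests failed
    if error_result['error']:
        return "What is wrong with the model below?"
    return "Why did the model below fail the data tests?"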
With these results in hand, it’s time to send the responses back to the pull request.
Comment Results to GitHub
Fortunately, commenting on GitHub is the simplest part of this process. A single API request posts the message we want to display on the pull request, using a requests call like:
def comment_on_pull_request(msg):
    response = requests.post(cred.gh_url, headers=cred.gh_header, json={"body": msg})
    # surface any API errors rather than failing silently
    response.raise_for_status()
    time.sleep(10)
As you can see, this step quickly posts the comment and then pauses briefly to avoid spamming the API or triggering rate limits. The next step is to consolidate all these processes into a central script and implement it as a GitHub Action.
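For context, pull request comments are posted through GitHub’s issue comments endpoint, so the cred values might be shaped like this; the owner, repo, PR number, and token are placeholders, not values from the original project:

# hypothetical configuration; fill in your own repository details
gh_url = f'https://api.github.com/repos/{owner}/{repo}/issues/{pr_number}/comments'
gh_header = {
    'Authorization': f'Bearer {github_token}',
    'Accept': 'application/vnd.github+json',
}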
GitHub Action to Execute Pipeline
With all the functions encapsulated into manageable pieces, we can now create a Python script to execute these steps, which can then be invoked from the GitHub Action pipeline.
from utils.dbt_cloud import execute_job
from utils.dbt_cloud import graphql_query
from utils.snowflake import cortex

def main():
    # execute dbt job and poll for status
    run_id, status, run_status_link = execute_job.run_and_poll()
    # determine status
    if status == 'Cancelled':
        raise Exception(f'Run cancelled. See why at {run_status_link}')
    if status == 'Success':
        print(f'Job completed successfully! See details at {run_status_link}')
        return
    # retrieve errors and test failures
    error_results = graphql_query.graphql_targeted_response(run_id)
    # question results with cortex
    cortex.question_errors(error_results)

if __name__ == "__main__":
    main()
In this script, you may notice there’s no separate function for posting the comment. This is because, in my experiment, the comment was pushed directly within the Cortex function as it processed a result.
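For illustration, the loop inside cortex.question_errors might look roughly like this, reusing the helpers sketched earlier and the summarize_result prompt helper shown in the refactoring section below; the internals here are an assumption, not the exact code:

def question_errors(error_results):
    for error_result in error_results:
        # pick the right question, build the prompt shown earlier,
        # and post Cortex's answer straight to the pull request
        original_question = choose_question(error_result)
        feedback = summarize_result(original_question, error_result)
        comment_on_pull_request(feedback)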
Now that the core components for a self-healing pipeline are in place, the next step is to automate the entire process by defining a GitHub Actions workflow, which orchestrates each step so the pipeline runs automatically on every pull request. Below is an example workflow YAML file that executes the script above:
name: dbt CI Self-Heal
on:
  pull_request:
    branches:
      - '*'
jobs:
  run_dbt_on_pr:
    name: dbt CI with Self-Heal
    runs-on: ubuntu-latest
    env:
      # ... env variables removed for brevity
    steps:
      - uses: actions/checkout@v3
        with:
          repository: ${{ github.event.pull_request.head.repo.full_name }}
          ref: ${{ github.event.pull_request.head.ref }}
      - name: Set up Python 3.10
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r ./python/requirements.txt
      - name: Trigger dbt job
        run: |
          python -u ./python/run_monitor_and_self_heal.py
Results from Cortex
So, how effectively did the prompt address the errors introduced in the pipeline? Let’s review the responses from Cortex, starting with the first test case.
Test Case 1 - Results
Based on the provided error message and the given SQL code, there seems to be an issue with the dim_customers model. The error specifically mentions that there is a missing column CONTRACT_BREAK in the contract for the model.

Looking at the SQL code, it appears that CONTRACT_BREAK is hardcoded as a constant value '1' in the final SELECT statement. However, it seems that this column is expected to be part of the contract for the model.

To resolve this issue, you need to include CONTRACT_BREAK as a column in the contract for the dim_customers model. This can be done by updating the contract definition to include this column and ensuring that the name, data type, and number of columns in the contract match the columns in the model’s definition.

Once the contract is updated, the model should compile without any errors.
Cortex correctly identified that we introduced a column that broke the dbt contract. It recommends updating the contract and checking for any additional errors. Given how dbt operates, this is the only error reported for this particular model, and the feedback seems valid if we intend to modify the contract. What about the next model?
Test Case 2 - Results
Based on the provided data, the model for orders failed the data tests due to the following issues:

Primary Key Constraint Violation: The tests dbt_constraints_primary_key_dim_orders_order_pk and dbt_constraints_unique_key_dim_orders_order_pk both failed with the error message “Got 326 results, configured to fail if != 0”. This indicates that there are multiple records with the same order_pk (order_hk) value in the model, violating the primary key constraint.

Data Integrity Issue: The model is built using a UNION ALL statement to combine data from the src_order_details table twice. This could lead to duplicate records and incorrect results.

To address these issues, you should:

Enforce Data Integrity: Ensure that each order_pk (order_hk) value is unique in the model to satisfy the primary key constraint. You can do this by adding a unique constraint on the order_pk column in the model or the underlying table.

Review the UNION ALL Statement: Verify if it is necessary to use UNION ALL in this model. If it is, ensure that the data from the src_order_details table is correctly combined and does not contain duplicate records.

By addressing these issues, you can improve the accuracy and reliability of the orders model and ensure that it passes the data tests.
This model included a UNION ALL statement, which inevitably caused data duplication and related issues. The comment correctly identified the potential data integrity problem caused by the UNION ALL statement and offered what seems to be valid feedback to address the issue. What about our third and final model?
Test Case 3 - Results
Based on the provided data, here are the key points and observations:

The error message indicates a syntax error in the dim_products dimensional model’s SQL code, specifically on line 23 at position 8. The error is related to an unexpected ‘)’ character.

The model is written in a way that it references a table named sat_product_details using the {{ ref('sat_product_details') }} syntax, which suggests that this model is part of a data warehouse or ETL pipeline.

The model is designed to extract data from the sat_product_details table and populate the following attributes in the src_products CTE (Common Table Expression): product_hk, product_name, product_line, product_scale, product_vendor, product_description. The final SELECT statement retrieves the same attributes from the src_products CTE and assigns the product_hk column as the primary key for the dim_products model.

The execution time of the model is 0.3148694038391113 seconds, which is relatively fast.

The model’s status is reported as ‘error’, indicating that the model failed to run due to the aforementioned syntax error.

In summary, the dim_products model is designed to extract and process product data from the sat_product_details table, but it contains a syntax error that needs to be resolved. The model appears to be part of a larger data warehouse or ETL pipeline.
This output is less helpful. While we know the issue is an improperly formatted WHERE clause, Cortex did not identify this. It correctly flagged a syntax error and recognized the model’s structure, including the various CTEs and columns. However, it missed the fact that the core problem was with the WHERE clause logic.
Refactoring to Improve Our Results
These results are a decent start; however, Cortex makes it extremely easy to swap models and modify prompts to improve our LLM application’s ability to debug errors. Using the published model benchmarks, we can compare the various models available within Cortex to identify one better suited to understanding our SQL.

Looking over the chart, I noticed that the mistral-large model, despite having a smaller context window, scored very well on the Spider 1.0 SQL test.

So, let’s start by changing the model to see if this gives us three correct answers. We can accomplish this by updating the model we used from reka-flash to mistral-large.
def summarize_result(original_question, error_result):
    final_question = " ".join([
        f"As an analytics engineer you were asked this question \"{original_question}\"."
        , "Please summarize the data below for that question and note any interesting trends or correlations in the data."
        , "Do NOT return a table as part of your answer."
        , f"\n\n Data:\n{error_result}"
    ])
    summarized = query_cortex_complete(final_question, 'mistral-large')
    return summarized
This update is trivial and required no change to our prompt. Let’s review the output from our models and see how they perform with mistral-large.
Test Case 1 - Refactored Results
The issue with the provided dimensional model for customers is a compilation error. The error message indicates that the model has an enforced contract that failed. The contract is a set of rules that ensure the name, data type, and number of columns in the model match the columns in the contract’s definition.
The specific mismatch is that the column 'CONTRACT_BREAK' is missing in the contract. This column is present in the model’s definition, but it’s not included in the contract, hence the error.
The model is pulling data from different sources related to customer details, customer phone details, and customer address details. It’s then joining these sources on the customer key and load date to create a comprehensive customer view.
Interestingly, the model includes a column named 'contract_break' which is set to '1' for all records. This could be a placeholder for a flag to indicate some sort of contract breach, but without more context, it’s hard to say for sure.
In terms of trends or correlations, it’s not possible to infer any from the provided data as it’s primarily schema and SQL code, not actual data. To identify trends or correlations, we would need to analyze the data itself.
Look at that! Not only did mistral-large still catch that the contract was broken, it even picked up on the fact that a static column was put in place to break it. This is a great start. How well does it do with our data quality issues?
Test Case 2 - Refactored Results
Based on the data provided, the model failed the data tests due to primary key and unique key constraints. The tests named 'dbt_constraints_primary_key_dim_orders_order_pk' and 'dbt_constraints_unique_key_dim_orders_order_pk' both failed. The error message indicates that these tests were configured to fail if the number of results is not equal to 0, and in this case, both tests returned 326 results.

This suggests that there are duplicates in the 'order_pk' column, which is supposed to be unique as it is the primary key. The model is a dimensional model for orders and it’s pulling data from the 'sat_order_details' table. The SQL query is unioning all the data from this table twice, which could be the reason for the duplicates.
An interesting trend in the data is not immediately apparent due to the lack of specific data points. However, the fact that the number of failed tests due to duplicate primary keys is 326 could indicate a systemic issue in the data ingestion or processing pipeline.
Again, mistral-large is able to identify that there are data quality issues with the table and that a UNION ALL is most likely the culprit, giving us great context to start troubleshooting. How about our model with the syntax error that reka-flash wasn’t able to answer?
Test Case 3 - Refactored Results
The issue with the provided model is a syntax error in the SQL code. The error message indicates an unexpected ‘)’ at line 23, position 8. However, upon examining the provided SQL code, the actual issue seems to be a missing condition in the WHERE clause at the end of the query. The WHERE clause is used to filter records, but in this case, it’s empty, causing a syntax error.

As for the data, it’s a dimensional model for products, pulling data from a source table 'sat_product_details'. The model includes fields such as product_hk (which is being aliased as product_pk), product_name, product_line, product_scale, product_vendor, and product_description.

Without a proper WHERE clause, it’s hard to identify specific trends or correlations as the query would simply return all records from the source table. However, potential trends could be analyzed based on the product line, scale, or vendor, depending on the specific business questions at hand.
This time, not only did it return the error message, it also picked up on the WHERE clause missing any logic, adding that a proper WHERE clause would resolve the issue. It isn’t able to determine why the WHERE clause exists, since there is no logic left to debug, but this is significantly better than reka-flash.
Conclusion
Thanks to Cortex’s ease of use, we were able not only to get this feedback process working but also to iterate on our initial reka-flash setup and identify a model that provides better feedback. I believe there are further ways to improve the prompt and refine the model selection to enhance the process.
This could be taken further by adding an LLM component that generates new SQL to implement the fix. We could create a workflow where, if a model fails, the LLM generates a working model and commits it to its own branch for review. This would serve as training feedback and could be a step toward achieving a truly self-healing CI/CD pipeline with dbt.
Fortunately, dbt offers a wealth of artifacts related to run and model execution, which can be used to add context to the LLM prompts.
In the end, GenAI is a tool, and we should use it where it makes our work easier. This might include providing automated feedback to assist junior engineers with troubleshooting, thereby improving efficiency and increasing velocity without relying too heavily on senior engineers, who are often in shorter supply.
Have questions? Looking to utilize dbt better at your organization? Interested in implementing GenAI? phData can help! Reach out to us today for answers, advice, and help.