StreamSets Test Framework (STF) is a set of Python tools and libraries that enables developers to write integration tests for StreamSets:
- Data Collector
- Control Hub
- Data Protector
- Transformer
This unique test framework allows you to script tests for pipeline-level functionality, pipeline upgrades, the functionality of individual stages, and much more, depending on your requirements.
But the best part is that STF is available for free to StreamSets Premium and Enterprise customers! All it takes is a little work up front and you’ll be able to write test cases for any of your pipelines.
This hands-on blog covers the StreamSets Test Framework installation process, how to work within the STF, and how to write tests for pipelines in a registered Data Collector or Transformer within StreamSets Control Hub.
Let’s dive in and get your test framework built!
How to Install StreamSets Test Framework
Getting started with STF installation requires a few steps and prerequisites, which we’ve summed up below.
Installation Requirements
- Docker
- Python 3.6 (StreamSets recommends version 3.6, as some users have reported errors with Python 3.7)
- Activation key — to use STF, you will need to request a StreamSets SDK activation key
Installation Steps and Verification
STF can be installed with pip3 on the host machine:
pip3 install streamsets-testframework
Run the command below to confirm the installation and see the installed version:
stf --version
Docker Images
Next, build the STF "extras" Docker image:
stf build extras
If this throws the error below (the current version at the time of writing, 1.1.0, needs a fix), use the command that follows to work around it.
Build Error Example
Build Error Command Fix
stf build --build-arg DATABRICKS_JDBC_DRIVER_URL=https://databricks.com/wp-content/uploads/drivers-2020/SimbaSparkJDBC42-2.6.17.1021.zip extras --extra-library databricks
STF Usage
STF is built on top of the StreamSets SDK for Python and uses pytest as its underlying test executor. It includes a collection of client-facing APIs that facilitate interaction with external environments.
The command below gives more details on usage:
stf -h
Be sure to check out the STF documentation for the latest information, and the StreamSets SDK for Python documentation if you're looking for both a basic understanding of STF and advanced usage information.
What is an STF Shell?
An STF shell gives you an interactive shell within the test framework environment and is particularly useful during test development, when you may want to explore the streamsets.testframework package from an interpreter.
Run stf shell and then enter the Python interpreter by running the python command.
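For example, once inside the shell you can start the interpreter and explore the package interactively. A minimal sketch (the exact contents depend on your installed STF version):
# After running `stf shell`, start the interpreter with `python`, then:
import streamsets.testframework
help(streamsets.testframework)  # package-level help
from streamsets.testframework import environments  # environment helpers used later in this post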
Connecting to Data Collector and Transformer from Control Hub
The code below works for registered Data Collector and Transformer instances within StreamSets Control Hub.
Here, your Control Hub credentials are used to instantiate an instance of streamsets.sdk.ControlHub, which is then passed as an argument to streamsets.sdk.DataCollector or streamsets.sdk.Transformer, as shown below:
Note: You’ll need to replace the argument values according to your setup
from streamsets.sdk import ControlHub
sch = ControlHub('https://cloud.streamsets.com', username='', password='')

from streamsets.sdk import Transformer
Transformer.VERIFY_SSL_CERTIFICATES = False  # defaults to True; change it depending on your settings
st = Transformer(server_url='', control_hub=sch)

from streamsets.sdk import DataCollector
DataCollector.VERIFY_SSL_CERTIFICATES = False  # defaults to True; change it depending on your settings
data_collector = DataCollector(server_url='', control_hub=sch)
Accessing Data Collector or Transformer Pipelines and Their Stages from Control Hub
pipeline1 = sch.pipelines.get(name='')
pipeline1.stages
origin = pipeline1.stages[0]
destination = pipeline1.stages[1]
Another way to access these pipelines is directly through Transformer or Data Collector:
pipeline2 = st.get_pipeline(pipeline_id='')
pipeline2.stages
pipeline3 = data_collector.get_pipeline(pipeline_id='')
pipeline3.stages
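As a quick sketch (assuming the pipeline1 object retrieved above), you can iterate over a pipeline's stages to see what it contains; instance_name is the same attribute used later in this post when reading preview output:
for stage in pipeline1.stages:
    print(stage.instance_name)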
We can inspect these objects in the Python interpreter using the built-in dir() function, or by calling Python's built-in help() function on an instance of a class. A few examples:
help(Transformer)
dir(Transformer)
help(origin)
Then, with the attribute name in hand, you can read the configuration value. In the above example: origin.data_format
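For instance, a minimal sketch (assuming the first stage is a dev raw data source origin configured with a JSON data format, as in the examples later in this post):
origin = pipeline1.stages[0]
print(origin.data_format)  # e.g. 'JSON'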
Connecting to Test Environments
Now that we have covered how to connect to Data Collector and how the STF shell works, we have everything in place to begin executing tests against various platforms. The following sections hint at the different options available; the remainder of this piece focuses on testing with Databricks.
Further integrations are available through StreamSets-provided APIs that connect to various sources such as HDFS, Azure, AWS, Spark, Kafka, databases, and more.
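If you want to see which environment helpers ship with your installed STF version, one way (a sketch, not an official STF command) is to list the submodules of the environments package from within an STF shell:
import pkgutil
import streamsets.testframework.environments as environments

# List the environment helper modules bundled with this STF installation
# (the databricks and cloudera modules are used in the sections below).
print([module.name for module in pkgutil.iter_modules(environments.__path__)])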
Databricks and Listing Files
Currently, public documentation on how to connect to Databricks is not available. Since many clients use Databricks, here is how to connect from STF and work with it.
import os
os.environ['DATABRICKS_TOKEN'] = ''  # your Databricks access token

from streamsets.testframework.environments.databricks import DatabricksDBFS, DatabricksInstance
db = DatabricksInstance('', cluster_id='')  # Databricks instance URL and cluster ID
db.fs.list('')  # list files at the given path
Cloudera and Listing Files
Similarly, clients using Cloudera Spark have comparable connection criteria; here is how to connect from STF and work with it.
import os
os.environ['CLOUDERA_CONFIGS'] = ''

from streamsets.testframework.environments.cloudera import ClouderaSpark2OnYarn, ClouderaHdfs
cldr = ClouderaHdfs('', '')
cldr.fs.list('')
How to Build Your Own Docker Image to Use With STF Tests
To import extra modules (like pyarrow or pandas), you can build a Docker image with those modules in place. To do that, create an empty folder, set it as your working directory, and add a file called Dockerfile that looks like this:
FROM streamsets/testframework:latest
RUN pip install pyarrow
RUN pip install pandas
Then build the image, supplying your own tag after the colon:
docker build -t streamsets/testframework: .
To use your image when running STF, add this:
stf --docker-image-dont-pull --docker-image streamsets/testframework:
STF Test
STF tests are executed by cd-ing into a directory containing the test source and then running the stf test command. Any arguments after test are passed into the Test Framework container as arguments to an invocation of pytest (for example, the -vs at the end of the last command in this post is forwarded to pytest).
Listed below are three common examples of STF tests:
Test Your Metric Count
The first example runs a test case on metric count by creating a job for the assigned pipeline in your Data Collector or Transformer instance, then starting it, checking its status and metrics, and finally stopping and deleting it in Control Hub.
Prerequisites: Requires the name of any working pipeline.
Place the code below in a test file (<anyfilename.py>):
import pytest

from streamsets.sdk import ControlHub

# Note: pytest_addoption and shared fixtures normally live in conftest.py
# (see the next example); they are shown inline here for completeness.
def pytest_addoption(parser):
    parser.addoption('--pipeline-name')

@pytest.fixture(scope='session')
def sch(sch_session):
    yield sch_session

@pytest.fixture(scope='session')
def pipeline(sch_session, request):
    pipeline_name = request.config.getoption('pipeline_name')
    pipeline_ = sch_session.pipelines.get(name=pipeline_name)
    yield pipeline_

def test_run_job(sch, pipeline):
    # Assertion on metric count by creating a job and running it.
    job_builder = sch.get_job_builder()
    job = job_builder.build(job_name='stf_test', pipeline=pipeline)
    job.data_collector_labels = ['dev']  # replace with the label names of the instance you want to connect to
    try:
        sch.add_job(job)
        sch.start_job(job)
        current_status = sch.get_current_job_status(job).response.json().get('status')
        assert current_status == 'ACTIVE'
        job_metrics = sch.jobs.get(job_name='stf_test')
        output = job_metrics.metrics(metric_type='RECORD_COUNT', include_error_count=True).output_count
        assert output is not None
    finally:
        sch.stop_job(job)
        sch.delete_job(job)
stf test --sch-server-url 'https://cloud.streamsets.com/' --sch-username '' --sch-password '' --pipeline-name
Test a Pipeline in Transformer or Data Collector
This test previews the pipeline and retrieves the output values from a stage for assertions.
Prerequisites: Requires a pipeline that already exists in your Transformer or Data Collector instance, with the origin dev raw data source and the destination trash. The dev raw data source uses the JSON data_format and contains the following data:
[
    {
        "name": "Apple",
        "age": 30,
        "salary": 1000
    },
    {
        "name": "Mac",
        "age": 25,
        "salary": 1500
    }
]
We can define fixture functions in a conftest.py file in the same directory where you save your tests; this makes them accessible across all the test files in that directory. For that to work, create a new conftest.py file in the same directory as your tests and add the following code to it:
import pytest

def pytest_addoption(parser):
    parser.addoption('--pipeline-name')

@pytest.fixture(scope='session')
def sch(sch_session):
    yield sch_session

@pytest.fixture(scope='session')
def pipeline(sch_session, request):
    pipeline_name = request.config.getoption('pipeline_name')
    pipeline_ = sch_session.pipelines.get(name=pipeline_name)
    yield pipeline_
Place the following code in your test file (<anyfilename.py>):
def test_preview_and_validation(sch, pipeline):
    '''Preview of pipeline with events.
    Ensure that there are no validation or other issues.'''
    pipeline.configuration['shouldRetry'] = False
    preview = sch.run_pipeline_preview(pipeline).preview
    assert preview is not None
    assert preview.issues.issues_count is None

    # Assertion on origin data from the preview
    origin = pipeline.stages[0]
    data = preview[origin.instance_name].output
    keys = data[1].field.keys()
    assert len(data) == 2
    assert data[0].field['name'] == 'Apple'
Don’t forget to replace argument values with your details to run these tests!
stf test --sch-server-url 'https://cloud.streamsets.com/' --sch-username '' --sch-password '' --pipeline-name
Test Your Connection and Compare Schema
In this example, let's connect to Databricks, access files, and compare the schema of existing files in Databricks with the pipeline's origin stage schema.
Prerequisites: Requires a pipeline that already exists in your Transformer or Data Collector instance, with the origin dev raw data source and Delta Lake as the destination. The dev raw data source uses the JSON data_format and contains the data below (like in our first example):
[
    {
        "name": "Apple",
        "age": 30,
        "salary": 1000
    },
    {
        "name": "Mac",
        "age": 25,
        "salary": 1500
    }
]
import os

import pytest
import pyarrow as pa
import pyarrow.parquet as pq

from streamsets.sdk import ControlHub
from streamsets.testframework.environments.databricks import DatabricksInstance
from streamsets.testframework.markers import cluster

os.environ['DATABRICKS_TOKEN'] = ''

@cluster('databricks')
def test_databricks_compare_schema(databricks_cluster, sch, pipeline):
    pipeline.configuration['shouldRetry'] = False
    # Run the pipeline preview and capture the origin's output schema
    preview = sch.run_pipeline_preview(pipeline).preview
    origin = pipeline.stages[0]
    destination = pipeline.stages[1]
    prev_data = preview[origin.instance_name].output
    origin_schema = list(prev_data[1].field.keys())
    db_fs = databricks_cluster.fs
    path = destination.configuration['conf.deltaLakeTable']
    # Check whether files exist at the destination path
    list_files = db_fs.list_files(path)
    if len(list_files) > 0:
        file = list_files[0]
        file_data = db_fs.read_raw(file)
        reader = pa.BufferReader(file_data)
        table = pq.read_table(reader)
        df = table.to_pandas()
        dest_schema = df.columns.tolist()
        # Assertion comparing the origin schema with files already in Databricks
        assert origin_schema == dest_schema
        print('---Schema Matched---')

@pytest.fixture(scope='function')
def databricks_cluster(cluster):
    yield cluster
stf --docker-image-dont-pull --docker-image streamsets/testframework: test --databricks-instance-url --sch-server-url 'https://cloud.streamsets.com/' --sch-username '' --sch-password '' --pipeline-name -vs
We sincerely hope you now have a better understanding of how to build and operate within the StreamSets Test Framework.