Creating a machine learning model is a step-by-step process and typically looks like this: you fetch some data, preprocess it, train a model, and evaluate its performance. In the past, these individual steps may have made up an experimental notebook (looking at you Data Scientists 👀) or a series of Python functions haphazardly strung together. Nowadays there are tools out there (such as ZenML) that enable you to define these steps and link them together as a pipeline -- great for reproducibility and reusability, as well as writing clean and concise code.
The ability to write clean and reproducible code is great -- excellent, in fact -- but how do you know that your steps, or pipeline as a whole, are actually doing what you expect them to do? How does your pipeline deal with inputs that it hasn’t seen before; is it robust? In a traditional software engineering context answering these questions is straightforward: thorough testing, but how does this translate into MLOps? Well, you’re in luck -- I’m going to show you how.
How do you test the steps in an ML pipeline?
Let’s first start with the individual steps of a pipeline before tackling the pipeline as a whole. Now, I’ve mentioned a tool called ZenML before and it’s what I’m going to use in these examples but it’s probably worth providing a bit more information about it. In essence, it’s an ML pipelining tool which abstracts away some of the complexities around productionising ML. Crucially, it allows you to define a series of steps and link them together as a pipeline. Here at Fuzzy Labs we’re big fans of ZenML and have written and talked about it extensively -- I would encourage you to check out our other blogs which go into detail about it.
To demonstrate, I’ve created a hypothetical image classification pipeline (shown below) which uses the digit dataset and has three steps:
- Loading the data and creating the train and test splits
- Training a model
- Evaluating a model
<pre><code>from zenml import pipeline
@pipeline
def training_pipeline():
x_train, x_test, y_train, y_test = digits_data_loader(test_size=0.2)
model = svc_trainer(x_train, y_train)
test_acc = evaluator(model, x_test, y_test)</code></pre>
As is standard when using ZenML, you also have <code>run.py</code> which runs the pipeline given some configuration (see below) -- all fairly standard stuff. In the configuration, it specifies a customisable parameter for the <code>data_loader</code> step: <code>test_size</code>, which has a default value of <code>0.2</code>.
<pre><code>from pipelines import training_pipeline
def main():
pipeline = training_pipeline.with_options(config_path='pipeline_config.yaml')
pipeline()
if __name__ == '__main__':
main()
</code></pre>
If we take the first step,<code>data_loader()</code>, how do we test that? To start with we need the project to be in a particular structure. I’ve created a <code>tests</code> directory (see the repository) which contains two folders: <code>test_steps</code> and <code>test_pipelines</code>. Within that <code>test_steps</code> folder and for each step, I have a separate <code>test_step_<name>.py</code> script which contains all the test code for that particular step.
As mentioned earlier, the <code>data_loader</code> step has a single parameter associated with it -- the test set size. When testing ZenML steps, the step expects the parameters to be passed to it so I’ve created a pytest fixture in the <code>conftest.py</code> which defines this parameter (see below). You might be wondering why this fixture isn’t defined within the tests for the data loader step and this is because it’s a parameter that will be helpful later -- we need to run the data loader step when testing the model training.
<pre><code>import pytest
@pytest.fixture
def data_parameters():
params = {
'test_size': 0.2
}
return params</code></pre>
Once the above fixture is in place, we can write the tests for the step itself -- see below for how this is done.
<pre><code>import pytest
from math import ceil
from steps import digits_data_loader
EXPECTED_DATA_LENGTH = 1797
def test_correct_data_amount(data_parameters):
x_train, x_test, y_train, y_test = digits_data_loader(**data_parameters)
assert (len(x_train) + len(x_test)) == EXPECTED_DATA_LENGTH
def test_correct_split(data_parameters):
x_train, x_test, y_train, y_test = digits_data_loader(**data_parameters)
expected_size_test = ceil(EXPECTED_DATA_LENGTH * data_parameters['test_size'])
expected_size_train = floor(EXPECTED_DATA_LENGTH * (1 - data_parameters['test_size']))
assert len(x_test) == expected_size_test
assert len(y_test) == expected_size_test
assert len(x_train) == expected_size_train
assert len(y_train) == expected_size_train</code></pre>
Even though we’re annotating and defining our functions as steps or pipelines, the method in which you test them remains the same. The above should look familiar to anyone that’s used the PyTest framework before.
Now you know how to test a single ZenML step, excellent. But if we jump forward a bit to the <code>evaluator</code> step, then how do we test a step which has a dependency on the previous two steps? The easiest way to do this is by turning the steps into fixtures.
<pre><code>import pytest
from steps import digits_data_loader, svc_trainer, evaluator
@pytest.fixture
def data(data_parameters):
x_train, x_test, y_train, y_test = digits_data_loader(**data_parameters)
return x_train, x_test, y_train, y_test
@pytest.fixture
def model(data):
x_train, _, y_train, _ = data
return svc_trainer(x_train, y_train)
def test_acc_within_range(data, model):
_, x_test, _, y_test = data
score = evaluator(model, x_test, y_test)
# assert that the accuracy is 0.95 +/- 0.2
assert score == pytest.approx(0.95, rel=0.2)</code></pre>
How do you test an ML pipeline?
We’ve shown you how to test the steps in your pipeline, but how do you test the pipeline as a whole?
<pre><code>import pytest
import os
import logging
from zenml.logger import disable_logging
from zenml.post_execution import get_run
from pipelines import training_pipeline
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
EXPECTED_LENGTH = 1797
@pytest.fixture(scope='session')
def pipeline_run():
with disable_logging(log_level=logging.INFO):
pipeline = training_pipeline.with_options(
config_path=BASE_DIR + '/test_pipeline_config.yaml',
unlisted=True
)
pipeline()
@pytest.fixture()
def get_pipeline_run():
return get_run(name='test-pipeline')
def test_pipeline_executes(get_pipeline_run):
evaluator_result = get_pipeline_run.get_step(step='evaluator').output.read()
assert evaluator_result == pytest.approx(0.95, rel=0.2)</code></pre>
The above example shows you how to test the pipeline we’ve created. I’ve created two fixtures which set everything we need up. In the first (<code>pipeline_run()</code>), an instance of the training pipeline is configured and run, and in the second (<code>get_pipeline_run()</code>), the latest run of that pipeline is fetched.
As part of the testing pipeline creation, we’ve specified a configuration which is required for ZenML pipelines, even in a testing context. It’s a mirror of the configuration used in the main code, and it looks like this:
<pre><code>run_name: test-pipeline
enable_cache: False
steps:
data_loader:
parameters:
test_size: 0.2</code></pre>
In this case, the <code>test_size</code> parameter in the configuration above is overwritten by the <code>data_parameters</code> fixture, but this could be excluded from the configuration. We’ll talk more about this later.
You’ll notice that the <code>scope</code> variable has been set to <code>session</code> in the first fixture. This is because we want to run the whole pipeline once and record that run, if we didn’t set that scope variable, then the pipeline would be run for each test.
You may have also noticed that <code>unlisted</code> is set to <code>True</code> when we initialise the pipeline, this is to avoid clogging up the ZenServer with test runs. This isn’t something that you need to do but we recommend it, unless you want to store your test runs in the ZenServer.
The final function in the example is the test itself (there are more listed in the repository). This test takes the pipeline run generated by the fixture and uses some handy functions provided by ZenML: <code>get_step()</code> and <code>read()</code>. It’s worth explaining a little bit about how ZenML works for those less familiar. The output of a step in a pipeline is an artifact which is stored in the ZenServer, so in the case of our <code>evaluator</code> step, the artifact is the performance metric (a float). As we have a pipeline run (under the name <code>test-pipeline</code>), we’re able to fetch the evaluator step associated with that run, get its output, and then read it in. From there, we’re then able to perform the test itself (the assert). There are multiple other tests implemented in the repository, so I would encourage you to check those out.
Before we finish, let’s revisit the configuration that I mentioned earlier. There are a couple of approaches you could take: 1) defining a test configuration (as I have; in the <code>test_pipeline</code> directory), 2) reusing the configuration that <code>run.py</code> uses, or 3) defining a configuration dynamically as a fixture. The approach is up to you, and the tests wouldn’t change too much.
The blog post has discussed why properly testing machine learning pipelines is important and given you the tools (via ZenML and some of the handy functions it implements) to start building production-ready pipelines. All code is available here. We hope you find this useful and let us know how you get on with testing your pipelines!