Save time by building a continuous training pipeline for ML

Design of the pipeline components

Hector Andres Mejia Vallejo
6 min read · Jan 28, 2022

In early projects, my team and I used to spend large amounts of time training, testing, writing up the results, and deploying the model manually for each business case. That takes valuable time away from research and innovation and puts it into tasks that could be saved with the right tools and some planning.

Let’s discuss here how to create a continuous training pipeline for your ML models using MLflow, scikit-learn, and Delta Lake. I will apply these tools on Databricks, which I use daily, but the concepts can still be applied if you use another platform.

Totally unrelated, but very peaceful image. Photo by billow926 on Unsplash

0. Overview of the process

Let us look at the following diagram to see which components we should build to create an automated training pipeline.

Continuous Training Pipeline. Made with ❤ by me.

We have data ETL, training, registry, and serving as the essential components. In addition, we have isolated Databricks workspaces for development and production, so we have to figure out a way to send the specification of the model that will be registered in the Databricks production environment.

1. Data ETL component

This component depends on your use case. We can capture data from files, APIs, data streams, etc. Whatever the case may be, we can ingest and process data for machine learning modeling in the following fashion:

ETL diagram. Made by me.
  • The captured data goes to a Data Lake or staging area and is stored as-is.
  • From this repository, the raw data is cleaned and validated for data types, and the features are extracted.
  • This pre-processed data is stored as a database table in a data warehouse. In my case, I use Delta tables.
  • Train/validation/test splits are created and saved on the data warehouse.

One great thing about using Delta as both the data lake and the warehouse is that it implements versioning and time travel, so we never lose track of how our data evolves. In addition, it is designed to scale.
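As a rough illustration, this is how the pre-processed data could be persisted as a Delta table on Databricks and read back with time travel; the table name, columns, and version number are made up for the example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical pre-processed features coming out of the ETL step
features_df = spark.createDataFrame(
    [(1, 0.5, 0), (2, 0.7, 1)], ["id", "feature", "label"]
)

# Persist the features as a Delta table in the warehouse
features_df.write.format("delta").mode("overwrite").saveAsTable("features_processed")

# Time travel: query the table exactly as it was at an earlier version
old_features = spark.sql("SELECT * FROM features_processed VERSION AS OF 0")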

2. Model training component

For a continuous training pipeline, we need to record the model performance for a given iteration, retrieve that metadata along with the model, compare it with previous versions, and set the best one as the production model. Everything has to happen programmatically. Luckily, some tools will help us achieve this.

Model training diagram. Made by me

This component will use the Hyperopt package for hyperparameter tuning: it takes an objective function that trains a model and returns the best hyperparameters found. Each trained model is tracked with MLflow, which logs its metrics, parameters, performance plots such as the ROC curve, and the model binary itself. I will not show the full code in this article since it is a bit long; you can check it out in this gist. For now, assume a function with the following signature (a condensed sketch of its internals follows the argument list below):

def run_experiment(experiment_id, run_name, project_path,
                   search_space, classifier_type,
                   X_train, X_test, X_valid,
                   y_train, y_test, y_valid,
                   model_binary=None, roc=False, max_evals=150):
  • The argument experiment_id is an integer that identifies an MLflow experiment. An experiment is where the training runs will be stored, with all the information that we log, as shown in the mlflow_training snippet of the gist. You can create an experiment using the GUI or programmatically:
experiment_id = mlflow.create_experiment(experiment_name)
  • The run_name argument is just a string to name the MLflow run.
  • project_path is where all the artifacts will be stored.
  • search_space is the grid that Hyperopt will explore. For instance, a valid search space for an SGDClassifier could be:
from hyperopt import hp
from hyperopt.pyll import scope

search_space = {
    'alpha': hp.uniform('alpha', 0, 0.0001),
    'max_iter': scope.int(hp.quniform('max_iter', 50, 1000, 50)),
    'l1_ratio': hp.uniform('l1_ratio', 0.1, 0.3),
    'penalty': "elasticnet",
    'loss': "modified_huber"
}
  • classifier_type is the scikit-learn classifier class, e.g., SGDClassifier.
  • X_train, X_test, X_valid, y_train, y_test, y_valid are the data splits.
  • max_evals is the maximum number of evaluations that Hyperopt will perform during tuning.
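The full implementation lives in the gist linked above. The following is only a condensed sketch of how such a function could be put together with Hyperopt and MLflow, assuming a binary classifier with predict_proba; the project_path, roc, and model_binary handling is omitted here for brevity:

import mlflow
import mlflow.sklearn
from hyperopt import fmin, tpe, Trials, STATUS_OK, space_eval
from sklearn.metrics import roc_auc_score

def run_experiment(experiment_id, run_name, project_path,
                   search_space, classifier_type,
                   X_train, X_test, X_valid,
                   y_train, y_test, y_valid,
                   model_binary=None, roc=False, max_evals=150):

    def objective(params):
        # Train one candidate on the training split and score it on validation
        model = classifier_type(**params)
        model.fit(X_train, y_train)
        auc = roc_auc_score(y_valid, model.predict_proba(X_valid)[:, 1])
        # Hyperopt minimizes the loss, so return the negative AUC
        return {"loss": -auc, "status": STATUS_OK}

    with mlflow.start_run(experiment_id=experiment_id, run_name=run_name) as run:
        best = fmin(fn=objective, space=search_space,
                    algo=tpe.suggest, max_evals=max_evals, trials=Trials())
        # Resolve the raw Hyperopt output back into concrete hyperparameters
        best_params = space_eval(search_space, best)

        # Refit with the best hyperparameters and log everything to MLflow
        final_model = classifier_type(**best_params)
        final_model.fit(X_train, y_train)
        test_auc = roc_auc_score(y_test, final_model.predict_proba(X_test)[:, 1])

        mlflow.log_params(best_params)
        mlflow.log_metric("test_auc", test_auc)
        mlflow.sklearn.log_model(final_model, artifact_path="model")
        mlflow.set_tag("candidate", "True")

    return best_params, run.info.run_id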

Then we call the function like this:

best_params, run_id = run_experiment(experiment_id,
                                     model_name,
                                     project_path,
                                     search_space,
                                     SGDClassifier,
                                     X_train, X_test, X_valid,
                                     y_train, y_test, y_valid,
                                     model_binary=None,
                                     roc=True)

3. Model registry component

With the training component in place, all our runs will be logged. However, we need a mechanism to register the best model out of all those runs, set it for production, and retrieve it whenever necessary, all of it programmatically. This is where the MLflow Model Registry comes in.

Model registry diagram. Made with ❤ by me.

From the training component, you can see that each run is tagged as a candidate with mlflow.set_tag("candidate", "True"). In this registry component we retrieve all candidate runs and, if applicable, all the versions of the registered model. Then we get the best run and the best registered model in terms of AUC. After that, the two are compared to decide which model is set as Production.

You can check out the complete gist here. For now, let’s look at the signatures of the public methods of the ModelEvaluationPipeline class:

class ModelEvaluationPipeline():
    def __init__(self, spark, experimentID,
                 model_name, bootstrap_ratio):
        ...

    def run(self, X_test, Y_test):
        ...
  • experimentID is the same integer used in the training component.
  • model_name is a string to name the model on the registry. If it already exists, then the versions of the model under that name will also be compared with the candidate runs.
  • The evaluation is performed by bootstrapping the targets and predictions multiple times and computing the AUC on each resample, which yields an array of AUCs. bootstrap_ratio is the minimum fraction of those resamples in which the candidate’s AUC must surpass the best AUC so far for the candidate to be promoted. A condensed sketch of this comparison and promotion logic follows the list.
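The gist implements this inside ModelEvaluationPipeline.run(). As a simplified illustration only, here is roughly how the comparison and promotion could be wired up with the MLflow Model Registry, assuming the training component logged a test_auc metric and tagged its runs as candidates; the function and helper names below are made up for this sketch, which compares the best candidate run against the current Production model:

import numpy as np
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.metrics import roc_auc_score

def bootstrap_aucs(model, X, y, n_samples=100):
    """AUC computed on repeated bootstrap resamples of the test set."""
    rng = np.random.default_rng(42)
    proba = model.predict_proba(X)[:, 1]
    y = np.asarray(y)
    aucs = []
    for _ in range(n_samples):
        idx = rng.integers(0, len(y), size=len(y))
        aucs.append(roc_auc_score(y[idx], proba[idx]))
    return np.array(aucs)

def promote_best_candidate(experiment_id, model_name, bootstrap_ratio,
                           X_test, y_test):
    client = MlflowClient()

    # 1. Best candidate run by test AUC among runs tagged as candidates
    runs = mlflow.search_runs(experiment_ids=[str(experiment_id)],
                              filter_string="tags.candidate = 'True'",
                              order_by=["metrics.test_auc DESC"])
    best_run_id = runs.iloc[0]["run_id"]
    candidate = mlflow.sklearn.load_model(f"runs:/{best_run_id}/model")

    # 2. Current Production model, if the registered model already exists
    try:
        production = mlflow.sklearn.load_model(f"models:/{model_name}/Production")
    except Exception:
        production = None

    # 3. Bootstrap comparison: promote only if the candidate wins often enough
    if production is not None:
        cand_aucs = bootstrap_aucs(candidate, X_test, y_test)
        prod_aucs = bootstrap_aucs(production, X_test, y_test)
        if (cand_aucs > prod_aucs).mean() < bootstrap_ratio:
            return  # keep the current Production model

    # 4. Register the winning run and move that version to Production
    version = mlflow.register_model(f"runs:/{best_run_id}/model", model_name)
    client.transition_model_version_stage(model_name, version.version,
                                          stage="Production")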

We call this component as follows:

eval_pipeline = ModelEvaluationPipeline(spark, experiment_id,
                                        model_name, bootstrap_ratio)
eval_pipeline.run(X_test, y_test)

If we only have one Databricks environment for every stage, then our CT pipeline stops here, since we end up with a model marked as Production by the MLflow Model Registry. However, if for some reason there are isolated environments, then we need a way to send the artifacts from the development environment to the production environment.
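For reference, once a version holds the Production stage, any downstream job (batch scoring, a serving endpoint, etc.) can pull it by name; the model name below is a placeholder:

import mlflow.sklearn

# Load whatever version currently holds the Production stage in the registry
model = mlflow.sklearn.load_model("models:/my_model/Production")
# X_new stands in for the new feature rows you want to score
predictions = model.predict(X_new)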

4. Model Publishing Component

Model Publishing component. Made by me.

This component is only needed if you have two isolated Databricks workspaces for development and production and you need to send the ML model to production when it is ready. My approach was simply to have a shared filesystem where the model artifacts are dropped from the development environment. The production environment then picks up those files and registers the model in its own MLflow instance.

  1. First, we mount the shared storage on both Databricks environments. I use Azure Blob Storage. Check out how to do it here.
  2. Send the artifacts after the model was registered in the development environment.

Check out the gist here for steps 1 and 2.
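As a rough sketch of step 2, assuming the shared storage is mounted at a hypothetical path /mnt/shared-models on both workspaces:

import mlflow.sklearn

# Development workspace: export the artifacts of the newly promoted model
# (model_name is the registry name used in the previous component)
model = mlflow.sklearn.load_model(f"models:/{model_name}/Production")
mlflow.sklearn.save_model(model, "/dbfs/mnt/shared-models/my_model")

# Production workspace: pick the artifacts up again from the shared mount
model_binary = mlflow.sklearn.load_model("/dbfs/mnt/shared-models/my_model")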

3. Track the model in the production environment, reusing the run_experiment function that was created in the model training component. The difference is that the model_binary parameter now receives the artifact loaded from the shared storage.

run_experiment(experiment_id_prod, model_name,
               X_test=X_test, y_test=y_test,
               model_binary=model_binary,
               roc=True)

4. Now run the registry component in the same way as we did in the development environment.

eval_pipeline = ModelEvaluationPipeline(spark, prod_experiment_id,
                                        model_name, bootstrap_ratio)
eval_pipeline.run(X_test, y_test)

That is it for the pipeline design. These are only the components needed to train, tune, and register an ML model. In a later post, I will show some orchestration tools to automate this continuous training process. Thank you for your support. If you liked this article, please clap.

See ya!
