Save time by building a continuous training pipeline for ML
Design of the pipeline components
In early projects, my team and I used to spend large amounts of time training, testing, writing up the results, and deploying models manually for each business case. That takes valuable time away from research and innovation and puts it into tasks that could be automated with the right tools and some planning.
Let’s discuss how to create a continuous training pipeline for your ML models using MLflow, scikit-learn, and Delta Lake. I will apply these tools on Databricks, which I use daily, but the concepts can still be applied on other platforms.
0. Overview of the process
Let us look at the following diagram to see which components we should build to create an automated training pipeline.
We have data ETL, training, registry, and serving as essential components. In addition, we have isolated Databricks workspaces for development and production, so we have to figure out a way to send the artifacts and specifications of the model that will be registered in the Databricks production environment.
1. Data ETL component
This component depends on your use case. We can capture data from files, APIs, data streams, etc. Whatever the case may be, we can capture and process data for machine learning modeling in the following fashion:
- The captured data goes to a Data Lake or staging area and is stored as-is.
- From this repository, the raw data is cleaned, validated for data types, and feature extraction is performed.
- This pre-processed data is stored as a database table in a data warehouse. In my case, I use Delta tables.
- Train/validation/test splits are created and saved on the data warehouse.
One great thing about using Delta as both data lake and warehouse is that it implements versioning and time travel, so we never lose track of how our data evolves. In addition, it is built to scale.
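The split step can be sketched with plain scikit-learn (the Delta read/write is platform-specific, and the table and column names here are illustrative):

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Stand-in for the pre-processed table; in practice it is read from a Delta table
df = pd.DataFrame({
    "feature_a": range(100),
    "feature_b": [i % 7 for i in range(100)],
    "label": [i % 2 for i in range(100)],
})

# Hold out 20% for test, then carve a validation set out of the remainder
train_valid, test = train_test_split(df, test_size=0.2, random_state=42)
train, valid = train_test_split(train_valid, test_size=0.25, random_state=42)

# Each split would then be written back to the warehouse as its own table
```

Fixing the random seed makes the splits reproducible across pipeline runs, which matters when later comparing model versions trained on "the same" data.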
2. Model training component
For a continuous training pipeline, we need to record the model's performance for a given iteration, retrieve that metadata together with the model, compare it with previous versions, and set the best model as the production one. Everything has to be implemented programmatically. Luckily, some tools will help us achieve this.
This component uses the Hyperopt package for hyperparameter tuning. Hyperopt receives a function that trains a model and returns the best hyperparameters. Each trained model is tracked by MLflow, which logs its metrics, parameters, and performance plots like the ROC curve, and saves the model binary. I will not show the code in this article since it is a bit long; you can check it out in this gist. For now, assume a function with the following signature:
def run_experiment(experiment_id, run_name, project_path,
                   search_space, classifier_type,
                   X_train, X_test, X_valid,
                   y_train, y_test, y_valid,
                   model_binary=None, roc=False, max_evals=150):
- The `experiment_id` argument is an integer that identifies an MLflow experiment. An experiment is where the training runs will be stored, with all the information that we log, as you saw in the `mlflow_training` snippet. You can create an experiment using the GUI or programmatically:
experiment_id = mlflow.create_experiment(experiment_name)
- The `run_name` argument is just a string to name the MLflow run.
- `project_path` is where all the artifacts will be stored.
- `search_space` is the grid that Hyperopt will explore. For instance, a valid search space for an `SGDClassifier` could be:
search_space = {
'alpha': hp.uniform('alpha',0,0.0001),
'max_iter': scope.int(hp.quniform('max_iter', 50, 1000, 50)),
'l1_ratio' : hp.uniform('l1_ratio',0.1,0.3),
'penalty': "elasticnet",
'loss': "modified_huber"
}
- `classifier_type` is the scikit-learn classifier class, e.g. `SGDClassifier`.
- `X_train, X_test, X_valid, y_train, y_test, y_valid` are the data splits.
- `max_evals` is the maximum number of evaluations that Hyperopt will perform during tuning.
Then we call the function like this:
best_params, run_id = run_experiment(experiment_id,
model_name,
project_path,
search_space,
SGDClassifier,
X_train, X_test, X_valid,
y_train, y_test, y_valid,
model_binary=None,
roc=True)
3. Model registry component
With the training component, all our runs will be logged. However, we need a mechanism to register the best model out of all those runs, set it for production, and retrieve it whenever necessary. All that programmatically. This is where MLflow model registry comes in.
From the training component, you can see that each run is tagged as a candidate with `mlflow.set_tag("candidate", "True")`. In this registry component, we will retrieve all candidate runs and, if applicable, all versions of registered models. Then we will get the best run and the best registered model in terms of AUC. Finally, the best run's model and the best registered model are compared to decide which one is set as the Production model.
You can check out the complete gist here. For now, let's look at the signatures of the public methods of the `ModelEvaluationPipeline` class:
class ModelEvaluationPipeline():
    def __init__(self, spark, experimentID, model_name, bootstrap_ratio):
        ...

    def run(self, X_test, Y_test):
        ...
- `experimentID` is the same integer used in the training component.
- `model_name` is a string to name the model in the registry. If a model already exists under that name, its registered versions will also be compared with the candidate runs.
- The evaluation is performed by resampling the targets and predictions multiple times and calculating the AUC for each resample, which yields an array of AUCs.
- `bootstrap_ratio` is the minimum fraction of bootstrap comparisons in which the current AUC array has to surpass the best AUC array so far.
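To make the bootstrap comparison concrete, here is my reading of it as a sketch — resample to get an AUC distribution per model, then require the candidate to win at least `bootstrap_ratio` of the pairwise comparisons (my interpretation, not the gist's exact logic):

```python
import numpy as np
from sklearn.metrics import roc_auc_score

def bootstrap_auc(y_true, y_score, n_boot=200, seed=0):
    """Resample (target, prediction) pairs and compute AUC for each resample."""
    rng = np.random.default_rng(seed)
    n = len(y_true)
    aucs = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, n)
        if len(np.unique(y_true[idx])) < 2:
            continue  # AUC is undefined when a resample contains a single class
        aucs.append(roc_auc_score(y_true[idx], y_score[idx]))
    return np.array(aucs)

def candidate_wins(candidate_aucs, champion_aucs, bootstrap_ratio=0.5):
    """True if the candidate beats the champion in at least bootstrap_ratio of pairings."""
    wins = np.mean(candidate_aucs[:, None] > champion_aucs[None, :])
    return wins >= bootstrap_ratio
```

Comparing distributions instead of single AUC values makes the promotion decision robust to a lucky test split.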
We call this component as follows:
eval_pipeline = ModelEvaluationPipeline(spark, experiment_id,
model_name, bootstrap_ratio)
eval_pipeline.run(docs, y_test)
If we only have one Databricks environment for every stage, then our CT pipeline stops here, since we end up with a model marked as Production by the MLflow model registry. However, if for some reason there are isolated environments, then we need a way to send the artifacts from the development environment to the production environment.
4. Model publishing component
This component works only if you have two isolated Databricks workspaces for development and production and you need to send the ML model to production when it is ready. My approach was just to have a shared filesystem where the model artifacts would go from the development environment. Then, the production environment would pick up those files and register the model on MLflow.
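A minimal sketch of that hand-off via pickle files on the shared mount — the paths and helper names here are my own, not from the gists:

```python
import pickle
from pathlib import Path

def publish_model(model, model_name, version, shared_root):
    """Write the model binary where the production workspace can pick it up."""
    path = Path(shared_root) / model_name / f"v{version}.pkl"
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path

def load_model(model_name, version, shared_root):
    """Read the published binary on the production side of the mount."""
    with open(Path(shared_root) / model_name / f"v{version}.pkl", "rb") as f:
        return pickle.load(f)
```

On Databricks, `shared_root` would be the mounted Blob Storage path visible to both workspaces.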
1. First, we mount the shared storage on both Databricks environments. I use Azure Blob Storage. Check out how to do it here.
2. Send the artifacts after the model is registered in the development environment. Check out the gist here for steps 1 and 2.
3. Track the model in the production environment, reusing the `run_experiment` function created in the model training component. The difference is that the `model_binary` parameter should contain the artifact:
run_experiment(experiment_id_prod, model_name,
X_test=X_test, y_test=y_test,
model_binary=model_binary,
roc=True)
4. Now run the registry component the same way we did in the development environment:
eval_pipeline = ModelEvaluationPipeline(spark, prod_experiment_id,
model_name, bootstrap_ratio)
eval_pipeline.run(docs, y_test)
That is it for the pipeline design. These are only the components needed to train, tune, and register an ML model. In a later post, I will show some orchestration tools to automate this continuous training process. Thank you for your support. If you liked this article, please clap.
See ya!