HopsML Pipelines

HopsML is a Python-first framework for building machine learning pipelines. HopsML is enabled by unique support for project-specific conda environments in Hopsworks. As every project in Hopsworks has its own conda environment, replicated at all data processing hosts in the cluster, Data Scientists can simply ‘pip install’ Python libraries in Hopsworks and immediately use them in their PySpark/TensorFlow/PyTorch applications, enabling interactive application development. This contrasts with an immutable infrastructure approach, where Data Scientists need to write Dockerfiles and YAML files describing cluster specifications just to install a Python library. The other unique aspect of HopsML is the use of HopsFS (a distributed filesystem) to coordinate the different steps in a pipeline. HopsFS integrates seamlessly with Estimator APIs in TensorFlow/Keras, enabling the transparent management and aggregation of logs, checkpoints, TensorBoard events, and models across many Executors in a cluster. HopsML extends these Estimator artifacts with versioned notebooks and Python environments, providing a view of past experiments that can easily be reproduced.

HopsML Pipeline

A machine learning (ML) pipeline is a series of processing steps that:

  • optionally ingests (raw) input data from external sources,
  • wrangles the input data in an ETL job (data cleaning/validation, feature extraction, etc.) to generate clean training data,
  • trains a model (using GPUs) with the clean training data,
  • validates and optimizes the model,
  • deploys the model to production,
  • monitors model performance in production.

HopsML pipelines are written as separate programs for each stage in the pipeline, and the pipeline itself is written as an Airflow DAG (directed acyclic graph). Typically, all programs in the pipeline are written in Python, although Scala/Java can be used at the ETL stage, in particular when dealing with large volumes of input data.

For ML pipelines processing small amounts of data, developers can write a Keras/TensorFlow/PyTorch application to perform both ETL and training in a single program, although developers should be careful that the ETL stage is not so CPU-intensive that GPUs cannot be fully utilized during training. For example, in an image processing pipeline, if the same Keras/TensorFlow/PyTorch application is used both to decode/scale/rotate images and to train a deep neural network (DNN), the application will probably be CPU-bound or I/O-bound, and GPUs will be underutilized.
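
To make this concrete, the following is a minimal, hypothetical sketch in TensorFlow 2 style (the image directory, labels, and model are placeholders) of a single program that performs image decoding/scaling and training together - exactly the pattern where the input pipeline can leave GPUs underutilized:

    import tensorflow as tf

    # Hypothetical input path; in Hopsworks this would typically be a HopsFS/HDFS path.
    IMAGE_GLOB = "hdfs:///Projects/my_project/my_images/*.jpg"

    def decode_and_scale(path):
        # CPU-heavy preprocessing (decode, resize) runs in the same program as training.
        img = tf.io.decode_jpeg(tf.io.read_file(path), channels=3)
        img = tf.image.resize(img, [224, 224]) / 255.0
        label = tf.constant(0)  # placeholder; real label extraction omitted
        return img, label

    dataset = (tf.data.Dataset.list_files(IMAGE_GLOB)
               .map(decode_and_scale, num_parallel_calls=tf.data.experimental.AUTOTUNE)
               .batch(32)
               .prefetch(tf.data.experimental.AUTOTUNE))

    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(16, 3, activation="relu", input_shape=(224, 224, 3)),
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(optimizer="adam", loss="sparse_categorical_crossentropy")
    model.fit(dataset, epochs=1)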

For ML pipelines processing large amounts of data, developers can write a separate Spark or PySpark application to perform ETL and generate training data. When that application has completed, Airflow will then schedule a PySpark application with Keras/TensorFlow/PyTorch to train the DNN, possibly on many GPUs. The training data will be read from a distributed filesystem (HopsFS), and all logs, TensorBoard events, checkpoints, and the model will be written to the same distributed filesystem. When training has completed, Airflow can schedule a simple Python/Bash job to optimize the trained model (e.g., quantize model weights, remove batch norm layers, shrink models for mobile devices), using either Nvidia’s TensorRT library or TensorFlow’s transform_graph utility. The optimized model (a .pb (protocol buffers) file in TensorFlow) can then be deployed directly from HopsFS to a model serving server (TensorFlow Serving server on Kubernetes) using a REST call on Hopsworks. Finally, Airflow can start a Spark Streaming job to monitor the deployed model by consuming logs for the deployed model from Kafka.
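
As a rough sketch of the ETL stage in such a pipeline (paths and column names are hypothetical), a separate PySpark job could read raw data from HopsFS, clean it, and write training data back to HopsFS for the training stage to consume:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("generate_training_data").getOrCreate()

    # Hypothetical HopsFS (HDFS-compatible) paths inside a Hopsworks project.
    RAW_PATH = "hdfs:///Projects/my_project/Raw/events.parquet"
    TRAIN_PATH = "hdfs:///Projects/my_project/TrainingData/events_clean"

    raw_df = spark.read.parquet(RAW_PATH)

    # Basic cleaning: drop nulls and keep feature values in a valid range.
    clean_df = (raw_df
                .dropna(subset=["feature_a", "feature_b", "label"])
                .filter((col("feature_a") >= 0) & (col("feature_a") <= 100)))

    # Write the clean training data back to HopsFS for the training job to read.
    clean_df.write.mode("overwrite").parquet(TRAIN_PATH)
    spark.stop()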

HopsML Pipeline with HopsFS

HopsML uses HopsFS, a next-generation version of HDFS, to coordinate the different steps of an ML pipeline. Input data for pipelines can come from external sources, such as an existing Hadoop cluster or an S3 data lake, a feature store, or existing training datasets. External data sources can push data to HopsFS using either the Hopsworks REST API or Kafka in Hopsworks.

During an ML pipeline, HopsFS acts as a central coordinator for sharing data between the different stages. Examples of such data include features from the feature store, existing training data, PySpark/TensorFlow application logs, TensorBoard events (aggregated from many different executors/GPUs), output models, checkpoints, and partial/full results from hyperparameter optimization.

Data Collection

The datasets that you are working with will reside in your project in HopsFS. Data can be uploaded to your project in a number of ways, such as using the hops-cli client, the REST API, or the uploader in the Hopsworks UI. HopsFS is the filesystem of Hopsworks; it is a next-generation version of Apache HDFS with distributed metadata, and it is compatible with any API that can read data from an HDFS path, such as TensorFlow, Spark, and Pandas.
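
Because HopsFS exposes HDFS-compatible paths, frameworks can read project datasets directly by path. The snippet below is a small sketch with a hypothetical dataset path (TensorFlow needs to be built with HDFS support for the second part):

    import tensorflow as tf
    from pyspark.sql import SparkSession

    # Hypothetical dataset path inside a Hopsworks project on HopsFS.
    DATA_PATH = "hdfs:///Projects/my_project/my_dataset"

    # Spark reads directly from the HDFS-compatible path.
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(DATA_PATH + "/data.csv", header=True, inferSchema=True)

    # TensorFlow can also list and read files on HDFS paths.
    for filename in tf.io.gfile.glob(DATA_PATH + "/*.csv"):
        print(filename)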

Data Preparation

It is important to validate the datasets used in your pipeline; for example, imbalanced classes may lead to Machine Learning models being biased towards the more frequently occurring labels in the dataset. It is therefore of utmost importance that input data is balanced and representative of the domain from which it came. One of the big steps toward ensuring the correctness of data is data quality and validation checks. As has been observed empirically and in the literature, Machine Learning models reduce their generalization error as datasets grow larger. It is therefore also critical to have a data wrangling and validation engine that scales with ever-increasing datasets. The solution is to go distributed in order to process every single record, while still having a rich API for performing quality checks and manipulating the data. The pipeline makes use of Spark to provide these capabilities.

Spark DataFrames can be used to transform and validate large datasets in a distributed manner. For example, schemas can be used to validate the structure of datasets, and useful insights can be computed, such as class imbalance, the number of null values per field, and whether values fall within expected ranges. Datasets can also be transformed by dropping or filtering fields.
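
A minimal sketch of such checks with Spark DataFrames (the schema, column names, and ranges are illustrative):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, count, when
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.getOrCreate()

    # Enforce an expected schema when reading the dataset.
    schema = StructType([
        StructField("label", StringType(), nullable=False),
        StructField("amount", DoubleType(), nullable=True),
        StructField("debug_info", StringType(), nullable=True),
    ])
    df = spark.read.schema(schema).json("hdfs:///Projects/my_project/Raw/records.json")

    # Class imbalance: count records per label.
    df.groupBy("label").count().show()

    # Null values per field.
    df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

    # Keep only values inside an expected range and drop fields that are not needed.
    clean_df = df.filter((col("amount") >= 0.0) & (col("amount") <= 10000.0)).drop("debug_info")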

For visualizations on datasets, see spark-magic or facets examples here.

Feature Store

“A feature store allows teams to share, discover, and use a highly curated set of features for their machine learning problems”
Michelangelo by Uber

Hopsworks provides a feature store to curate, store, and document features for use in ML pipelines. The feature store serves as the interface between data engineering and data science in HopsML pipelines. It requires a change in mindset for data engineers and data scientists: instead of writing custom pipelines where each model has its own feature storage, they are encouraged to assemble all features in the feature store so that features can be shared between several models and projects.
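
As an illustration, a pipeline can read shared features from the feature store and materialize them as a training dataset using the hops Python library. This is only a sketch - the feature and dataset names are hypothetical, and the exact API may differ between Hopsworks versions:

    from hops import featurestore

    # Read curated features (potentially shared across models and projects)
    # from the project's feature store as a Spark DataFrame.
    features_df = featurestore.get_features([
        "customer_age",
        "customer_avg_spend",
        "customer_churn_label",
    ])

    # Materialize the features as a training dataset stored in HopsFS,
    # so the training stage can read it directly.
    featurestore.create_training_dataset(features_df, "churn_training_dataset")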

The Feature Store enables the following best-practices for feature engineering:

  1. Feature Reuse/Collaboration,
  2. Feature Documentation,
  3. Feature Backfilling,
  4. Feature Versioning,
  5. Automatic Feature Analysis,
  6. DRY (Don’t Repeat Yourself) feature engineering.

A feature store is the interface between feature engineering and model development.

See feature_store for more information.

Experimentation

In HopsML we offer a rich experiment API for data scientists to run their Machine Learning code, whether it be TensorFlow, Keras, PyTorch or another framework with a Python API.

Hopsworks supports cluster-wide Conda for managing Python library dependencies. Hopsworks is organized around projects, and each project has its own conda environment, replicated at all hosts in the cluster. When you launch a PySpark job, both the Driver and the Executors run in the conda environment for that project, which is available locally on every host. This way, users can install whatever libraries they like using the conda and pip package managers, and then use them directly inside Spark Executors. This brings PySpark programming one step closer to the single-host experience of programming in Python.

HopsML comes with a novel Experiments service for curating the results of Machine Learning experiments and comparing hyperparameters and metrics for hyperparameter optimization tasks. In addition to attaching hyperparameters and metrics to experiments, files such as logs or images may also be attached.
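
For example, a training function can be wrapped and launched with the experiment API from the hops library. The sketch below is illustrative - the training logic is a placeholder and the keyword arguments reflect assumptions about the library:

    from hops import experiment

    def train():
        # Placeholder training logic; a real run would build and fit a model here,
        # with logs, checkpoints and TensorBoard events aggregated in HopsFS.
        accuracy = 0.9
        # The returned value is recorded as the experiment's metric.
        return accuracy

    # Launch the experiment; hyperparameters, metrics and attached files
    # are curated by the Experiments service.
    experiment.launch(train, name="my_experiment",
                      description="single run of the training function")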

TensorBoard

The following is a TensorBoard visualization of hyperparameter optimization using differential evolution.

TensorBoard

See experiments for more information.

See jupyter for development using Jupyter notebooks.

Model Repository

The model repository lists all the models which have been exported in the project. When a model is exported, any number of metrics can be attached to reflect the performance of the model, such as model accuracy. In pipelines, users can query the model repository to find the best version of a given model name, by supplying the name of the metric to consider and whether its value should be maximized or minimized. For example, below we see that version 3 is the best model given the accuracy metric, and it is also the one which should be served for online inference.
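
In a pipeline, exporting a model with metrics and querying for the best version might look roughly like the sketch below, assuming the model module of the hops library (the model name, path, and metric are hypothetical, and helper names may differ between Hopsworks versions):

    from hops import model
    from hops.model import Metric

    # Hypothetical path where the training job saved the model artifacts.
    EXPORT_PATH = "hdfs:///Projects/my_project/Resources/mnist_model"

    # Export the model to the model repository and attach a metric to this version.
    model.export(EXPORT_PATH, "mnist_classifier", metrics={"accuracy": 0.97})

    # Later in the pipeline: look up the best version by maximizing "accuracy".
    best = model.get_best_model("mnist_classifier", "accuracy", Metric.MAX)
    print(best)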

Model Repository

See models for more information.

Serving

In the pipeline we support a scalable architecture for serving TensorFlow, Keras, and scikit-learn models. We use the TensorFlow Serving server running on Kubernetes to scale the number of serving instances dynamically and handle load balancing. Inference requests can be sent using either the gRPC client or the REST API. Furthermore, we also support a monitoring system that logs the inference requests and allows users to implement custom functionality for retraining models. For scikit-learn, users implement a REST API themselves using a Python file template that loads the model into memory and responds to inference requests.
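
For the REST path, an inference request follows the standard TensorFlow Serving REST format. The sketch below uses a hypothetical endpoint and model name (on Hopsworks the request would go through the project's serving endpoint with authentication):

    import requests

    # Hypothetical serving endpoint and model name.
    URL = "https://hopsworks.example.com/v1/models/mnist_classifier:predict"

    # One flattened 28x28 image as dummy input.
    payload = {"instances": [[0.0] * 784]}

    response = requests.post(URL, json=payload)
    print(response.json())  # e.g. {"predictions": [[...]]}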


See tf_model_serving, sklearn_model_serving and inferencing for more information.

Pipeline Orchestration

HopsML pipelines are typically run as Airflow DAGs, written in Python. An Airflow pipeline is a directed acyclic graph (DAG) of tasks to be executed, orchestration rules, failure handling logic, and notifications. Airflow DAGs can be scheduled to run periodically, for example once per hour, or Airflow can wait for an event (using sensors) before executing a task - for example, waiting for a _SUCCESS file in a Parquet directory before considering the Parquet file(s) fully written. Typical tasks in a production Airflow ML pipeline on Hopsworks include data preparation as a PySpark job, training using HopsML (PySpark + TensorFlow), model optimization as a PySpark or Bash job, and model deployment as either a Python program or a Bash script.
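
A minimal sketch of such a DAG using standard Airflow operators is shown below; the task bodies are placeholders (in practice, Hopsworks ships operators and sensors for launching its jobs, which are not shown here):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def data_prep():
        pass  # placeholder: launch the PySpark data preparation job

    def train():
        pass  # placeholder: launch the HopsML (PySpark + TensorFlow) training job

    def deploy():
        pass  # placeholder: deploy the exported model for serving

    with DAG("hopsml_pipeline",
             start_date=datetime(2019, 1, 1),
             schedule_interval="@daily",
             catchup=False) as dag:

        prep_task = PythonOperator(task_id="data_prep", python_callable=data_prep)
        train_task = PythonOperator(task_id="train", python_callable=train)
        deploy_task = PythonOperator(task_id="deploy", python_callable=deploy)

        # Orchestration: data preparation, then training, then deployment.
        prep_task >> train_task >> deploy_task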

HopsML Pipeline orchestrated by Airflow