Counting 1,690 Big Data & Machine Learning Frameworks, Toolsets, and Examples...
Suggestion? Feedback? Tweet @stkim1

Criteo 1 TiB benchmark

Table of contents

Introduction

(back to toc)

This project is a minimal benchmark of applicability of several implementations of machine learning algorithms to training on big data. Our main focus is Spark.ML and how it compares to commonly used single-node machine learning tools Vowpal Wabbit and XGBoost in terms of scaling to terabyte (billions of lines) train data. Quick web search shows that many people tested Spark but not on tasks requiring a cluster so they are mostly single-node tests.

This project is inspired by https://github.com/szilard/benchm-ml but is focused on training models on billions of lines of train data, including Spark.ML in multinode cluster environment.

Task and data

(back to toc)

Our target application is prediction of click-through ratio (CTR) of banners in online advertising. Criteo released an industry-standard open dataset which represents banner impressions in online advertising during the timespan of 24 days. It is more than 1 terabyte in size and consists of more than 4 billion lines of data. Each line represents a banner impression and contains 40 columns separated by tabulation:

  • the first column is a label - {0, 1} - 1 meaning the banner was clicked and 0 otherwise;
  • 13 numeric columns;
  • 26 categorical columns with categories being 32-bit hashes.

This is how it looks like:

Dataset schema

All the data except the last day was concatenated and sampled into training sets of 10ⁿ and 3×10ⁿ lines with n ∈ {4, 5, ..., 9} (i.e. train samples' sizes are 10k, 30k, 100k, ..., 1kkk, 3kkk lines). The last day was used for testing - a sample of one million lines was taken from it. All samples were converted to

  • LibSVM format for training XGBoost models and as a source for the transformation to Spark.ML DataFrame;
  • Vowpal Wabbit data format.

Data for Spark.ML models was processed on-the-fly from LibSVM format:

  • into a dataset of tuples of "label" (integer) and "features" (SparseVector of size 10⁵ using hashing trick for all features) for Spark.ML LogisticRegression;
  • into a dataset of tuples of "label" (integer) and "features" (SparseVector of size 39 taken as-is from corresponding columns, see below) for Spark.ML RandomForestClassifier.

Algorithms

(back to toc)

Historically, we make use of Vowpal Wabbit and XGBoost exploiting "Local train + Distributed apply" scenario. Our task was to run a performance test of our currently used approach and Spark.ML library algorithms.

We used the following non-distributed algorithms:

  • Vowpal Wabbit - it implements logistic regression with a hashing trick and reads the data only once never keeping more than one sample in memory (it is an out-of-core implementation);
  • in-memory XGBoost - gradient-boosted trees implementation that (by default) loads the whole data into memory (which is faster than multiple reads from disk, but we are limited in size by machine memory);
  • out-of-core XGBoost - a variant of XGBoost training which uses an on-disk cache; this is slower (compared to the in-memory variant) but potentially we can train on the data limited in size only by the size of HDD.

Spark.ML contains following classification algorithms:

Our preliminary research shows that four of the algorithms are not well-suited for our task of CTR prediction:

  • NaiveBayes provides significantly worse logistic loss (which is an essential metric of CTR models' quality) than all other models;
  • DecisionTreeClassifier suffers in quality in comparison to the RandomForestClassifier but still requires roughly the same amount of time to train;
  • GBTClassifier (Spark.ML implementation of gradient-boosted trees) and MultilayerPerceptronClassifier do not support prediction of probabilities that are required by the task (these two models are not shown on graphs above).

ROC AUC Log loss Training time

Thus we use only LogisticRegression and RandomForestClassifier for our testing purposes.

Setup

(back to toc)

Local models were trained on a 12-core (24-thread) machine with 128 GiB of memory. Distributed training was performed on our production cluster (total capacity is approximately 2000 cores and 10 TiB of memory); for the experiment a small part of resources has been allocated - 256 cores and 1 TiB of memory for training on datasets upto 300 million of lines and 512 cores and 2 TiB of memory for training on one billion and 3 billion lines of train data. 4 cores and 16 TiB of memory per Spark executor was used.

For the experiment we used Vowpal Wabbit 8.3.0, XGBoost 0.4 and Spark 2.1.0 running on a Hadoop 2.6 cluster (using YARN as a cluster manager).

Experiment layout

Hyperparameter optimization

(back to toc)

Our first idea was to skip models' hyperparameters optimization completely, but unfortunately XGBoost's default hyperparameters are not good enough for training even on million lines of data - the default number of trees is only 10, and it hits the ceiling quite soon:

ROC AUC Log loss

These figures reminded us that production usage of any machine learning model is associated with optimization of its hyperparameters, and in our experiment we should do the same. For optimization of models' hyperparameters (including Spark.ML ones) we used the million-line sample of train data and 5-fold cross validation for metric (log loss) averaging.

Data format for Spark.ML

(back to toc)

We tried to use one-hot-encoding of categorical features, but due to very large number of unique values it turned out to be very time and memory consuming, so for Spark.ML we decided to try the hashing trick. Spark.ML LogisticRegression was trained using this approach. We sticked to hashing space of 10⁵ hashes as it turned out to give about the same quality as VW on large samples. Taking less hashes usually leads to better quality on smaller data (because of less overfitting) and worse quality on bigger data (because some patterns in data are consumed by collisions in hashing space):

ROC AUC Log loss

RandomForestClassifier was very slow to train even with a thousand hashes, so we used "as-is" format for it:

  • all numeric features were converted to elements of SparseVector as-is;
  • all categorical features were converted to elements of SparseVector by interpreting the hashes as 32 bit numbers.

Code

(back to toc)

All work was performed in Jupyter notebooks in Python. Notebooks:

Results

Local training - Vowpal Wabbit & XGBoost

(back to toc)

ROC AUC Log loss Train time Maximum memory CPU load

Some observations:

  • our main concern about an out-of-core training of XGBoost was that it would not produce the same quality as its in-memory variant due to the approximate splitting algorithm; however, in-memory XGBoost and out-of-core XGBoost turned out to provide about the same level of quality, but out-of-core variant is about an order of magnitude slower;
  • in-memory XGBoost is about an order of magnitude slower than Vowpal Wabbit on the same amount of train data;
  • Vowpal Wabbit was able to give about the same quality as XGBoost trained on an order of magnitude smaller sample.

Distributed training - Spark.ML

(back to toc)

ROC AUC Log loss Train time

We made the following conclusions:

  • RandomForestClassifier is quite slow, and it is even slower when the data consists of large vectors;
  • LogisticRegression is hard to set up for smaller samples and for bigger samples at the same time - either it overfits on small data or it cannot extract patterns due to more aggressive hashing trick.

Distributed training - Time vs. Cores

(back to toc)

To check how model training scales to multi-core setup we made a quick test where we increased the number of cores and measured training time for every step. To make it fast we used a 10⁷ sample of train data and checked training time for a number of cores from 5 to 50 in steps of 5. In order to eliminate the uncertainty brought forth by running the test in parallel with production tasks, for this test we created a standalone Spark cluster running on three machines with a total of ≈50 cores and ≈200 GiB of memory.

Time vs. cores

Training time dropped quite fast when we went from 5 to 15 cores but slowed down afterwards and completely ceased to improve by the mark of 40 cores (even growing a little on transition from 40 to 45 cores). The main idea we have extracted from this figure is that one should not increase amount of resources beyond minimum required, so that work distribution and aggregation overhead would be cheaper than potential improvement of speed by parallelization.

Comparison of local vs. remote

(back to toc)

ROC AUC Log loss Train time

We can see that:

  • on large datasets (100 million of lines and more) Spark.ML is faster than both Vowpal Wabbit and XGBoost; maybe it is possible to make it even faster by finding the best cluster setup for each size of training sample (we had not done this work);
  • however it is slow when working with large vectors - steps should be taken in order to find a balance between quality and speed;
  • for small tasks Spark introduces overhead that can be more expensive than it is possible to gain by computing the task in parallel (well, this is true for parallel computing in general).

Conclusion

(back to toc)

One should pick a correct tool for the task, and Spark.ML can be that kind of tool for training machine learning models on big data.