Deep Learning with Intel’s BigDL and Apache Spark

Cloudera recently published a blog post on how to use Deeplearning4j (DL4J) along with Apache Hadoop and Apache Spark to get state-of-the-art results on an image recognition task. Continuing that stream of work, in this post we discuss a viable alternative that is specifically designed to be used with Spark and with data available in Spark and Hadoop clusters, via a Scala or Python API.

The Deep Learning landscape is still evolving. On one hand we have veteran frameworks like Theano and Caffe, or more popular ones like TensorFlow, while on the other we see the emergence of JVM-based frameworks that can perform distributed deep learning using GPUs and CPUs. These JVM-based tools can leverage existing Spark clusters to parallelize the training of models. This post will discuss Intel’s BigDL, an open source distributed deep learning framework for big data platforms, built on Apache Spark. It will highlight some high-level differences between Deeplearning4j and BigDL, detail how to build a Convolutional Neural Network (CNN) while exploring a different approach to transfer learning – fine-tuning pre-trained models – and provide suggestions on troubleshooting and tuning Apache Spark jobs.

Comparing BigDL and Deeplearning4j

Intel’s BigDL is a relatively new open source framework that provides native support for deep learning on Spark using CPUs. Deeplearning4j, on the other hand, was created in 2014, is built for the JVM, and allows training deep neural networks on CPUs and GPUs. A direct comparison may not be entirely fair given their different life spans and the fact that both are still under development, but comparing features can help us understand when one or both are applicable to a particular problem.

| | BigDL | Deeplearning4j |
|---|---|---|
| Framework & APIs | A comparatively new deep learning framework that is modeled after Torch. Supports Scala and Python APIs. | Has been around for three years and is a JVM-based deep learning project. Supports Scala and Java APIs. |
| Chip(s) supported | Supports distributed deep learning on commodity CPUs, not GPUs. BigDL works on any x86_64 hardware that uses MKL, which is specially designed for Intel hardware. | Supports distributed deep learning on CPUs and GPUs. |
| Pre-trained model support | Supports models created in the TensorFlow, Caffe, or Torch frameworks. | Supports models created in the Caffe, Torch, Theano, and TensorFlow frameworks. Also has a new native model zoo library that lets the user automatically download and instantiate a pre-trained model. |
| Linear algebra library | Uses Intel’s MKL (Math Kernel Library). | Uses the ND4J native linear algebra library. By default OpenBLAS is used with ND4J, but there is an option to use Intel’s MKL. |
| User experience | Designed to be used along with Apache Spark, which is a plus for existing Spark users. | Primarily built for the JVM, hence Java developers have been a primary focus. Good documentation and resources to get started, plus a Gitter support channel for questions. |
| Distributed deep neural network training on Spark | Supports training with CPUs. Employs synchronous parameter averaging to train a network, where Spark’s Block Manager serves as a parameter server, thus avoiding the driver becoming a bottleneck in the communication (details here). The parameter averaging occurs once a mini-batch is processed by all the workers. The mini-batch size is expected to be a multiple of the total cores used in the job. | Supports training with CPUs and GPUs. Employs synchronous parameter averaging to train a network, where the driver serves as a parameter server (details here). Recently introduced a thresholding approach to training that reduces network communication requirements by orders of magnitude [1]. The user can choose the parameter averaging frequency. No such restrictions on the batch size. |
| ETL & Traditional Machine Learning | The user can employ Apache Spark and its ML/MLlib algorithms. | Supports ETL through DataVec and some core NLP functionality using ClearTK. One could implement their own machine learning algorithms using ND4J. |
| Visualization | Supports visualization through TensorBoard. | Provides its own web UI tool for visualizing statistics such as training loss across iterations. |

Classifying Caltech-256 Images with BigDL

To further the comparison, below we present a worked example with BigDL that mirrors the previous DL4J example. That is, it walks through the steps to build a convolutional neural network that can classify images in the Caltech-256 dataset.

The Caltech-256 dataset was downloaded, separated into train and validation sets using stratified sampling, and stored in HDFS (full instructions). We begin by reading the JPEG files from HDFS as binary files and then decoding the compressed format into the ByteRecord format required by BigDL. As with most machine learning workloads, we may want to pre-process the data before feeding it into the model. In the current case, we chain three transformations using the Transformer APIs.

  val conf = Engine.createSparkConf().setAppName("BigDL: Training CNN utilizing pre-trained VGG16 model")
  val sc = new SparkContext(conf)
  Engine.init()

  // Read training data images and transform them
  val trainSet = DataSet.rdd(Utils.readJpegs(sc, "hdfs:///user/leon/Caltech-256/train", 224)) ->
    BytesToBGRImg() ->
    BGRImgNormalizer(104, 117, 123, 1, 1, 1) ->
    BGRImgToBatch(16)

For computational efficiency, we would like to train and run inference in batches, in this case 16 images at a time. The mini-batch size must be a multiple of the total number of cores used in the job, as the short sketch below illustrates. All this has a role to play in how the training process is conducted within BigDL.
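As a quick illustration of that constraint, here is a minimal sketch of the arithmetic; the executor count, cores per executor, and batchPerCore values are illustrative assumptions rather than settings taken from this post.

  // A minimal sketch: picking a mini-batch size that is a multiple of the
  // total number of cores in the Spark job. The values below are illustrative.
  val numExecutors = 4                               // e.g., --num-executors 4
  val coresPerExecutor = 4                           // e.g., --executor-cores 4
  val totalCores = numExecutors * coresPerExecutor   // 16 cores in total

  val batchPerCore = 1                               // samples per core per iteration
  val batchSize = totalCores * batchPerCore          // 16, matching BGRImgToBatch(16)

  require(batchSize % totalCores == 0,
    "BigDL expects the mini-batch size to be a multiple of the total core count")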

Transfer learning

We know that Convolutional Neural Networks (CNNs) require large datasets and a lot of computational time to train. So instead of training from scratch, transfer learning utilizes a pre-trained model on a new dataset. Previously, we extracted features up to the second-to-last layer and trained a new prediction layer. This time, we will not only train a new prediction layer but also allow the earlier, pre-trained layers to be adjusted, if needed.

To elaborate, let’s say a CNN has a set of shared parameters θs (e.g., five convolutional layers and two fully connected layers), task-specific parameters for previously learned tasks θo (e.g., the output layer for ImageNet classification and corresponding weights), and randomly initialized task-specific parameters for new tasks θn (e.g., scene classifiers). It is useful to think of θo and θn as classifiers that operate on features parameterized by θs. Currently, there are three common approaches to learning θn while benefiting from previously learned θs [2].

  • Feature Extraction: Uses a pre-trained CNN to compute the features of an image. θs and θo are unchanged, and the outputs of one or more layers are used as features for the new task in training θn. The extracted features are activations of one layer (usually the last hidden layer) or multiple layers given the image. This is what we accomplished in the previous blog post.

  • Fine-tuning: Modifies the parameters of an existing CNN to train a new task; the output layer is extended with randomly initialized weights for the new task. That is, θs and θn are optimized for the new task, while θo is fixed. We employ this strategy in the current post. It is also possible to use a variation of fine-tuning where part of θs (the convolutional layers) is frozen and only the top fully connected layers are fine-tuned. This can be seen as a compromise between fine-tuning and feature extraction.

  • Joint Training: All parameters θs, θo, θn are jointly optimized, for example by interleaving samples from each task.

Figure 1: Transfer Learning Approaches [2]

Using pre-trained models with BigDL

BigDL supports loading pre-trained models; all it needs is a model checkpoint file (the pre-trained weights), a model definition file, and a BigDL Module (network) that represents the layers of the neural network. The Caffe library has a Model Zoo where people share their checkpoints, also known as network weights. For our learning task we use the VGG 16-layer model checkpoint [3] available here.

  // Note that if matchAll = false, then only layers with the same name will be loaded;
  // the rest will use their initialized parameters
  val model = Module.loadCaffe[Float](VGG16NetCaltech(257),
    modelDefPath = "./VGG_ILSVRC_16_layers_deploy.prototxt",
    modelPath = "./VGG_ILSVRC_16_layers.caffemodel",
    matchAll = false)

To reuse the existing weights we first need to make sure that the layer names in the VGG16NetCaltech module match the original network’s layer names found in the model definition file, as illustrated in the sketch below.

However, the VGG 16-layer model was trained on the ImageNet dataset, which has 1,000 different classes, so we want to adapt it for the current application. This can be achieved by simply renaming the last fully connected layer from "fc8" to "fc8_caltech256" and changing its output size to the number of classes in the Caltech-256 dataset, that is, 257. Thus, we have defined a new hybrid model that uses weights from a pre-trained model for all layers except the last one, saving tremendous resources and training time.
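The full VGG16NetCaltech definition is in the accompanying code; the abbreviated sketch below (which assumes BigDL's nn layer classes and the NumericFloat implicit, and elides most of the convolutional blocks) shows the idea: the layer names mirror the Caffe prototxt so their weights get loaded, while the renamed final layer starts from random initialization.

  import com.intel.analytics.bigdl.nn._
  import com.intel.analytics.bigdl.numeric.NumericFloat

  // Abbreviated sketch of a VGG-16-style module whose layer names match the
  // Caffe prototxt so that loadCaffe can copy the pre-trained weights.
  // Most convolutional blocks are elided for brevity.
  object VGG16NetCaltech {
    def apply(classNum: Int): Sequential[Float] = {
      val model = Sequential()
      model.add(SpatialConvolution(3, 64, 3, 3, 1, 1, 1, 1).setName("conv1_1"))
      model.add(ReLU(true))
      model.add(SpatialConvolution(64, 64, 3, 3, 1, 1, 1, 1).setName("conv1_2"))
      model.add(ReLU(true))
      model.add(SpatialMaxPooling(2, 2, 2, 2))
      // ... conv2_x through conv5_x blocks elided ...
      model.add(View(512 * 7 * 7))
      model.add(Linear(512 * 7 * 7, 4096).setName("fc6"))
      model.add(ReLU(true))
      model.add(Dropout(0.5))
      model.add(Linear(4096, 4096).setName("fc7"))
      model.add(ReLU(true))
      model.add(Dropout(0.5))
      // Renamed layer: its weights are not in the Caffe checkpoint, so they are
      // randomly initialized and sized for the 257 Caltech-256 classes
      model.add(Linear(4096, classNum).setName("fc8_caltech256"))
      model.add(LogSoftMax())
      model
    }
  }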

Distributed training

The training process requires you to specify the model, the training dataset, and the loss criterion. In this case, it’s the negative log likelihood, which is often used for classification problems.

  val optimizer = Optimizer(
    model = model,
    dataset = trainSet,
    criterion = new ClassNLLCriterion[Float]())

Next, we set the hyperparameters for training. Here, EpochStep(20, 0.1) multiplies the learning rate by 0.1 every 20 epochs.

  val optim = new SGD[Float](
    learningRate = 0.0001,
    learningRateDecay = 0.0,
    weightDecay = 0.0001,
    momentum = 0.8,
    learningRateSchedule = SGD.EpochStep(20, 0.1),
    dampening = 0.0,
    nesterov = true)

Along with the optimization method and associated hyperparameters, we specify how frequently to test the validation set accuracy, how many passes to make over the training data, when and where to save the model checkpoint, and so on.

  optimizer
    .setOptimMethod(optim)
    // Validation set accuracy to be tested after every epoch
    .setValidation(Trigger.everyEpoch,
      valSet, Array(new Top1Accuracy[Float], new Top5Accuracy[Float]))
    // Save a model checkpoint after every epoch
    .setCheckpoint("./bigdl-models/vgg16-net", Trigger.severalIteration(1301))
    // End training after 15 epochs
    .setEndWhen(Trigger.maxEpoch(15))
    .optimize()

Now if you run the Spark application on the cluster (details on submitting the Spark application and the entire code are available here), you will notice that it begins training the network in batches of 16 images and outputs the loss and the hyperparameters used.

  17/06/10 12:26:49 INFO optim.DistriOptimizer$: [Epoch 1 0/20812][Iteration 1][Wall Clock 0.0s] Train 16 in 10.541978735seconds. Throughput is 1.5177416 records/second. Loss is 5.750294. Current learning rate is 1.0E-4. Current weight decay is 1.0E-4. Current momentum is 0.8. Current nesterov is true.

Visualizing Learning

Training a neural network can be complex and confusing. One may want to keep tabs on how the learning progresses to help understand, debug, or even optimize it further. For example, if in one run we choose to randomize the order in which the images appear within the mini-batches, and in another we do not (while keeping the rest of the hyperparameters unchanged), we would find that the former approach reduces the training loss. The orange line in Figure 2 below depicts the training loss after randomization.

Figure 2. Comparing Model Runs

The process of enabling visualization via TensorBoard is pretty straightforward. After installing TensorBoard, simply specify the location and a name (that describes the run) in which to store the training and validation summary results. Note that by default TrainSummary will record the "loss" and "throughput" at each iteration; one can also enable "Parameters" or "LearningRate", but keep in mind that this causes additional overhead and might slow down the training process.

  val trainSummary = TrainSummary("./logs", "run1")
  trainSummary.setSummaryTrigger("Parameters", Trigger.severalIteration(20))
  val validationSummary = ValidationSummary("./logs", "run1")
  optimizer.setTrainSummary(trainSummary)
  optimizer.setValidationSummary(validationSummary)

Finally, in order to view the training progress, use the command:

  $ tensorboard --logdir=./logs/run1
  Starting TensorBoard 41 on port 6006

The TensorBoard UI is then accessible at http://<host>:6006, and it displays the loss, throughput, accuracy, and other statistics.

Figure 3. Model Training Summary

A Top-1 accuracy of ~5% on the validation set is pretty low to begin with, but as shown in Figure 3, by the end of the 15th epoch the loss goes down and the training Top-1 accuracy climbs to 30%.

Model checkpoints and performance

We can also independently test the model performance on a test set using any of the trained model snapshots saved at the checkpoint location.

  // Load the test set
  val testSet =
    DataSet.rdd(Utils.readJpegs(sc, "hdfs:///user/leon/Caltech-256/test", 224)) ->
    BytesToBGRImg() ->
    BGRImgNormalizer(104, 117, 123, 1, 1, 1) ->
    BGRImgToBatch(16)

  // Load a model snapshot saved under the checkpoint directory
  val model = Module.load[Float](modelPath)

  val validator = Validator(model, testSet)
  val result = validator.test(Array(new Top1Accuracy[Float], new Top5Accuracy[Float]))
  result.foreach { case (value, metric) => println(s"$metric is $value") }

  Top1Accuracy is Accuracy(correct: 1910, count: 6122, accuracy: 0.3119895459000327)
  Top5Accuracy is Accuracy(correct: 3283, count: 6122, accuracy: 0.5362626592616792)

If the model performance improves initially and then starts to flatten or decrease, it might be a good idea to reduce the learning rate at that point and resume training from where it left off. All one would need to do is load the model snapshot from the 15th epoch, which is a minor change to the code above.

  val model = Module.load[Float](modelPath) // snapshot from the 15th epoch under ./bigdl-models/vgg16-net

And then specify a new set of hyperparameters:

  val optim = new SGD[Float](
    learningRate = 0.00001,
    learningRateDecay = 0.0,
    weightDecay = 0.0001,
    momentum = 0.8,
    learningRateSchedule = SGD.EpochStep(20, 0.1),
    dampening = 0.0,
    nesterov = true)

You can also choose to retrieve the existing optimization snapshot file:

  val optim = OptimMethod.load[Float](optimPath) // a saved optimization-method snapshot
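Putting those pieces together, a minimal sketch of resuming training might look like the following. It reuses the trainSet, valSet, and criterion from the earlier code; the snapshot paths and the new epoch limit are placeholder assumptions.

  // A sketch of resuming training from saved snapshots. modelPath and optimPath
  // are placeholders for files saved under the "./bigdl-models/vgg16-net" checkpoint.
  val model = Module.load[Float](modelPath)
  val optim = OptimMethod.load[Float](optimPath)

  val optimizer = Optimizer(
    model = model,
    dataset = trainSet,
    criterion = new ClassNLLCriterion[Float]())

  optimizer
    .setOptimMethod(optim)
    .setValidation(Trigger.everyEpoch,
      valSet, Array(new Top1Accuracy[Float], new Top5Accuracy[Float]))
    .setCheckpoint("./bigdl-models/vgg16-net", Trigger.severalIteration(1301))
    .setEndWhen(Trigger.maxEpoch(30)) // illustrative: train for additional epochs
    .optimize()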

Troubleshooting and Tuning Spark jobs

When running BigDL Spark applications, the errors we have encountered have generally been out-of-memory (OOM) errors. Some of these errors tend to occur on the driver side, others on the executor side. For example, at times the job fails when trying to load the VGG16 model, and the Spark web UI logs provide no further details in this case. Increasing the driver memory, controlled by the --driver-memory option when submitting the Spark application, should resolve this.

  17/07/25 15:46:16 INFO caffe.CaffeLoader: start loading caffe model from ./VGG_ILSVRC_16_layers.caffemodel
  Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3181)

Later, when the training process tries to cache the model on the executors, it might fail during initialization. Increasing the executor memory, controlled by the --executor-memory option when submitting the Spark application, does the trick here. The error trace may look like the following:

  17/07/25 21:04:02 INFO optim.DistriOptimizer$: caching training rdd ...
  [Stage 4:>   (0 + 4) / 4]Exception in thread "dispatcher-event-loop-17" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)

Furthermore, at some point during the training iterations, such as at the end of an epoch, a trigger may save the model at the specified checkpoint location. The job might fail here too, with messages about the YARN scheduler losing executors. This can happen when the application fetches the current model from the executors to the driver, or when the driver saves the model (which is now more than 1 GB in size) to the specified checkpoint directory. Both are also out-of-memory situations and can be fixed by increasing either the executor or the driver memory, as in the spark-submit sketch below.
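For reference, a spark-submit invocation with these memory options bumped up might look like the following; the class name, jar name, and the specific memory and core values are illustrative assumptions rather than settings from the original runs.

  # Placeholders throughout: adjust class, jar, memory, and core values to your cluster
  $ spark-submit \
      --master yarn \
      --deploy-mode client \
      --driver-memory 20g \
      --executor-memory 40g \
      --num-executors 4 \
      --executor-cores 4 \
      --class com.example.vgg16.FineTune \
      bigdl-caltech256-example.jar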

Conclusion

In this post we used Apache Spark and Intel’s BigDL to train a Convolutional Neural Network employing the fine-tuning transfer learning strategy. We saw how well they work together and how this framework makes it easier for data scientists and analysts to continue using their existing Apache Hadoop and Apache Spark cluster as a unified data analytics platform. Intel’s BigDL is built natively on Spark, and its familiar Scala and Python APIs make it easy for existing Spark users to adopt, making it well suited for deep learning on Cloudera.

 


[1] https://deeplearning4j.org/distributed

[2] https://arxiv.org/pdf/1606.09282.pdf

[3] K. Simonyan and A. Zisserman, "Very Deep Convolutional Networks for Large-Scale Image Recognition," arXiv:1409.1556.