How-to: Build a Machine-Learning App Using Sparkling Water and Apache Spark

Thanks to Michal Malohlava, Amy Wang, and Avni Wadhwa of H20.ai for providing the following guest post about building ML apps using Sparkling Water and Apache Spark on CDH.

The Sparkling Water project is nearing its one-year anniversary, which means Michal Malohlava, our main contributor, has been very busy for the better part of this past year. The Sparkling Water project combines H2O machine-learning algorithms with the execution power of Apache Spark. This means that the project is heavily dependent on two of the fastest growing machine-learning open source projects out there. With every major release of Spark or H2O there are API changes and, less frequently, major data structure changes that affect Sparkling Water. Throw Cloudera releases into the mix, and you have a plethora of git commits dedicated to maintaining a few simple calls to move data between the different platforms.

All that hard work on the backend means that users can easily benefit from programming in a uniform environment that combines both H2O and MLLib algorithms. For the data scientist using a Cloudera-supported distribution of Spark (Spark 1.3/CDH 5.4 as of this writing), they can easily incorporate an H2O library into their Spark application. An entry point to the H2O programming world (called H2OContext) is created and allows for the launch of H2O, parallel import of frames into memory and the use of H2O algorithms. This seamless integration into Spark makes launching a Sparkling Water application as easy as launching a Spark application:

  > bin/spark-submit –class water.YourSparklingWaterApp –master yarn-client sparkling-water-app-assembly.jar

Setup and Installation Sparkling Water is certified on Cloudera and certified to work with versions of Spark installations that come prepackaged with each distribution. To install Sparkling Water, navigate to h2o.ai/download and download the version corresponding to the version of Spark available with your Cloudera cluster. Rather than downloading Spark and then distributing on the Cloudera cluster manually, simply set your SPARK_HOME to the spark directory in your opt directory:

  $ export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark

For ease of use, we are looking into taking advantage of Cloudera Manager and creating distributable H2O and Sparkling Water parcels. This will simplify the management of the various versions of Cloudera, Spark, and H2O.

Architecture

Figure 1 illustrates the concept of technical realization. The application developer implements a Spark application using the Spark API and Sparkling Water library. After submitting the resulting Sparkling Water application into a Spark cluster, the application can create H2OContext, which initializes H2O services on top of Spark nodes. The application can then use any functionality provided by H2O, including its algorithms and interactive UI. H2O uses its own data structure called H2OFrame to represent tabular data, but H2OContext allows H2O to share data with Spark’s RDDs.

Figure 1: Sparkling Water architecture

Figure 2 illustrates the launch sequence of Sparkling Water on a Cloudera cluster. Both Spark and H2O are in-memory processes and all computation occurs in memory with minimal writing to disk, occurring exclusively when specified by the user. Because all the data used in the modeling process needs to read into memory, the recommended method of launching Spark and H2O is through YARN, which dynamically allocates available resources. When the job is finished, you can tear down the Sparkling Water cluster and free up resources for other jobs. All Spark and Sparkling Water applications launched with YARN will be tracked and listed in the history server that you can launch on Cloudera Manager.

YARN will allocate the container to launch the application master in and when you launch with yarn-client, the spark driver runs in the client process and the application master submits a request to the resource manager to spawn the Spark Executor JVMs. Finally, after creating a Sparkling Water cluster, you have access to HDFS to read data into either H2O or Spark.

Figure 2: Sparkling Water on Cloudera [Launching on YARN]

Programming Model

The H2OContext exposes two operators for: (1) publishing Spark RDD as H2O Frame (2) publishing H2O Frame as Spark RDD. The direction from Spark to H2O makes sense when data are prepared with the help of Spark API and passed to H2O algorithms:

  // …val srdd: SchemaRDD = sqlContext.sql(“SELECT * FROM ChicagoCrimeTable where Arrest = ‘true’”)// Publish the RDD as H2OFrameval h2oFrame: H2OFrame = h2oContext.asH2OFrame(srdd)// …val dlModel: DeepLearningModel = new DeepLearning().trainModel.get…

val srdd: SchemaRDD = sqlContext.sql(“SELECT * FROM ChicagoCrimeTable where Arrest = ‘true’”)

val h2oFrame: H2OFrame = h2oContext.asH2OFrame(srdd)

val dlModel: DeepLearningModel = new DeepLearning().trainModel.get

The opposite direction from H2O Frame to Spark RDD is used in a situation when the user needs to expose H2O’s frames as Spark’s RDDs. For example:

  val prediction: H2OFrame = dlModel.score(testFrame)// …// Exposes prediction frame as RDDval srdd: SchemaRDD = asSchemaRDD(prediction)

// …

val srdd: SchemaRDD = asSchemaRDD(prediction)

The H2O context simplifies the programming model by introducing implicit conversion to hide asSchemaRDD and asH2OFrame calls.

Sparkling Water excels in situations when you need to call advanced machine-learning algorithms from an existing Spark workflow. Furthermore, we found that it is the perfect platform for designing and developing smarter machine learning applications. In the rest of this post, we will demonstrate how to use Sparkling Water to create a simple machine-learning application that predicts arrest probability for a given crime in Chicago. (Although this app is tested on Spark 1.4, it should work on 1.3, the version inside CDH 5.4, as well without mods.)

Example Application

We’ve seen some incredible applications of Deep Learning with respect to image recognition and machine translation but this specific use case has to do with public safety; in particular, how Deep Learning can be used to fight crime in the forward-thinking cities of San Francisco and Chicago. The cool thing about these two cities (and many others!) is that they are both open data cities, which means anybody can access city data ranging from transportation information to building maintenance records. So if you are a data scientist or thinking about becoming a data scientist, there are publicly available city-specific datasets you can play with. For this example, we looked at the historical crime data from both Chicago and San Francisco and joined this data with other external data, such as weather and socioeconomic factors, using Spark’s SQL context:

Figure 3: Spark + H2O Workflow

We perform the data import, ad-hoc data munging (parsing the date column, for example), and joining of tables by leveraging the power of Spark. We then publish the Spark RDD as an H2O Frame (Fig. 2).

123456789101112131415161718192021222324252627 val sc: SparkContext = // …implicit val sqlContext = new SQLContext(sc)implicit val h2oContext = new H2OContext(sc).start()import h2oContext._ val weatherTable = asSchemaRDD(createWeatherTable(“hdfs://data/chicagoAllWeather.csv”))registerRDDAsTable(weatherTable, “chicagoWeather”)// Census dataval censusTable = asSchemaRDD(createCensusTable(“hdfds://data/chicagoCensus.csv”))registerRDDAsTable(censusTable, “chicagoCensus”)// Crime dataval crimeTable  = asSchemaRDD(createCrimeTable(“hdfs://data/chicagoCrimes10k.csv”, “MM/dd/yyyy hh:mm:ss a”, “Etc/UTC”))registerRDDAsTable(crimeTable, “chicagoCrime”) val crimeWeather = sql(“"”SELECT a.Year, …, b.meanTemp, …, c.PER_CAPITA_INCOME    FROM chicagoCrime a    JOIN chicagoWeather b    ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day    JOIN chicagoCensus c    ON a.Community_Area = c.Community_Area_Number””“.stripMargin) // Publish result as H2O Frameval crimeWeatherHF: H2OFrame = crimeWeather // Split data into train and test datasetsval frs = splitFrame(crimeWeatherHF, Array(“train.hex”, “test.hex”), Array(0.8, 0.2))val (train, test) = (frs(0), frs(1))

2

4

6

8

10

12

14

16

18

20

22

24

26

implicit val sqlContext = new SQLContext(sc)

import h2oContext._

val weatherTable = asSchemaRDD(createWeatherTable(“hdfs://data/chicagoAllWeather.csv”))

// Census data

registerRDDAsTable(censusTable, “chicagoCensus”)

val crimeTable  = asSchemaRDD(createCrimeTable(“hdfs://data/chicagoCrimes10k.csv”, “MM/dd/yyyy hh:mm:ss a”, “Etc/UTC”))

 

    FROM chicagoCrime a
    ON a.Year = b.year AND a.Month = b.month AND a.Day = b.day
    ON a.Community_Area = c.Community_Area_Number””“.stripMargin)

// Publish result as H2O Frame

 

val frs = splitFrame(crimeWeatherHF, Array(“train.hex”, “test.hex”), Array(0.8, 0.2))

Figures 4 and 5 below include some cool visualizations we made of the joined table using H2O’s Flow as part of Sparkling Water.

Figure 4: San Francisco crime visualizations

Figure 5: Chicago crime visualizations

Interesting how in both cities’ crime seems to occur most frequently during the winter—a surprising fact given how cold the weather gets in Chicago!

Using H2O Flow, we were able to look at the arrest rates of every category of recorded crimes in Chicago and compare them with the percentage of total crimes each category represents. Some crimes with the highest arrest rates also occur least frequently, and vice versa.

Figure 6: Chicago arrest rates and total % of all crimes by category

Once the data is transformed to an H2O Frame, we train a deep neural network to predict the likelihood of an arrest for a given crime.

12345678910111213141516171819202122232425262728 def DLModel(train: H2OFrame, test: H2OFrame, response: String,            epochs: Int = 10, l1: Double = 0.0001, l2: Double = 0.0001,            activation: Activation = Activation.RectifierWithDropout, hidden:Array[Int] = Array(200,200))           (implicit h2oContext: H2OContext) : DeepLearningModel = {  import h2oContext._  import hex.deeplearning.DeepLearning  import hex.deeplearning.DeepLearningModel.DeepLearningParameters   val dlParams = new DeepLearningParameters()  dlParams._train = train  dlParams._valid = test  dlParams._response_column = response  dlParams._epochs = epochs  dlParams._l1 = l1  dlParams._l2 = l2  dlParams._activation = activation  dlParams._hidden = hidden   // Create a job  val dl = new DeepLearning(dlParams)  val model = dl.trainModel.get  model} // Build Deep Learning modelval dlModel = DLModel(train, test, ‘Arrest)// Collect model performance metrics and predictions for test dataval (trainMetricsDL, testMetricsDL) = binomialMetrics(dlModel, train, test)

2

4

6

8

10

12

14

16

18

20

22

24

26

28

            epochs: Int = 10, l1: Double = 0.0001, l2: Double = 0.0001,

           (implicit h2oContext: H2OContext) : DeepLearningModel = {

  import hex.deeplearning.DeepLearning

 

  dlParams._train = train

  dlParams._response_column = response

  dlParams._l1 = l1

  dlParams._activation = activation

 

  val dl = new DeepLearning(dlParams)

  model

 

val dlModel = DLModel(train, test, ‘Arrest)

val (trainMetricsDL, testMetricsDL) = binomialMetrics(dlModel, train, test)

Here is a screenshot of our H2O Deep Learning model being tuned inside Flow and the resulting AUC curve from scoring the trained model against the validation dataset.

Figure 7: Chicago validation data AUC

The last building block of the application is formed by a function which predicts the arrest rate probability for a new crime. The function combines the Spark API to enrich each incoming crime event with census information and H2O’s deep learning model, which scores the event:

  def scoreEvent(crime: Crime, model: Model[,,], censusTable: SchemaRDD)              (implicit sqlContext: SQLContext, h2oContext: H2OContext): Float = {  import h2oContext.  import sqlContext._  // Create a single row table  val srdd:SchemaRDD = sqlContext.sparkContext.parallelize(Seq(crime))  // Join table with census data  val row: DataFrame = censusTable.join(srdd, on = Option(‘Community_Area === ‘Community_Area_Number)) //.printSchema  val predictTable = model.score(row)  val probOfArrest = predictTable.vec(“true”).at(0)   probOfArrest.toFloat} val crimeEvent = Crime(“02/08/2015 11:43:58 PM”, 1811, “NARCOTICS”, “STREET”,false, 422, 4, 7, 46, 18)val arrestProbability = 100 * scoreEvent(crime, dlModel, censusTable)

              (implicit sqlContext: SQLContext, h2oContext: H2OContext): Float = {

  import sqlContext._

  val srdd:SchemaRDD = sqlContext.sparkContext.parallelize(Seq(crime))

  val row: DataFrame = censusTable.join(srdd, on = Option(‘Community_Area === ‘Community_Area_Number)) //.printSchema

  val probOfArrest = predictTable.vec(“true”).at(0)

  probOfArrest.toFloat

 

val arrestProbability = 100 * scoreEvent(crime, dlModel, censusTable)

Figure 8: Geo-mapped predictions

Because each of the crimes reported comes with latitude-longitude coordinates, we scored our hold out data using the trained model and plotted the predictions on a map of Chicago—specifically, the Downtown district. The color coding corresponds to the model’s prediction for likelihood of an arrest with red being very likely (X > 0.8) and blue being unlikely (X < 0.2). Smart analytics + resource management = safer streets.

Further Reading

If you’re interested in finding out more about Sparkling Water or H2O please join us at H2O World 2015 in Mountain View, CA. We’ll have a series of great speakers including Stanford Professors Rob Tibshirani and Stephen Boyd, Hilary Mason, the Founder of Fast Forward Labs, Erik Huddleston, the CEO of TrendKite, Danqing Zhao, Big Data Director for Macy’s and Monica Rogati, Equity Partner at Data Collective.