Introducing sparklyr, an R Interface for Apache Spark

Earlier this week, RStudio announced sparklyr, a new package that provides an interface between R and Apache Spark. We republish RStudio’s blog post below (see original) for your convenience.

Over the past couple of years we’ve heard time and time again that people want a native dplyr interface to Spark, so we built one! sparklyr also provides interfaces to Spark’s distributed machine learning algorithms and much more. Highlights include:

  • Interactively manipulate Spark data using both dplyr and SQL (via DBI).

  • Filter and aggregate Spark datasets then bring them into R for analysis and visualization.

  • Orchestrate distributed machine learning from R using either Spark MLlib or H2O SparkingWater.

  • Create extensions that call the full Spark API and provide interfaces to Spark packages.

  • Integrated support for establishing Spark connections and browsing Spark DataFrames within the RStudio IDE.

We’re also excited to be working with several industry partners. IBM is incorporating sparklyr into its Data Science Experience, Cloudera is working with us to ensure that sparklyr meets the requirements of its enterprise customers, and H2O has provided an integration between sparklyr and H2O Sparkling Water.

Getting Started

You can install sparklyr from CRAN as follows:

  install.packages(“sparklyr”)

You should also install a local version of Spark for development purposes:

  library(sparklyr) spark_install(version = “1.6.2”)

spark_install(version = “1.6.2”)

If you use the RStudio IDE, you should also download the latest preview release of the IDE, which includes several enhancements for interacting with Spark.

Extensive documentation and examples are available at http://spark.rstudio.com.

Connecting to Spark

You can connect to both local instances of Spark as well as remote Spark clusters. Here we’ll connect to a local instance of Spark:

  library(sparklyr) sc <- spark_connect(master = “local”)

sc <- spark_connect(master = “local”)

The returned Spark connection (sc) provides a remote dplyr data source to the Spark cluster.

Reading Data

You can copy R data frames into Spark using the dplyr copy_to function. (More typically, though, you’ll read data within the Spark cluster using the spark_read family of functions.) For the examples below, we’ll copy some datasets from R into Spark. (Note that you may need to install the nycflights13 and Lahman packages in order to execute this code.)

  library(dplyr) iris_tbl <- copy_to(sc, iris) flights_tbl <- copy_to(sc, nycflights13::flights, “flights”) batting_tbl <- copy_to(sc, Lahman::Batting, “batting”)

iris_tbl <- copy_to(sc, iris)

batting_tbl <- copy_to(sc, Lahman::Batting, “batting”)

Using dplyr We can now use all of the available dplyr verbs against the tables within the cluster. Here’s a simple filtering example:

  # filter by departure delay flights_tbl %>% filter(dep_delay == 2)

flights_tbl %>% filter(dep_delay == 2)

Introduction to dplyr provides additional dplyr examples you can try. For example, consider the last example from the tutorial which plots data on flight delays:

  delay <- flights_tbl %>%   group_by(tailnum) %>%  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%  filter(count > 20, dist < 2000, !is.na(delay)) %>%  collect() # plot delayslibrary(ggplot2)ggplot(delay, aes(dist, delay)) +  geom_point(aes(size = count), alpha = 1/2) +  geom_smooth() +  scale_size_area(max_size = 2)

  group_by(tailnum) %>%

  filter(count > 20, dist < 2000, !is.na(delay)) %>%

 

library(ggplot2)

  geom_point(aes(size = count), alpha = 1/2) +

  scale_size_area(max_size = 2)

Note that while the dplyr functions shown above look identical to the ones you use with R data frames, with sparklyr they use Spark as their back end and execute remotely in the cluster.

Window Functions

dplyr window functions are also supported, for example:

  batting_tbl %>%     select(playerID, yearID, teamID, G, AB:H) %>%     arrange(playerID, yearID, teamID) %>%     group_by(playerID) %>%     filter(min_rank(desc(H)) <= 2 & H > 0)

  select(playerID, yearID, teamID, G, AB:H) %>%  

  group_by(playerID) %>%  

For additional documentation on using dplyr with Spark see the dplyr section of the sparklyr website.

Using SQL

It’s also possible to execute SQL queries directly against tables within a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:

  library(DBI) iris_preview <- dbGetQuery(sc, “SELECT * FROM iris LIMIT 10”)

iris_preview <- dbGetQuery(sc, “SELECT * FROM iris LIMIT 10”)

Machine Learning You can orchestrate machine-learning algorithms in a Spark cluster via either Spark MLlib or via the H2O Sparkling Water extension package. Both provide a set of high-level APIs built on top of DataFrames that help you create and tune machine-learning workflows.

Spark MLlib

In this example we’ll use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset, and see if we can predict a car’s fuel consumption (mpg) based on its weight (wt) and the number of cylinders the engine contains (cyl). We’ll assume in each case that the relationship between mpg and each of our features is linear.

  # copy mtcars into sparkmtcars_tbl <- copy_to(sc, mtcars) # transform our data set, and then partition into ‘training’, ‘test’partitions <- mtcars_tbl %>%  filter(hp >= 100) %>%  mutate(cyl8 = cyl == 8) %>%  sdf_partition(training = 0.5, test = 0.5, seed = 1099) # fit a linear model to the training datasetfit <- partitions$training %>%  ml_linear_regression(response = “mpg”, features = c(“wt”, “cyl”))

mtcars_tbl <- copy_to(sc, mtcars)

transform our data set, and then partition into ‘training’, ‘test’

  filter(hp >= 100) %>%

  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

fit a linear model to the training dataset

  ml_linear_regression(response = “mpg”, features = c(“wt”, “cyl”))

For linear regression models produced by Spark, we can use summary() to learn a bit more about the quality of our fit, and the statistical significance of each of our predictors.

Spark machine learning supports a wide array of algorithms and feature transformations, and as illustrated above, it’s easy to chain these functions together with dplyr pipelines. To learn more, see the Spark MLlib section of the sparklyr website.

H2O Sparkling Water

Let’s walk the same mtcars example, but in this case use H2O’s machine-learning algorithms via the H2O Sparkling Water extension. The dplyr code used to prepare the data is the same, but after partitioning into test and training data we call h2o.glm rather than ml_linear_regression:

  # convert to h20_frame (uses the same underlying rdd)training <- as_h2o_frame(partitions$training)test <- as_h2o_frame(partitions$test) # fit a linear model to the training datasetfit <- h2o.glm(x = c(“wt”, “cyl”),               y = “mpg”,               training_frame = training,               lamda_search = TRUE) # inspect the modelprint(fit)

training <- as_h2o_frame(partitions$training)

 

fit <- h2o.glm(x = c(“wt”, “cyl”),

               training_frame = training,

 

print(fit)

For linear regression models produced by H2O, we can use either print() or summary() to learn a bit more about the quality of our fit. The summary() method returns some extra information about scoring history and variable importance.

To learn more, see the H2O Sparkling Water section of the sparklyr website.

Extensions

The facilities used internally by sparklyr for its dplyr and machine-learning interfaces are available to extension packages. Since Spark is a general-purpose cluster computing system, there are many potential applications for extensions (e.g. interfaces to custom machine-learning pipelines, interfaces to third-party Spark packages, etc.).

The sas7bdat extension enables parallel reading of SAS datasets in the sas7bdat format into Spark DataFrames. The rsparkling extension provides a bridge between sparklyr and H2O’s Sparkling Water.

We’re excited to see what other sparklyr extensions the R community creates. To learn more, see the Extensions section of the sparklyr website.

RStudio IDE

The latest RStudio Preview Release of the RStudio IDE includes integrated support for Spark and the sparklyr package, including tools for:

  • Creating and managing Spark connections

  • Browsing the tables and columns of Spark DataFrames

  • Previewing the first 1,000 rows of Spark DataFrames

Once you’ve installed the sparklyr package, you should find a new “Spark” pane within the IDE. This pane includes a “New Connection” dialog which can be used to make connections to local or remote Spark instances:

Once you’ve connected to Spark you’ll be able to browse the tables contained within the Spark cluster:

The Spark DataFrame preview uses the standard RStudio data viewer:

The RStudio IDE features for sparklyr are available now as part of the RStudio Preview Release. The final version of RStudio IDE that includes integrated support for sparklyr will ship within the next few weeks.