We have published several blog posts about sparklyr (an introduction and a post on automation), which lets you analyze big data with R, seamlessly leveraging Apache Spark. sparklyr, developed by RStudio, is an R interface to Spark that allows users to use Spark as the backend for dplyr, the popular data manipulation package for R.
If you are interested in sparklyr, you can learn how to use it from the official documentation, and the official cheat sheet is very helpful. You can also try it on a Spark cluster built with Cloudera Director. In this blog post, we will launch a Spark cluster on AWS with the Cloudera Director client in an automated way, then use sparklyr to visualize US flight data and build a predictive model on it.
You can refer to the previous post about automated deployment with Cloudera Director; this time we will use the configuration file for the Cloudera Director client from my repository. This cluster.conf requires Cloudera Director 2.3 or higher and assumes you will use the ap-northeast-1 region on AWS. You should change several settings, such as the security group and subnet ID, to match your environment.
You can launch a cluster with Cloudera Director as follows:
$ cloudera-director bootstrap cluster.conf
If you have your own Cloudera Director server, you can deploy as follows:
$ cloudera-director bootstrap-remote cluster.conf --lp.remote.username=
If you don’t have an environment with a Cloudera Director client, you can use a Docker-based tool.
Below, we visualize US flights and build a predictive model of flight delays with sparklyr. The flight visualization code is based on this article: http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/
This post assumes you already have the following tables: airlines_bi_pq (US flight records) and airports_new_pq (airport locations).
You should make these tables available through Apache Hive or Apache Impala (incubating) with Hue.
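If these tables are not registered in the metastore yet, one option is to create external tables over your Parquet files. Below is a minimal sketch through sparklyr's DBI interface, assuming a hypothetical S3 path and an abbreviated schema (adjust both to your data); it requires the Spark connection sc created in the next section, or you can issue equivalent DDL in Hue instead.

library(DBI)

# Hypothetical bucket/path and schema -- replace with your own.
dbGetQuery(sc, "
  CREATE EXTERNAL TABLE IF NOT EXISTS airports_new_pq (
    iata STRING, airport STRING, city STRING, state STRING,
    country STRING, latitude DOUBLE, longitude DOUBLE
  )
  STORED AS PARQUET
  LOCATION 's3a://your-bucket/airports_new_pq/'
")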
After installing sparklyr and instantiating the Spark cluster with the Cloudera Director configuration file, you can access RStudio Server in your browser (by default it listens on port 8787).
Connect to Spark with sparklyr
Let's connect to your Spark cluster with sparklyr. In this post, we additionally installed Spark 2.0. Before running the following code, you should install some additional R packages:

install.packages(c("ggplot2", "maps", "geosphere", "dplyr"))
library(ggplot2)
library(maps)
library(geosphere)
library(sparklyr)
library(dplyr)

# Connect to the cluster
config <- spark_config()
config$spark.driver.cores <- 4
config$spark.executor.cores <- 4
config$spark.executor.memory <- "4G"
spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
spark_version <- "2.0.0"
sc <- spark_connect(master="yarn-client", version=spark_version, config=config, spark_home=spark_home)
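As an optional sanity check (a small sketch using standard sparklyr and dplyr helpers), you can confirm the connection works and see which Hive tables are visible:

spark_version(sc)  # should report the Spark version we requested (2.0.0)
src_tbls(sc)       # lists tables registered in the Hive metastore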
Read the table from S3 and plot with ggplot

First, summarize the number of flights in the airlines_bi_pq table by year.
airlines <- tbl(sc, "airlines_bi_pq")
airlines
## Source: query [1.235e+08 x 30]
## Database: spark connection master=yarn-client app=sparklyr local=FALSE
##
##    year month   day dayofweek dep_time crs_dep_time arr_time crs_arr_time
## 1  2006     6     3         6      840          830     1006         1007
## 3  2006     6     5         1      827          830     1004         1007
## 5  2006     6     7         3      831          830     1005         1007
## 7  2006     6     9         5      826          830      958         1007
## 9  2006     6    11         7      828          830      950         1007
## # ... with 1.235e+08 more rows, and 22 more variables: carrier,
## #   crs_elapsed_time, airtime, arrdelay, depdelay, taxi_out, cancelled,
## #   cancellation_code, nas_delay, security_delay, late_aircraft_delay, ...
airline_counts_by_year <- airlines %>%
  group_by(year) %>%
  summarise(count = n()) %>%
  collect
airline_counts_by_year %>% tbl_df %>% print(n = nrow(.))
## # A tibble: 22 × 2
##     year   count
## 1   1990 5270893
## 3   2007 7453215
## 5   1988 5202096
## 7   1994 5180048
## 9   1991 5076925
## 11  1989 5041200
## 13  1987 1311826
## 15  2001 5967780
## 17  2005 7140596
## 19  2008 7009728
## 21  2002 5271359
sparklyr tables are evaluated lazily, so you need to call collect to materialize the result as a local data.frame.
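To make the laziness concrete, here is a small sketch reusing the airlines table from above: the pipeline merely defines a query, show_query() shows the SQL that will be sent to Spark, and only collect() executes it and pulls the result into R.

# Defining the pipeline does not run anything on the cluster yet
delayed_by_carrier <- airlines %>%
  filter(arrdelay > 60) %>%
  group_by(carrier) %>%
  summarise(count = n())

show_query(delayed_by_carrier)          # inspect the generated SQL
local_df <- collect(delayed_by_carrier) # execute and fetch a local tibble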
Plot summarized data with ggplot:
g <- ggplot(airline_counts_by_year, aes(x=year, y=count))
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)
We can see a drop in the number of flights in 2002. But why?
Flight numbers between 2001 and 2003
Next, to dig into the 2002 data, let's plot the number of flights per month between 2001 and 2003.
airline_counts_by_month <- airlines %>%
  filter(year >= 2001 & year <= 2003) %>%
  group_by(year, month) %>%
  summarise(count = n()) %>%
  collect

g <- ggplot(
  airline_counts_by_month,
  aes(x=as.Date(sprintf("%d-%02d-01", airline_counts_by_month$year, airline_counts_by_month$month)),
      y=count)
)
g <- g + geom_line(
  colour = "magenta",
  linetype = 1,
  size = 0.8
)
g <- g + xlab("Year/Month")
g <- g + ylab("Flight number")
g <- g + ggtitle("US flights")
plot(g)
It appears that the number of flights dropped sharply after September 2001, which we can understand as the effect of 9/11. In this way, sparklyr makes exploratory data analysis on large-scale data easier, so we can obtain new insights quickly.
Summarize flight data by year, carrier, origin and dest
Next, we will summarize the data by year, carrier, origin, and destination.
flights <- airlines %>%
  group_by(year, carrier, origin, dest) %>%
  summarise(count = n()) %>%
  collect

airports <- tbl(sc, "airports_new_pq") %>% collect
Now we extract AA's flights in 2007.
flights_aa <- flights %>%
  filter(year == 2007) %>%
  filter(carrier == "AA") %>%
  arrange(count)
flights_aa
## Source: local data frame [460 x 5]
## Groups: year, carrier, origin [82]
##
##    year carrier origin  dest count
## 1  2007      AA    BOS   JFK     1
## 3  2007      AA    BOS   SDF     1
## 5  2007      AA    BNA   IAD     1
## 7  2007      AA    OGG   ORD     8
## 9  2007      AA    EGE   LGA    17
## # ... with 450 more rows
Plot flights on a map

Let's plot the number of AA flights in 2007 on a map. You can change the filter condition to plot other airlines.
xlim <- c(-171.738281, -56.601563)
ylim <- c(12.039321, 71.856229)

pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

map("world", col="#6B6363", fill=TRUE, bg="#000000", lwd=0.05, xlim=xlim, ylim=ylim)

maxcnt <- max(flights_aa$count)
for (j in 1:length(flights_aa$carrier)) {
  air1 <- airports[airports$iata == flights_aa[j,]$origin,]
  air2 <- airports[airports$iata == flights_aa[j,]$dest,]

  inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude),
                          c(air2[1,]$longitude, air2[1,]$latitude),
                          n=100, addStartEnd=TRUE)

  colindex <- round((flights_aa[j,]$count / maxcnt) * length(colors))
  lines(inter, col=colors[colindex], lwd=0.8)
}
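Because this draws with base graphics, you can render the same picture to a file instead of the interactive device; this is just a sketch with an arbitrary file name and size:

# Send subsequent base-graphics drawing to a PNG file
png("aa_flights_2007.png", width=1200, height=800)
# ... re-run the map() call and the great-circle loop above ...
dev.off()  # close the device to write the file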
Build a predictive model for delay with linear regression

We will build a predictive model of arrival delay with Spark MLlib, using its linear regression.
First, we prepare the training data. To handle categorical variables, we use ft_string_indexer to convert strings into label indices.
partitions <- airlines %>%
  filter(arrdelay >= 5) %>%
  sdf_mutate(
    carrier_cat = ft_string_indexer(carrier),
    origin_cat = ft_string_indexer(origin),
    dest_cat = ft_string_indexer(dest)
  ) %>%
  mutate(hour = floor(dep_time/100)) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

fit <- partitions$training %>%
  ml_linear_regression(
    response = "arrdelay",
    features = c(
      "month", "hour", "dayofweek", "carrier_cat",
      "depdelay", "origin_cat", "dest_cat", "distance"
    )
  )
summary(fit)
## Call: ml_linear_regression(., response = "arrdelay", features = c("month",
##     "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat",
##     "dest_cat", "distance"))
##
## Deviance Residuals (approximate):
##      Min       1Q   Median       3Q      Max
## -111.631   -7.860   -2.044    4.624 1434.686
##
## Coefficients:
##               Estimate  Std. Error    t value  Pr(>|t|)
## (Intercept)  1.1779e+01  1.6486e-02    714.468 < 2.2e-16 ***
## month       -4.6258e-02  9.8378e-04    -47.021 < 2.2e-16 ***
## hour        -1.5079e-01  7.4880e-04   -201.373 < 2.2e-16 ***
## dayofweek   -2.3504e-01  1.7646e-03   -133.195 < 2.2e-16 ***
## carrier_cat  1.4728e-01  7.2567e-04    202.951 < 2.2e-16 ***
## depdelay     9.0322e-01  8.7020e-05  10379.398 < 2.2e-16 ***
## origin_cat  -5.7124e-03  9.8910e-05    -57.753 < 2.2e-16 ***
## dest_cat    -2.3307e-02  9.4674e-05   -246.179 < 2.2e-16 ***
## distance     1.4307e-03  6.4905e-06    220.431 < 2.2e-16 ***
## ---
## Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
##
## R-Squared: 0.8333
## Root Mean Squared Error: 16.33
Now we can see the trained linear regression model and its coefficients. As you might expect, departure delay (depdelay) is by far the strongest predictor of arrival delay.
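To check how the model generalizes, you can also score the held-out partition. This is a sketch assuming your sparklyr version provides sdf_predict (newer releases use ml_predict for this); it computes the root mean squared error of the predictions on the test split inside Spark:

# Score the test partition with the fitted model
pred <- sdf_predict(fit, partitions$test)

# Compute RMSE on the cluster; collect() fetches the single-row result
pred %>%
  mutate(residual = prediction - arrdelay) %>%
  summarise(rmse = sqrt(mean(residual * residual))) %>%
  collect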
Using sparklyr enables you to analyze big data on Amazon S3 with R smoothly. You can build a Spark cluster easily with Cloudera Director, and sparklyr turns Spark into a backend for dplyr. You can create tidy data from huge, messy data, plot complex maps from big data the same way as from small data, and build a predictive model on it with MLlib. I believe sparklyr helps all R users perform exploratory data analysis on large-scale data faster and more easily. Let's try!
You can see the R Markdown version of this analysis on RPubs; with RStudio, sharing R Markdown documents on RPubs is easy. Learn more about sparklyr and Cloudera in this on-demand video.