Analyzing US flight data on Amazon S3 with sparklyr and Apache Spark 2.0

We have published several blog posts about sparklyr (introduction, automation), an R package that lets you analyze big data with Apache Spark seamlessly from R. Developed by RStudio, sparklyr is an R interface to Spark that lets you use Spark as the backend for dplyr, the popular data manipulation package for R.

If you are interested in sparklyr, you can learn how to use it from the official documentation, or try it on a Spark cluster built with Cloudera Director. In this blog post, we are going to build a Spark cluster on AWS with Cloudera Director. If you try sparklyr, the official cheat sheet is very helpful.

In this post, we will show you how to automate the launch of sparklyr with the Cloudera Director client, and then visualize US flight data and build a predictive model from it.

You can refer to the previous post about automated deployment with Cloudera Director, but this time we will use this configuration file for Cloudera Director client in my repository. This cluster.conf requires Cloudera Director 2.3 or higher and it assumes you will use the ap-northeast-1 region on AWS. You should change several settings such as the security group and subnet ID.

You can launch a cluster with Cloudera Director as follows:

  $ cloudera-director bootstrap cluster.conf

If you have your own Cloudera Director server, you can deploy as follows:

  $ cloudera-director bootstrap-remote cluster.conf --lp.remote.username= --lp.remote.password= --lp.remote.hostAndPort=:7189

If you don’t have an environment with a Cloudera Director client, you can use a Docker-based tool.

In this post, we will visualize US flights and build a predictive model with sparklyr. The flight visualization code is based on this article: http://flowingdata.com/2011/05/11/how-to-map-connections-with-great-circles/

This post assumes you already have the following tables: airlines_bi_pq and airports_new_pq, the two tables that the code in this post reads.

You should make these tables available through Apache Hive or Apache Impala (incubating) with Hue.

After sparklyr is installed and the Spark cluster is launched with the Cloudera Director configuration file, you can access the RStudio server on :8787 with your browser and log in with rsuser/cloudera.

Connect to Spark with sparklyr

Let’s connect to your Spark cluster with sparklyr. For this post, we additionally installed Spark 2.0 on the cluster. Before running the following code, install the additional R packages with install.packages(c("ggplot2", "maps", "geosphere", "dplyr")).

  library(ggplot2)
  library(maps)
  library(geosphere)
  library(sparklyr)
  library(dplyr)

  # Connect to the cluster
  config <- spark_config()
  config$spark.driver.cores <- 4
  config$spark.executor.cores <- 4
  config$spark.executor.memory <- "4G"
  spark_home <- "/opt/cloudera/parcels/SPARK2/lib/spark2"
  spark_version <- "2.0.0"
  sc <- spark_connect(master = "yarn-client", version = spark_version, config = config, spark_home = spark_home)

Read the table from S3 and plot with ggplot

Summarize the number of flights in the airlines_bi_pq table by year.

  airlines <- tbl(sc, "airlines_bi_pq")
  airlines

  ## Source:   query [1.235e+08 x 30]
  ## Database: spark connection master=yarn-client app=sparklyr local=FALSE
  ##
  ##     year month   day dayofweek dep_time crs_dep_time arr_time crs_arr_time
  ## 1   2006     6     3         6      840          830     1006         1007
  ## 2   2006     6     4         7      830          830      958         1007
  ## 3   2006     6     5         1      827          830     1004         1007
  ## 4   2006     6     6         2      830          830     1006         1007
  ## 5   2006     6     7         3      831          830     1005         1007
  ## 6   2006     6     8         4      826          830      958         1007
  ## 7   2006     6     9         5      826          830      958         1007
  ## 8   2006     6    10         6      845          830     1011         1007
  ## 9   2006     6    11         7      828          830      950         1007
  ## 10  2006     6    12         1      829          830      956         1007
  ## # ... with 1.235e+08 more rows, and 22 more variables: carrier,
  ## #   flight_num, tail_num, actual_elapsed_time,
  ## #   crs_elapsed_time, airtime, arrdelay, depdelay,
  ## #   origin, dest, distance, taxi_in,
  ## #   taxi_out, cancelled, cancellation_code,
  ## #   diverted, carrier_delay, weather_delay,
  ## #   nas_delay, security_delay, late_aircraft_delay,
  ## #   date_yyyymm

  airline_counts_by_year <- airlines %>% group_by(year) %>% summarise(count = n()) %>% collect
  airline_counts_by_year %>% tbl_df %>% print(n = nrow(.))

  ## # A tibble: 22 × 2
  ##     year   count
  ## 1   1990 5270893
  ## 2   2003 6488540
  ## 3   2007 7453215
  ## 4   2006 7141922
  ## 5   1988 5202096
  ## 6   1997 5411843
  ## 7   1994 5180048
  ## 8   2004 7129270
  ## 9   1991 5076925
  ## 10  1996 5351983
  ## 11  1989 5041200
  ## 12  1998 5384721
  ## 13  1987 1311826
  ## 14  1995 5327435
  ## 15  2001 5967780
  ## 16  1992 5092157
  ## 17  2005 7140596
  ## 18  2000 5683047
  ## 19  2008 7009728
  ## 20  1999 5527884
  ## 21  2002 5271359
  ## 22  1993 5070501

A sparklyr table is evaluated lazily, so you should call collect to run the query and convert the result into a local data.frame.
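To make the lazy behaviour concrete without a cluster, here is a minimal sketch that uses an in-memory SQLite table as a stand-in for the Spark table. It assumes the dplyr, dbplyr and RSQLite packages are installed; the toy data and the names flights_db, counts_lazy and counts_local are hypothetical and not part of the original analysis. The same dplyr verbs apply, but the Spark backend itself is not exercised here.

```r
# Sketch only: an in-memory SQLite table stands in for the Spark table.
# Assumes dplyr, dbplyr and RSQLite are installed.
library(dplyr)
library(dbplyr)

# Hypothetical toy data, not the real airlines table
flights_db <- memdb_frame(
  year    = c(2001L, 2001L, 2002L),
  carrier = c("AA", "UA", "AA")
)

# This only builds a query; no rows are computed yet
counts_lazy <- flights_db %>%
  group_by(year) %>%
  summarise(count = n())

# Print the SQL that would be sent to the backend
show_query(counts_lazy)

# collect() runs the query and returns a local tibble
counts_local <- counts_lazy %>% collect()
counts_local
```

Until collect is called, counts_lazy is only a description of the query, which is why the summary should be collected before it is handed to local plotting code.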

Plot summarized data with ggplot:

  g <- ggplot(airline_counts_by_year, aes(x = year, y = count))
  g <- g + geom_line(
    colour = "magenta",
    linetype = 1,
    size = 0.8
  )
  g <- g + xlab("Year")
  g <- g + ylab("Flight number")
  g <- g + ggtitle("US flights")
  plot(g)

The number of flights dropped in 2002. But why?

Look at the number of flights between 2001 and 2003

Next, to dig into the data in 2002, let’s plot the number of flights between 2001 and 2003.

  airline_counts_by_month <- airlines %>%
    filter(year >= 2001 & year <= 2003) %>%
    group_by(year, month) %>%
    summarise(count = n()) %>%
    collect

  g <- ggplot(
    airline_counts_by_month,
    aes(x = as.Date(sprintf("%d-%02d-01", airline_counts_by_month$year, airline_counts_by_month$month)), y = count)
  )
  g <- g + geom_line(
    colour = "magenta",
    linetype = 1,
    size = 0.8
  )
  g <- g + xlab("Year/Month")
  g <- g + ylab("Flight number")
  g <- g + ggtitle("US flights")
  plot(g)

The number of flights dropped significantly after September 2001, which we can attribute to the effect of 9/11. In this way, sparklyr makes exploratory data analysis of large-scale data easier, so we can obtain new insights quickly.
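The x axis of the monthly plot above is built by turning each (year, month) pair into the first day of that month. A minimal base R sketch of that step, using hypothetical values rather than the collected data:

```r
# Hypothetical year/month pairs, standing in for the collected columns
year  <- c(2001L, 2001L, 2002L)
month <- c(9L, 10L, 1L)

# "%02d" zero-pads the month so the string is a valid ISO date
first_of_month <- as.Date(sprintf("%d-%02d-01", year, month))
first_of_month
```

Using real Date values rather than strings lets ggplot place the months on a continuous time axis in chronological order.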

Summarize flight data by year, carrier, origin and dest

Next, we will summarize the data by year, carrier, origin and destination.

  flights <- airlines %>% group_by(year, carrier, origin, dest) %>% summarise(count = n()) %>% collect
  airports <- tbl(sc, "airports_new_pq") %>% collect

Now we extract AA’s flights in 2007.

  flights_aa <- flights %>% filter(year == 2007) %>% filter(carrier == "AA") %>% arrange(count)
  flights_aa

  ## Source: local data frame [460 x 5]
  ## Groups: year, carrier, origin [82]
  ##
  ##     year carrier origin  dest count
  ## 1   2007      AA    BOS   JFK     1
  ## 2   2007      AA    MIA   FLL     1
  ## 3   2007      AA    BOS   SDF     1
  ## 4   2007      AA    MIA   XNA     1
  ## 5   2007      AA    BNA   IAD     1
  ## 6   2007      AA    XNA   MIA     2
  ## 7   2007      AA    OGG   ORD     8
  ## 8   2007      AA    ORD   OGG     8
  ## 9   2007      AA    EGE   LGA    17
  ## 10  2007      AA    MTJ   ORD    17
  ## # ... with 450 more rows

Plot flights on a map

Let’s plot the number of AA flights in 2007 on a map. You can change the filter conditions to plot other airlines.

  xlim <- c(-171.738281, -56.601563)
  ylim <- c(12.039321, 71.856229)
  pal <- colorRampPalette(c("#333333", "white", "#1292db"))
  colors <- pal(100)
  map("world", col = "#6B6363", fill = TRUE, bg = "#000000", lwd = 0.05, xlim = xlim, ylim = ylim)

  maxcnt <- max(flights_aa$count)
  for (j in seq_len(nrow(flights_aa))) {
    air1 <- airports[airports$iata == flights_aa[j,]$origin,]
    air2 <- airports[airports$iata == flights_aa[j,]$dest,]

    # Great-circle path between the origin and destination airports
    inter <- gcIntermediate(c(air1[1,]$longitude, air1[1,]$latitude), c(air2[1,]$longitude, air2[1,]$latitude), n = 100, addStartEnd = TRUE)
    # Scale the count to a palette index; clamp to 1 because rounding can give 0
    colindex <- max(1, round((flights_aa[j,]$count / maxcnt) * length(colors)))

    lines(inter, col = colors[colindex], lwd = 0.8)
  }
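The loop above colours each route by scaling its flight count to an index into a 100-step palette. The following base R sketch isolates that mapping with hypothetical counts. It also shows that rounding can produce an index of 0 for rare routes, which is not a valid position in an R vector, so the sketch clamps the index to 1.

```r
# Same palette construction as in the map code
pal <- colorRampPalette(c("#333333", "white", "#1292db"))
colors <- pal(100)

# Hypothetical route counts: a few rare routes and one very busy one
counts <- c(1, 8, 17, 5000)

# Unclamped index: rare routes round down to 0
raw_index <- round(counts / max(counts) * length(colors))

# Clamped index: always a valid position in `colors`
colindex <- pmax(1, raw_index)

data.frame(counts, raw_index, colindex, colour = colors[colindex])
```

With the clamp, rare routes take the darkest palette entry, while the busiest route takes the last one.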

Build a predictive model for delay with linear regression

We will build a predictive model with linear regression from Spark MLlib.

First, we will prepare the training data. To handle categorical data, you should use ft_string_indexer to convert the string columns to label indices.
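As an illustration of what label indexing means, here is a minimal base R sketch. It assumes the documented default of Spark's StringIndexer, where labels are ordered by frequency and the most frequent label gets index 0. The function string_index and the sample values are hypothetical, and the sketch does not call Spark.

```r
# Sketch of frequency-based label indexing in base R.
# Assumption: mirrors the default ordering of Spark's StringIndexer
# (most frequent label gets index 0); ties are not handled specially here.
string_index <- function(x) {
  freq <- sort(table(x), decreasing = TRUE)
  # match() gives 1-based positions in the frequency ranking; shift to 0-based
  as.numeric(match(x, names(freq)) - 1)
}

carrier <- c("AA", "UA", "AA", "DL", "AA", "UA")  # hypothetical values
data.frame(carrier, carrier_cat = string_index(carrier))
```

The resulting indices are frequency ranks rather than measured quantities, and a linear model treats them as ordinary numbers, which is worth keeping in mind when reading the coefficients of the indexed columns.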

  partitions <- airlines %>%
    filter(arrdelay >= 5) %>%
    sdf_mutate(
      carrier_cat = ft_string_indexer(carrier),
      origin_cat = ft_string_indexer(origin),
      dest_cat = ft_string_indexer(dest)
    ) %>%
    mutate(hour = floor(dep_time/100)) %>%
    sdf_partition(training = 0.5, test = 0.5, seed = 1099)

  fit <- partitions$training %>%
    ml_linear_regression(
      response = "arrdelay",
      features = c(
        "month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"
      )
    )

  summary(fit)

  ## Call: ml_linear_regression(., response = "arrdelay", features = c("month", "hour", "dayofweek", "carrier_cat", "depdelay", "origin_cat", "dest_cat", "distance"))
  ##
  ## Deviance Residuals: (approximate):
  ##      Min       1Q   Median       3Q      Max
  ## -111.631   -7.860   -2.044    4.624 1434.686
  ##
  ## Coefficients:
  ##                Estimate  Std. Error   t value  Pr(>|t|)
  ## (Intercept)  1.1779e+01  1.6486e-02   714.468 < 2.2e-16 ***
  ## month       -4.6258e-02  9.8378e-04   -47.021 < 2.2e-16 ***
  ## hour        -1.5079e-01  7.4880e-04  -201.373 < 2.2e-16 ***
  ## dayofweek   -2.3504e-01  1.7646e-03  -133.195 < 2.2e-16 ***
  ## carrier_cat  1.4728e-01  7.2567e-04   202.951 < 2.2e-16 ***
  ## depdelay     9.0322e-01  8.7020e-05 10379.398 < 2.2e-16 ***
  ## origin_cat  -5.7124e-03  9.8910e-05   -57.753 < 2.2e-16 ***
  ## dest_cat    -2.3307e-02  9.4674e-05  -246.179 < 2.2e-16 ***
  ## distance     1.4307e-03  6.4905e-06   220.431 < 2.2e-16 ***
  ## ---
  ## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
  ##
  ## R-Squared: 0.8333
  ## Root Mean Squared Error: 16.33

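Now we can see the trained linear regression model and its coefficients. The model reaches an R-squared of 0.8333 with a root mean squared error of 16.33, and depdelay has by far the largest t value: each additional minute of departure delay adds about 0.9 minutes to the predicted arrival delay.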
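To show how these coefficients turn into a prediction, here is a minimal base R sketch that applies the linear formula by hand. The coefficient values are copied from the summary output above. The helper predict_arrdelay and the example flight are hypothetical, and the category indices in it are arbitrary placeholders rather than real indexer output.

```r
# Coefficients copied from the summary output of the fitted model
coefs <- c(
  intercept   =  1.1779e+01,
  month       = -4.6258e-02,
  hour        = -1.5079e-01,
  dayofweek   = -2.3504e-01,
  carrier_cat =  1.4728e-01,
  depdelay    =  9.0322e-01,
  origin_cat  = -5.7124e-03,
  dest_cat    = -2.3307e-02,
  distance    =  1.4307e-03
)

# Hypothetical helper: intercept plus the weighted sum of the features
predict_arrdelay <- function(x) {
  features <- setdiff(names(coefs), "intercept")
  unname(coefs["intercept"] + sum(coefs[features] * x[features]))
}

# Hypothetical flight; the *_cat values are placeholder indices
flight <- c(month = 6, hour = 8, dayofweek = 3, carrier_cat = 0,
            depdelay = 30, origin_cat = 5, dest_cat = 12, distance = 500)
pred <- predict_arrdelay(flight)

# Ten more minutes of departure delay shift the prediction by 10 * 0.90322
later <- flight
later["depdelay"] <- 40
predict_arrdelay(later) - pred
```

Because the training data was filtered to arrdelay >= 5, a hand calculation like this is only meaningful for flights that are already delayed.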

sparklyr lets you analyze big data on Amazon S3 smoothly from R, and Cloudera Director makes it easy to build the Spark cluster. sparklyr makes Spark a backend for dplyr, so you can create tidy data from huge messy data, plot complex maps from big data the same way as from small data, and build a predictive model from big data with MLlib. I believe sparklyr helps all R users perform exploratory data analysis faster and more easily on large-scale data. Give it a try!

You can see the R Markdown of this analysis on RPubs. With RStudio, you can share R Markdown documents easily on RPubs. Learn more about sparklyr and Cloudera in this on-demand video.