How-to: Do Scalable Graph Analytics with Apache Spark

Get started with scalable graph analysis via simple examples that utilize GraphFrames and Spark SQL on HDFS.

Graphs—also known as “networks”—are ubiquitous across web applications. As a refresher, a graph consists of nodes and edges. A node can be any object, such as a person or an airport, and an edge is a relation between two nodes, such as a friendship or an airline connection between two cities. Social networks and content networks (which comprise interlinked documents, such as web pages or citation networks) are other very common examples of graphs. Finally, the Internet of Things is basically a huge “graph of graphs”.

Why should one care about graph theory? Because the relations between things, or rather the links in a network, contain information that complements what we know about the individual components. One common and relevant use case here is *topology analysis*, from which one can derive information about graph properties. If you can track changes in topology, you can track changes across the entire system. Thus, topological analysis is a handy tool in multiple fields, including financial risk management and predictive maintenance.

In the remainder of this post, you will get some introductory hands-on exposure to graph analysis using the GraphFrames package in Apache Spark. As you’ll see, there are some powerful advantages to using GraphFrames and GraphX on HDFS in combination with Spark SQL. (Important note: Neither GraphX nor GraphFrames is considered production-ready, nor are they currently supported by Cloudera. The information presented here is for self-educational purposes with respect to graph analytics only.)

Prerequisites

For both examples, you’ll need to install Cloudera QuickStart, which is available in VM and Docker versions. (CDH 5.5/Spark 1.5/GraphFrames 0.1.0 are used for the examples here, but the current versions—CDH 5.8/Spark 1.6/GraphFrames 0.2.0—should work just as well.) The GraphFrames package is available in the spark-packages repository. In addition to understanding the basic concepts of graph analysis, you’ll also need to know how to use RDDs and DataFrames in Spark.

Some code is skipped here for brevity, but complete exercise scripts, sample data, and the full tutorial as a PDF file are available in this repo.

Exercise 1: Calculating PageRank using Scalable Graph Analysis

Google’s PageRank—perhaps the most famous graph-processing algorithm in Internet history—is calculated per node (or vertex). A web page’s PageRank is related to the importance of a node in the network and results from the topology (represented by links). In this example, we will use GraphFrames to calculate the PageRank for each node. Furthermore, calculating the clustering coefficient of a graph, which is based on the number of triplets and the number of fully connected triangles, is possible using GraphX.

First, you need this data file from the SNAP website. It can be downloaded, decompressed, and loaded into HDFS using this bootstrap script, or you can do all that manually:

  $ wget https://snap.stanford.edu/data/facebook_combined.txt.gz
  $ gunzip *.gz
  $ hdfs dfs -put facebook_combined.txt

Start a spark-shell with the optional packages to work with GraphFrame and the CSV processing libraries:

  $ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
    com.databricks:spark-csv_2.10:1.4.0

Note that the \ symbol indicates that input continues on the next line. The --packages option allows you to specify a comma-separated list of Apache Maven coordinates for required third-party packages used in your spark-shell session. These will automatically be loaded by the Spark cluster from a central repository. Here we need the GraphFrames package and a library for CSV parsing from Databricks.

Building the Graph

Next, we’ll define a DataFrame by loading data from a CSV file, which is stored in HDFS.

Our data file facebook_combined.txt contains two columns that represent links between network nodes. The first column is called source (src), and the second is the destination (dst) of the link. (Some other systems, such as Gephi, use “source” and “target” instead.)

First we define a custom schema, and then we load the DataFrame using SQLContext.

  scala> import org.apache.spark.sql.types._

  scala> val customSchema = StructType(Array(StructField("src", IntegerType, true), StructField("dst", IntegerType, true)))

  scala> val edges = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter", " ").schema(customSchema).load("facebook_combined.txt")

Here we use a space (" ") as the delimiter. More options can be defined as shown in the documentation for the CSV parsing library.

In theory, this edge list contains all the information needed to build a graph, but a GraphFrame needs node data and edge data in separate DataFrames. Let’s transform the edge list to also get a node list.

First, we have to extract all unique node IDs into a column named name, and then we add a column named id to it. Currently, there is no direct equivalent to a row_number() function available, but version 1.6 adds a component (MonotonicallyIncreasingID.scala) that supports monotonically increasing 64-bit integers (a sketch follows the code block below). For now, it is fine to use the node ID as the row id as well.

  import org.graphframes._

  val n1 = edges.select("src").distinct()
  val n2 = edges.select("dst").distinct()
  val n = n1.unionAll(n2).withColumnRenamed("src", "name").distinct()
  val nodes = n.withColumn("id", n("name"))
  edges.show()
  nodes.show()
  val g1 = GraphFrame(nodes, edges)
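
On Spark 1.6 or later, a minimal sketch of the row-id alternative mentioned above might look like the following; the function name is taken from the Spark 1.6 DataFrame API, and the exercise itself simply keeps the node name as id:

  // Sketch: assign synthetic, monotonically increasing 64-bit ids instead of reusing the node name.
  // Requires Spark 1.6+; not needed for the rest of this exercise.
  import org.apache.spark.sql.functions.monotonicallyIncreasingId
  val nodesAlt = n.withColumn("id", monotonicallyIncreasingId())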

All this work was preparation to get the graph data into the right shape by using DataFrames. Now, we are ready to analyze the graph.

Enter GraphFrames

There are multiple options for doing scalable graph analysis on top of HDFS. Currently, Apache Spark contains GraphX, but in the past, the deprecated Bagel project was used.

Bagel was a low-level implementation of Google’s Pregel graph-processing framework. According to the docs, “Bagel operates on a graph represented as a distributed dataset of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state.” Its core idea was that “In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.” (Apache Giraph implements the same concept.) Although this is a low-level abstraction, it was a useful starting point for implementation and development of new graph-analysis algorithms.

In contrast, the GraphX framework uses a property graph model, which is a popular high-level abstraction for representing a graph. In GraphX, the data is managed by RDDs, one for nodes and one for edges. The graph operations needn’t be implemented by a particular compute function, as in Bagel; instead, the GraphX library offers the Graph class, which provides access to nodes, edges, and functionality to analyze the graph. The Pregel API is also available at this level in GraphX.
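
As a minimal sketch of this model (with made-up vertices and edges rather than the exercise data), a property graph can be built from a vertex RDD and an edge RDD:

  // Sketch: a tiny GraphX property graph built from a vertex RDD and an edge RDD.
  import org.apache.spark.graphx.{Edge, Graph}
  val vRdd = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
  val eRdd = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
  val smallGraph = Graph(vRdd, eRdd)
  smallGraph.degrees.collect()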

So, what is the relationship between GraphFrames and GraphX? I personally like to answer that question with an analogy: what a DataFrame is to an RDD, a GraphFrame is to the Graph class in GraphX. With GraphFrames, you can simply define a graph from your DataFrames: one holds the node list and the other the edge list. No matter how the data is stored in HDFS, the DataFrames do all the preparation, like filtering, grouping, and projections, before the specific graph algorithms implemented in the GraphFrame class are applied. Results of such computations can become new columns in the node list. And because this is a DataFrame, we can easily join and finally export results from multiple operations (as demonstrated later).
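
A minimal sketch of that idea, again with made-up node and edge lists rather than the exercise data, looks like this:

  // Sketch: build a GraphFrame from two DataFrames, one for nodes ("id") and one for edges ("src", "dst").
  import org.graphframes.GraphFrame
  val vDf = sqlContext.createDataFrame(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id", "name")
  val eDf = sqlContext.createDataFrame(Seq((1, 2), (2, 3))).toDF("src", "dst")
  val g = GraphFrame(vDf, eDf)
  g.inDegrees.show()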

Graph Analysis Made Easy

Next, let’s do some graph analysis on the Facebook ego-net graph.

We sort the nodes by degree and calculate the PageRank. Finally, we count the triangles in which a node is embedded.

  import org.apache.spark.sql.functions.desc

  val k = g1.degrees.sort(desc("degree"))
  k.show()
  val pr2 = g1.pageRank.resetProbability(0.15).maxIter(10).run()
  pr2.vertices.show()

Which are the nodes with the highest PageRank?

  val pr3 = pr2.vertices.sort(desc("pagerank"))
  pr3.show()

  +----+----+------------------+
  |name|  id|          pagerank|
  +----+----+------------------+
  |3434|3434|18.184854992469376|
  |1911|1911|18.123485211404812|
  |2655|2655|17.525235604357597|
  |1902|1902| 17.35563914066005|
  |1888|1888| 13.34316669756043|
  +----+----+------------------+
  only showing top 5 rows

Into how many triangles are our nodes embedded?

  val tcr = g1.triangleCount.run()

Because we plan to use this data again later, we can use the persist() function; however, this only makes sense if the dataset is not too big.

  tcr.persist()
  tcr.sort(desc("count")).limit(4).show()

  +-----+----+----+
  |count|name|  id|
  +-----+----+----+
  |30025|1912|1912|
  |15502|2543|2543|
  |15471|2233|2233|
  |15213|2464|2464|
  +-----+----+----+
  only showing top 4 rows
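
As mentioned in the introduction to this exercise, triangle counts can also be turned into a local clustering coefficient. The following is only a sketch of that idea, built on the GraphFrames results above rather than on GraphX: it joins the degree DataFrame k with the triangle counts tcr and applies the standard formula c = 2T / (k(k-1)), using the column names shown in the outputs above.

  // Sketch: local clustering coefficient per node, c = 2 * triangles / (degree * (degree - 1)).
  // Uses the degree DataFrame k and the triangle-count DataFrame tcr defined above.
  import org.apache.spark.sql.functions.{col, desc, lit, when}
  val cc = k.join(tcr, k("id") === tcr("id"))
    .select(k("id"), k("degree"), tcr("count"))
    .withColumn("cc", when(col("degree") > 1,
      lit(2.0) * col("count") / (col("degree") * (col("degree") - 1))).otherwise(0.0))
  cc.sort(desc("cc")).show(5)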

Finally, we use these results to evaluate if the number of triangles is related to the PageRank. We join both results and export the table as a CSV file. This allows us to draw the scatter plot in gnuplot or R.

tcr is the triangle-count result; because we persisted it above, Spark does not have to recalculate it when we use it in the join. The PageRank was calculated as a per-vertex value in the pr2.vertices DataFrame.

  scala> tcr.printSchema
  root
   |-- count: long (nullable = true)
   |-- name: integer (nullable = true)
   |-- id: integer (nullable = true)

  scala> pr2.vertices.printSchema
  root
   |-- name: integer (nullable = true)
   |-- id: integer (nullable = true)
   |-- pagerank: double (nullable = true)

Since both datasets are DataFrames, we can simply use the DataFrame functionality to join both on the id column:

  scala> val scatter = pr2.vertices.join(tcr, pr2.vertices.col("id").equalTo(tcr("id")))
  scala> scatter.limit(10).show
  16/05/10 03:24:53 INFO DAGScheduler: Job 18 finished: show at <console>:51, took 653,309080 s

  +----+----+-------------------+-----+----+----+
  |name|  id|           pagerank|count|name|  id|
  +----+----+-------------------+-----+----+----+
  |  31|  31|0.16137995810046757|  110|  31|  31|
  | 231| 231|0.35635621183428823|   95| 231| 231|
  | 431| 431| 0.2495015120580725| 1248| 431| 431|
  | 631| 631|0.23890006729816274|   41| 631| 631|
  | 831| 831| 0.3651072380090138|   64| 831| 831|
  |1031|1031|0.15013201313406596|    5|1031|1031|
  |1231|1231| 0.2566120648129746| 1452|1231|1231|
  |1431|1431|0.44670540616673726|10441|1431|1431|
  |1631|1631|  0.285116150576581|   35|1631|1631|
  |1831|1831| 2.1506290231917156|  226|1831|1831|
  +----+----+-------------------+-----+----+----+

We export this result as a CSV file; to get only one result file, we use the coalesce function of the DataFrame:

  scatter.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("scatter.csv")

Alternatively, we could have used the getmerge command from HDFS to combine all the part files into one single file, but in that case, it is important to write with the option header=false, so that the header line is not repeated in each part file.
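
A minimal sketch of that alternative (the output directory name scatter_parts is made up for illustration):

  // Sketch: write the part files without a header so `hdfs dfs -getmerge` can concatenate them cleanly.
  scatter.write.format("com.databricks.spark.csv").option("header", "false").save("scatter_parts")

On the command line, hdfs dfs -getmerge scatter_parts scatter_local.csv would then produce a single local file.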

Now, let’s plot our previous result data set in gnuplot. It was already exported into a directory named scatter_1.csv, but on HDFS. If gnuplot is not available, simply install it (for example, by typing sudo yum install gnuplot on CentOS). We have to copy the result file to a local folder, and then we use the following gnuplot commands, stored in a script file named plot.cmd:

  # Filename: plot.cmd
  set datafile separator ","
  set term png
  set output "g1.png"
  #
  # Add titles and labels.
  #
  set xlabel "PageRank"
  set ylabel "Triangles"
  set title "PR vs. T"
  unset key
  #
  # Add grid lines.
  #
  set grid
  set size ratio -1
  #
  # Timestamp the plot.
  #
  set timestamp
  plot 'scatter_1.csv/part-00000' using 3:4

We can easily plot the figure as a scatter plot (more about that below), assuming that the data set and the plot commands are in the same directory:

  $ gnuplot plot.cmd

In a previous post, we used the scalaplot library to create charts. You can simply add the Maven coordinates during startup:

  $ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
    com.databricks:spark-csv_2.10:1.4.0,org.sameersingh.scalaplot:scalaplot:0.0.4

And now, this import:

  import org.sameersingh.scalaplot.Implicits._

allows you to plot charts in gnuplot or based on the JFreeChart library within the Spark shell.
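
For example, a minimal sketch along the lines of the scalaplot documentation (the exact API may differ between versions, so treat these calls as an assumption) writes a simple xy chart to a PNG file:

  // Sketch (based on the scalaplot README; API details are an assumption):
  // plot sin and cos into a PNG file named test.png in the current directory.
  import org.sameersingh.scalaplot.Implicits._
  val x = 0.0 until 2.0 * math.Pi by 0.1
  output(PNG("./", "test"), xyChart(x -> (math.sin(_), math.cos(_))))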

Plotting the Results

This was a fun ride: starting with some public data, loading the edges, creating the node list, and defining a GraphFrame, followed by analyzing the graph and merging partial results for the final plot. Now we can visualize the results.

This scatter plot (see below) is not yet the ideal way to understand the data, but it’s a good starting point. A 2D histogram would also encode the number of (PageRank, Triangle) pairs.

A second approach to inspecting the data, a log-log plot (see below), would reveal a power-law distribution if the points followed a straight line, or a stretched power law in the case of a bent curve. The relation between the PageRank of a node and the number of triangles of which it is part is not a power law.

From the scatter plot, we can already conclude that a high number of triangles does not guarantee a high PageRank, and furthermore, that a high PageRank can be found both for nodes that are embedded in only a few triangles and for nodes that are embedded in many.

Now, let’s do the same exercise with a different data set.

Exercise 2: Topology Analysis of Web Crawl Results

To prepare the data set for this example, I used the Apache Nutch web crawler engine (which was designed by Doug Cutting and Mike Cafarella before they built Hadoop). Although data ingestion from SQL databases and huge document repositories dominates in commercial use cases, a data scientist should also be aware of the advantages of Nutch for scraping web data, in addition to those of Apache Flume and Apache Sqoop. For example, instead of using APIs or writing clients to load data from specific web pages, portals, or social media applications, you can use Nutch to get the data all at once and then apply specific parsing, such as HTML parsing with JSoup or more advanced triple extraction using Apache Any23.

Our crawl data is loaded into a staging table (in this case, in Apache HBase). Besides the full HTML content, it contains inlinks and outlinks. Because HBase is managing the crawl data (and the state of the crawl dataset), we can add more links, especially inlinks to a particular page, while we crawl more and more data. To distinguish both types of links in visualization, we use link type 1 for inlinks and link type 2 for outlinks. Finally, to analyze the overall network, we need both lists in one homogeneous collection, where each link is described by source, target, and type.

Using Spark SQL and DataFrames allows for inspection and rearrangement of the data in just a few steps. We start with a query that transforms the map of string pairs into individual rows. During the crawl procedure, Nutch aggregates all links in an adjacency list. We have to “explode” the data structure with all links per page to create a link list from the previously optimized representation.

  scala> val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT explode(inlinks), baseurl as target, '1' as type")
  scala> r1.limit(4).show()

  +--------------------+-------------+--------------------+----+
  |                 _c0|          _c1|              target|type|
  +--------------------+-------------+--------------------+----+
  |https://creativec...|Marking guide|http://wiki.creat...|   1|
  |https://creativec...|    More info|http://wiki.creat...|   1|
  |https://creativec...|   Learn more|http://wiki.creat...|   1|
  |https://creativec...|    More info|http://wiki.creat...|   1|
  +--------------------+-------------+--------------------+----+

As revealed above, an exploded map still lacks any useful column name. Let’s change that by defining the names for the two columns, represented by the string pair.

  val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT explode(inlinks) as (source,page), baseurl as target, '1' as type")

For our network, we need one column named source, which contains the URL and the page name. This is why we first explode the inlinks map into “virtual columns” called mt and mp. We can now concatenate both of them and create a new source column:

  val r1 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT concat(mt,mp) as source, baseurl as target, '1' as type, explode(inlinks) as (mt,mp)")

  +--------------------+--------------------+----+--------------------+--------------------+
  |              source|              target|type|                  mt|                  mp|
  +--------------------+--------------------+----+--------------------+--------------------+
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|       Marking guide|
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|           More info|
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|          Learn more|
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|           More info|
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|           More info|
  |https://creativec...|http://wiki.creat...|   1|https://creativec...|           More info|
  |https://creativec...|http://creativeco...|   1|https://creativec...|    Creative Commons|
  |https://creativec...|https://creativec...|   1|https://creativec...|https://creativec...|
  |https://creativec...|https://creativec...|   1|https://creativec...|             العربية|
  |https://creativec...|https://creativec...|   1|https://creativec...|          Беларуская|
  +--------------------+--------------------+----+--------------------+--------------------+

Now we apply this procedure to the second set of links: our outlinks.

  val r2 = sqlContext.sql("FROM yarn_ds1_run_3_webpage_parquet SELECT baseurl as source, concat(mt,mp) as target, '2' as type, explode(outlinks) as (mt,mp)")

To merge both link lists into one, we can ignore the temporary columns. We simply select the required columns, use the unionAll operator, and store our page network as an Apache Parquet file in HDFS.

  val r1c = r1.select("source", "target", "type")
  val r2c = r2.select("source", "target", "type")
  val rAll = r1c.unionAll(r2c)
  rAll.write.parquet("full_link_list")

With this intermediate result, we can revisit GraphFrames, this time with the variable g1 representing the page graph crawled by Nutch (a sketch of how to rebuild the graph from the stored link list follows below). As homework, you can then apply the same analysis steps as in Exercise 1.
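
A minimal sketch of that starting point, assuming the Parquet link list written above is used and the URL strings themselves serve as node ids:

  // Sketch: rebuild a GraphFrame from the stored link list; URLs are used directly as node ids.
  import org.graphframes.GraphFrame
  val links = sqlContext.read.parquet("full_link_list")
  val edges = links.withColumnRenamed("source", "src").withColumnRenamed("target", "dst")
  val nodes = edges.select("src").unionAll(edges.select("dst")).distinct().withColumnRenamed("src", "id")
  val g1 = GraphFrame(nodes, edges)
  g1.pageRank.resetProbability(0.15).maxIter(10).run().vertices.show()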

Conclusion

As you can see, Spark SQL provided access to many different data sources, whether we used the Apache Hive table in Parquet format or the HBase table via Hive. And for the many use cases in which the existing table layout cannot be used directly, Spark SQL made all the filtering, grouping, and projection really easy.

Using GraphFrames, it is possible to turn data tables into graphs with just a few lines of code. The full power of the Pregel API, implemented in GraphX, is available in combination with Spark SQL. As a result, raw data stored in Hadoop in different flavors can easily be combined into huge multi-layer graphs, with graph analysis all done in place via Spark.

Mirko Kämpf is a Solutions Architect at Cloudera.