Users of the latest release of the Genome Analysis Toolkit, an open source framework for analyzing high-throughput DNA sequencing data, can now choose Apache Spark for data processing.
Ever since the Human Genome Project produced the first draft sequence of the human genome in 2000, the cost of sequencing has dropped exponentially, from around US$100 million per genome then to around US$1,000 today. Over the same period, we have seen massive growth in the storage and processing capabilities of big data technologies like Apache Hadoop. It’s very fitting, then, to use tools from the Hadoop ecosystem for genomics, which is why Cloudera, in cooperation with the Broad Institute and other industry partners, is pleased to announce the alpha release of the Genome Analysis Toolkit (GATK) version 4 running on Apache Spark.
Data processing for genomics, just like data processing for any industry, is about running data pipelines. A pipeline for DNA sequencing is shown in Figure 1.
Figure 1. DNA sequencing pipeline
The pipeline starts with samples of DNA; these are sequenced by a machine, and out comes a file containing fragments of DNA sequences (made up of the letters A, C, G, and T). The raw sequence data is not very useful at this point, since the sequence fragments do not have a position in the genome. So a piece of software called an aligner is used to align the fragments with the reference genome. (The reference is a product of the Human Genome Project.) At this stage the DNA sequence for the individual is known, but the next question is usually: “How does this sequence differ from other individuals?” This step in the pipeline amounts to finding variants in the sequence compared to the reference: conceptually, you can think of it as running Unix diff on the sequence and the reference. The output is a set of variant calls for the individual. The pipeline in Figure 1 stops at this point, but in practice, the variant call data is the raw material for downstream analysis by researchers.
The Genome Analysis Toolkit (GATK) covers the variant discovery part of the pipeline. Variant discovery itself comprises a number of steps, and the GATK provides tools for running each of them. (The steps are described in the GATK Best Practices document.) What’s new in GATK version 4 is that Spark implementations of these tools are now provided, allowing the variant discovery part of the pipeline to be run at scale on a Spark cluster. To understand what that involves, let’s look at the GATK in a little more detail.
Inside the GATK
The GATK has the concept of a Walker, which is essentially a callback iterator API. A ReadWalker, for example, iterates over all the reads in a genome and invokes a method on the callback interface for each read. (A read is a short snippet of DNA that has been aligned with the reference genome.) The most attractive thing about the Walker API is that it makes it easy for users to write their own tools. One of the simplest programs is to count the number of reads in a file, shown here:
public final class CountReads extends ReadWalker {
    private long count = 0;

    @Override
    public void apply(GATKRead read, ReferenceContext referenceContext, FeatureContext featureContext) {
        ++count;
    }

    @Override
    public Object onTraversalDone() {
        return count;
    }
}
The apply() method simply increments a counter; once the end of the file is reached, onTraversalDone() is called to report the count. CountReads is a serial program, so only one process is used to count the reads in a file (or even in multiple files). It has the advantage of simplicity, but the disadvantage of being slow, which matters when processing terabytes of data.
The Spark equivalent is structured differently: instead of supplying a callback function, tools implement their own processing of inputs. Here is the distributed version of CountReads. (For simplicity, some boilerplate code has been removed from the original in the GATK source code.)
public final class CountReadsSpark extends GATKSparkTool {
    @Override
    protected void runTool(final JavaSparkContext ctx) {
        final JavaRDD<GATKRead> reads = getReads();
        final long count = reads.count();
        System.out.println(count);
    }
}
In this case, the reads are accessed as a Spark RDD (JavaRDD<GATKRead>), and the Spark built-in function count() is called to trigger a Spark job that counts the number of entries in the RDD. The input is split into pieces (each 128MB by default), and the Spark job runs a task for each split in parallel. A job that takes hours with the Walker version runs in minutes on even a modest Spark cluster with a few nodes.
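To make that parallelism concrete, here is a minimal standalone sketch, not GATK code, that counts the records of a plain text file with Spark. The class name, input path, and local master URL are placeholders invented for this example:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Minimal sketch: count records of a text file in parallel. Toy input, not GATK reads.
public class ParallelCountExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ParallelCountExample").setMaster("local[*]"); // placeholder master URL
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<String> records = ctx.textFile("input.txt"); // placeholder path; Spark creates one partition per input split
        System.out.println("partitions: " + records.partitions().size()); // how many tasks count() will run in parallel
        System.out.println("records: " + records.count()); // each task counts its split; the driver sums the results

        ctx.stop();
    }
}

The same structure applies to CountReadsSpark: getReads() produces an RDD whose partitions correspond to the input splits, and count() fans out one task per partition.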
Counting reads is trivial and embarrassingly parallel. Most tools in the GATK are more complex and can’t be ported to Spark quite as easily. Let’s look at one of these in more detail: the Mark Duplicates algorithm.
Mark Duplicates in Spark
Sequencing is an inherently noisy process, and the same DNA fragment is often sequenced multiple times, resulting in multiple reads that are duplicates. These duplicates need to be removed so that they don’t carry undue weight later in the pipeline. The Mark Duplicates step finds and marks reads that should be considered the same.
How do you tell if two (or more) reads are duplicates? Reads have a few fields that make up the alignment information; if these fields are the same for a set of reads, then they are considered duplicates. Duplicate reads are not identical, so the algorithm scores each read in a set of duplicates based on other aspects of the read (such as quality measurements). Then, within each set of duplicates, the read with the highest score is left unchanged, and the others are marked as duplicates.
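To make the grouping-and-scoring idea concrete, here is a minimal, self-contained sketch. The Read class, the alignment key, and the quality-sum score are invented stand-ins for illustration, not the GATK’s actual data model or scoring function:

import java.util.*;

// Toy sketch of the Mark Duplicates logic: group reads by alignment key, keep the best, mark the rest.
public class MarkDuplicatesSketch {

    static class Read {
        final String alignmentKey; // invented key, e.g. contig + start position
        final int baseQualitySum;  // stand-in for the real quality-based score
        boolean duplicate = false;

        Read(String alignmentKey, int baseQualitySum) {
            this.alignmentKey = alignmentKey;
            this.baseQualitySum = baseQualitySum;
        }
    }

    static void markDuplicates(List<Read> reads) {
        // Group reads that share the same alignment information.
        Map<String, List<Read>> groups = new HashMap<>();
        for (Read r : reads) {
            groups.computeIfAbsent(r.alignmentKey, k -> new ArrayList<>()).add(r);
        }
        // Within each group, leave the highest-scoring read unchanged and mark the others.
        for (List<Read> group : groups.values()) {
            Read best = Collections.max(group, Comparator.comparingInt((Read r) -> r.baseQualitySum));
            for (Read r : group) {
                if (r != best) {
                    r.duplicate = true;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Read> reads = Arrays.asList(
            new Read("chr1:1000", 90),
            new Read("chr1:1000", 75), // same alignment key as above, lower score: marked as a duplicate
            new Read("chr1:2000", 80));
        markDuplicates(reads);
        reads.forEach(r -> System.out.println(r.alignmentKey + " duplicate=" + r.duplicate));
    }
}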
The Mark Duplicates algorithm boils down to sorting all reads by the alignment information fields. Implementing this using the Walker API is difficult, since the reads don’t all fit into memory. The implementation uses a custom on-disk sorting algorithm, but even this can only exploit a single machine.
With Spark, we have access to a highly tuned distributed sort implementation, hidden behind a convenient Java API. Let’s look at the core part of the Mark Duplicates implementation. We start with the reads grouped by read group and name (files are typically already sorted this way, but if they are not, an initial sort is needed).
JavaPairRDD<String, Iterable<GATKRead>> keyedReads = …;
Next we extract the alignment information fields from each read into a string key, and construct PairedEnds objects for the values. Reads are usually paired, with each member of a pair coming from either end of the DNA fragment being sequenced. A PairedEnds object is just a wrapper for a pair of reads.
JavaPairRDD<String, PairedEnds> keyPairs = keyedReads.flatMapToPair(keyedRead -> { … });
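For readers new to the Spark Java pair-RDD API, here is a small self-contained illustration of keying records this way, using toy string “reads” rather than the GATK’s types; the class name, field layout, and key format are invented for the example. For simplicity it uses mapToPair, whereas the GATK code uses flatMapToPair so that each input group can emit zero or more keyed pairs:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

// Toy illustration of keying records by alignment information; not the GATK implementation.
public class KeyingExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("KeyingExample").setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Invented records: read name, contig, start position.
        JavaRDD<String> reads = ctx.parallelize(Arrays.asList(
            "read1,chr1,1000", "read2,chr1,1000", "read3,chr1,2000"));

        // Key each record by its (contig, position) "alignment information".
        JavaPairRDD<String, String> keyed = reads.mapToPair(read -> {
            String[] fields = read.split(",");
            return new Tuple2<>(fields[1] + ":" + fields[2], read);
        });

        keyed.collect().forEach(pair -> System.out.println(pair._1() + " -> " + pair._2()));
        ctx.stop();
    }
}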
We then perform a group-by operation to bring together all the reads (in PairedEnds objects) that have the same alignment information. The groups are therefore sets of duplicates.
JavaPairRDD<String, Iterable<PairedEnds>> keyedPairs = keyPairs.groupByKey(numReducers);
Finally, for each group, the scoring and marking process can be carried out:
JavaRDD<GATKRead> markedDups = markPairedEnds(keyedPairs);
Composing Tools with Spark
The Mark Duplicates tool will write the final RDD of reads to an output file, for further downstream processing by other GATK tools. As the output is an RDD, another option is to compose tools together within a single Spark job, so that intermediate steps don’t need to be materialized on the filesystem.
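The following self-contained sketch shows the idea with toy string data and invented “tool” functions rather than real GATK APIs: because each step returns an RDD, the steps can be chained inside one job and only the final result is written out.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Toy sketch of composing pipeline steps as RDD transformations within a single Spark job.
public class ComposePipelineExample {

    // Stand-ins for two pipeline steps; a real tool would transform reads, not strings.
    static JavaRDD<String> stepOne(JavaRDD<String> input) {
        return input.filter(s -> !s.isEmpty());
    }

    static JavaRDD<String> stepTwo(JavaRDD<String> input) {
        return input.map(String::toUpperCase);
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ComposePipelineExample").setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<String> input = ctx.parallelize(Arrays.asList("acgt", "", "ttga"));
        JavaRDD<String> result = stepTwo(stepOne(input)); // two "tools" chained in one job

        // Only the final output is written; the intermediate RDD is never persisted to disk.
        result.saveAsTextFile("composed-output"); // placeholder output path
        ctx.stop();
    }
}

Because Spark evaluates transformations lazily, the intermediate RDDs exist only as a lineage of transformations: nothing is computed until the final action, and nothing intermediate needs to be written to the filesystem.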
In the latest GATK4 alpha release, not all of the tools have been ported to Spark, so you can’t yet run the entire pipeline as a single Spark job. However, that will change in the future, with the goal being to run both an aligner (such as BWA-MEM) and the GATK variant discovery pipeline on a Spark cluster.
With many projects adopting Spark for large-scale computations, the number of possibilities for integrating tools in genomics pipelines is increasing. ADAM was the first project to embrace Spark as a platform for genomics, and it also defines file formats for genomic data using Apache Parquet. GATK4 can already read and write ADAM Parquet-formatted data as an option. There’s also the interesting possibility of mixing and matching tools in the pipeline: for example, you could use the variant caller from ADAM, or the one from the Hammer Lab (Jeff Hammerbacher’s lab at Mount Sinai Hospital in New York).
Looking further ahead, another goal is to load the variant call datasets produced by the pipeline into Hadoop-native storage, such as Hive tables (in Parquet format) or Apache Kudu (incubating). This will let users analyze the data with tools like Apache Impala (incubating), Ibis, or Spark, or even build tools for scientists on top of these frameworks.
Take Part in Open Source Genomics
GATK4 is still young, but its reception has already been very encouraging. As an example of its potential, there’s a group at the Broad Institute working on copy number variations, which are complex variants in the genome where a section of DNA is repeated. With no prior experience of Spark, they were able to write a Spark tool that hadn’t been attempted on GATK3 due to its computational complexity, and get it running at what they estimated was one or two orders of magnitude faster than it would have been on GATK3.
An impressive number of groups are working on algorithmic optimizations and runtime tuning for the new Spark implementations, including developers from Cloudera, Cray, Google, IBM, and Intel. With its large workloads, genomics is a good testbed for cutting-edge hardware and software, so it’s perhaps not surprising that it has already uncovered bugs (such as this bug in Kryo when running on Java 8).
Spark is fulfilling its promise as a common distributed computational fabric that runs both in the cloud and on-premises. We at Cloudera hope that other developers will take part in projects like GATK that build on Spark. To find out more about GATK, visit the GitHub repository at https://github.com/broadinstitute/gatk.
Tom White is a data scientist at Cloudera, and a member of the Apache Hadoop PMC. He is the author of the O’Reilly book Hadoop: The Definitive Guide.