A New Library for Analyzing Time-Series Data with Apache Spark

Time-series analysis is becoming mainstream across multiple data-rich industries. The new spark-ts library helps analysts and data scientists focus on business questions, not on building their own algorithms.

Have you ever wanted to build models over measurements coming in every second from sensors across the world? Dig into intra-day trading prices of millions of financial instruments? Compare hourly view statistics across every page on Wikipedia? To do any of these things, you’d need to work with large sequences of measurements over time.

Large-scale time-series data shows up across a variety of domains. In this post, I’ll introduce Time Series for Spark (distributed as the spark-ts package), a library developed by Cloudera’s Data Science team (and in use by customers) that enables analysis of data sets comprising millions of time series, each with millions of measurements. The package runs atop Apache Spark, and exposes Scala and Python APIs.

First, I’ll cover what time-series data is, why you should care about it, and how to think about distributing it across machines in a cluster. If you’re impatient to actually use the spark-ts library, jump straight to the end where we demo the Scala API.

What is Time-Series Data?

As foreshadowed in the introduction, time-series data consists of sequences of measurements, each occurring at a point in time.

A variety of terms are used to describe time-series data, and many of them apply to conflicting or overlapping concepts. In the interest of clarity, in spark-ts, we stick to a particular vocabulary:

  • A time series is a sequence of floating-point values, each linked to a timestamp. In particular, we try as hard as possible to stick with “time series” as meaning a univariate time series, although in other contexts it sometimes refers to a series with multiple values at the same timestamp. In Scala, a time series is usually represented by a Breeze vector, and in Python, a 1-D NumPy array, and has a DateTimeIndex somewhere nearby to link its values to points in time.

  • An instant is the vector of values in a collection of time series corresponding to a single point in time. In the spark-ts library, each time series is typically labeled with a key that enables identifying it among a collection of time series.

  • An observation is a tuple of (timestamp, key, value), i.e. a single value in a time series or instant.
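To make the vocabulary concrete, here is a minimal sketch in plain Scala collections. The Observation case class, the sample values, and the helper names are illustrative assumptions and are not spark-ts classes; the sketch only shows how observations regroup into time series (by key) and instants (by timestamp).

```scala
// Plain-Scala illustration of the vocabulary; none of these names come from spark-ts.
object VocabularySketch {
  // An observation: a single (timestamp, key, value) tuple.
  final case class Observation(timestamp: String, key: String, value: Double)

  // Made-up sample data. ISO dates sort chronologically as plain strings.
  val observations: Seq[Observation] = Seq(
    Observation("2015-08-03", "A", 1.0),
    Observation("2015-08-03", "B", 10.0),
    Observation("2015-08-04", "A", 2.0),
    Observation("2015-08-04", "B", 20.0)
  )

  // A time series: all values for one key, ordered by timestamp.
  def timeSeries(obs: Seq[Observation]): Map[String, Vector[Double]] =
    obs.groupBy(_.key).map { case (key, group) =>
      key -> group.sortBy(_.timestamp).map(_.value).toVector
    }

  // An instant: the values of every key at one timestamp, ordered by key.
  def instants(obs: Seq[Observation]): Map[String, Vector[Double]] =
    obs.groupBy(_.timestamp).map { case (ts, group) =>
      ts -> group.sortBy(_.key).map(_.value).toVector
    }
}
```

With the sample data, the series for key "A" is Vector(1.0, 2.0) and the instant at "2015-08-04" is Vector(2.0, 20.0).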

Not all data with timestamps is time-series data. For example, logs don’t fit directly into the time-series mold because they consist of discrete events, not scalar measurements taken at intervals. However, measurements of log-messages-per-hour would constitute a time series.

Use Cases

In many cases, we do the same things with time-series data that we do with other types of data: we group and filter it, compute basic summary statistics, and run regressions using each instant as a sample. Time is, of course, ubiquitous, and many data sets that we don’t specifically consider “time-series data” still qualify.

However, certain operations and patterns of analysis are specific to time-series data. For example, with a typical data set, we might impute missing values by looking at the mean across an entire column, but in time-series data, it often makes sense to use the data points that are adjacent in time. Alternatively, we might want to down-sample a data set with daily measurements to compute weekly sums.
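The two operations just mentioned can be sketched in a few lines of plain Scala. This is an illustration under assumptions, not the spark-ts implementation: missing values are assumed to be encoded as NaN, and the function names fillLinear and downsampleSum are made up for this example.

```scala
// Illustrative helpers; the names and the NaN convention are assumptions.
object MungingSketch {
  // Fill interior runs of NaN by interpolating linearly between the nearest
  // known neighbors. Leading and trailing NaNs are left untouched.
  def fillLinear(values: Vector[Double]): Vector[Double] = {
    val result = values.toArray
    var prev = -1 // index of the most recent known (non-NaN) value
    for (i <- result.indices) {
      if (!result(i).isNaN) {
        if (prev >= 0 && i - prev > 1) {
          val step = (result(i) - result(prev)) / (i - prev)
          for (j <- prev + 1 until i) result(j) = result(prev) + step * (j - prev)
        }
        prev = i
      }
    }
    result.toVector
  }

  // Down-sample by summing consecutive windows, e.g. 7 daily values per week.
  // A final partial window, if any, is summed as-is.
  def downsampleSum(values: Vector[Double], window: Int): Vector[Double] = {
    require(window > 0, "window must be positive")
    values.grouped(window).map(_.sum).toVector
  }
}
```

For example, fillLinear(Vector(1.0, Double.NaN, Double.NaN, 4.0)) yields Vector(1.0, 2.0, 3.0, 4.0), and downsampleSum over fourteen daily values with a window of 7 yields two weekly sums.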

From a statistical modeling perspective, the most interesting thing about time-series data is its dependency structure. As long as we’re not stuck inside the plot of Primer, the value of a series at a given point in time can be affected only by simultaneous or prior values. Many time-series models rely on even more specific dependency assumptions. For example, autoregressive (AR) models express the value of a series at each time point as a linear function of values at immediately previous time points. In seasonal models, the value is affected by a periodic function spanning the series.
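As a small illustration of the autoregressive idea, the sketch below computes a one-step AR(p) prediction as an intercept plus a weighted sum of the p most recent values. The intercept c and coefficients phi are hypothetical inputs here; fitting them to data is a separate step that this sketch does not attempt, and nothing in it is taken from the spark-ts API.

```scala
// One-step AR(p) prediction in plain Scala; coefficients are supplied, not fitted.
object ArSketch {
  // Predicts x(t) = c + phi(0) * x(t-1) + phi(1) * x(t-2) + ...
  // `history` is ordered oldest first, so the last element is x(t-1).
  def predictNext(c: Double, phi: Seq[Double], history: Seq[Double]): Double = {
    require(history.length >= phi.length, "need at least p past values")
    val recentFirst = history.reverse.take(phi.length)
    c + phi.zip(recentFirst).map { case (coef, x) => coef * x }.sum
  }
}
```

With c = 1.0, a single coefficient of 0.5, and a history ending in 4.0, the prediction is 1.0 + 0.5 * 4.0 = 3.0.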

The spark-ts library supports these time series-specific munging patterns and offers a variety of statistical functionality for modeling time-series data.

Laying Out Time-Series Data in Memory

The layout of a time-series data set determines what questions are easy to ask about it, and how quickly the answers to those questions can be computed. In Spark, we work with RDDs and DataFrames, which are both collections of objects partitioned across a set of machines in the cluster. Thus when choosing the layout of a data set, the most important question is “What data do we store in each of these objects?” With time-series data, there are a few natural options for answering that question, which we’ll refer to as observations, instants, and time series.

Observations DataFrame

One layout option is to think of your data as a table with three columns: timestamp, key, and value. In the Spark world, you can represent this concept with a DataFrame. With this representation, if you have X timestamps and Y variables (keys), your table will have X * Y records. In Spark-TS parlance, this is an “observations” layout.

An advantage of this layout is that one can append data without needing the full vector of values at a particular timestamp—each row only includes a single scalar value. You also don’t need to worry about changing your schema when you add more keys.

However, the observations layout is not ideal for performing analysis. To ask most questions, you need to perform a group by on your data set, either by key or by timestamp, which is cumbersome as well as computationally expensive.

Instants DataFrame

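In the “instants” layout, each row holds a timestamp together with the vector of values, one per key, observed at that point in time. This layout is ideal for much of traditional machine learning, for example when you want to build a supervised learning model that predicts one variable based on contemporaneous values of the others.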

TimeSeriesRDD

In the third layout, which is most central to spark-ts, each object in the RDD stores a full univariate series.

With this layout, the operations that tend to apply exclusively to time series are much more efficient. For example, if you want to generate a set of lagged time series from your original collection of time series, each lagged series can be computed by only looking at a single record in the input RDD. Similarly, with imputing missing values based on surrounding values, or fitting time-series models to each series, all the data needed is present in a single array.
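The lagging example shows why this layout helps: a lagged copy of a series needs nothing but the series' own array. The sketch below is plain Scala with a hypothetical lag helper; how spark-ts itself represents the missing leading values is not shown in this post, so padding with NaN is an assumption.

```scala
// Illustrative lag helper over a single series; not the spark-ts API.
object LagSketch {
  // Build a lag-k copy of a series: position t holds the value from t - k.
  // The first k positions have no earlier value, so they are set to NaN.
  def lag(series: Vector[Double], k: Int): Vector[Double] = {
    require(k >= 0, "lag must be non-negative")
    Vector.fill(math.min(k, series.length))(Double.NaN) ++ series.dropRight(k)
  }
}
```

The result always has the same length as the input, so the lagged series stays aligned with the original.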

The library also contains utilities for converting data sets between these representations.

Coding Against the Spark-TS API

The code, data, and surrounding Maven project for the following example are included in the spark-ts-examples repo.

The most straightforward way to access Spark-TS from Scala is to depend on it in a Maven project. Do that by including the following repo in the pom.xml:

  <repository>
    <id>cloudera-repos</id>
    <name>Cloudera Repos</name>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>

And including the following dependency in the pom.xml:

  <dependency>
    <groupId>com.cloudera.sparkts</groupId>
    <artifactId>sparkts</artifactId>
    <version>0.1.0</version>
  </dependency>

Alternatively, to access it in a spark-shell, download the JAR, and then launch the shell with:

  spark-shell \
    --jars sparkts-0.1.0-jar-with-dependencies.jar \
    --driver-class-path sparkts-0.1.0-jar-with-dependencies.jar

The central abstraction of the spark-ts library is the TimeSeriesRDD, which is simply a collection of time series on which you can operate in a distributed fashion. The time series in the collection are aligned, meaning that position X in the array representing one of the time series refers to the same time point as position X in all the other series. This approach allows you to avoid storing timestamps for each series and instead store a single DateTimeIndex to which all the series vectors conform.
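The alignment idea can be illustrated without Spark. In the sketch below, a single shared index of timestamps plays the role of the DateTimeIndex, and each series stores only its values. The data, the use of ISO date strings, and the helper names are assumptions made for the example; the real DateTimeIndex is a richer type.

```scala
// Plain-Scala illustration of aligned series sharing one index; not the spark-ts API.
object AlignedSketch {
  // One shared index of timestamps for the whole collection.
  val index: Vector[String] = Vector("2015-08-03", "2015-08-04", "2015-08-05")

  // Each series stores only values; position i corresponds to index(i).
  val series: Map[String, Vector[Double]] = Map(
    "A" -> Vector(1.0, 2.0, 3.0),
    "B" -> Vector(10.0, 20.0, 30.0)
  )

  // Look up the value of one series at a timestamp via the shared index.
  def valueAt(key: String, timestamp: String): Option[Double] = {
    val pos = index.indexOf(timestamp)
    if (pos < 0) None else series.get(key).map(_(pos))
  }

  // The instant at a timestamp is the same position read from every series.
  def instantAt(timestamp: String): Option[Map[String, Double]] = {
    val pos = index.indexOf(timestamp)
    if (pos < 0) None else Some(series.map { case (k, v) => k -> v(pos) })
  }
}
```

Because every series conforms to the same index, timestamps are stored once rather than once per series, and an instant is just one position read across all of them.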

TimeSeriesRDD[K] extends RDD[(K, Vector[Double])], where K is the key type (usually a String), and the second element in the tuple is a Breeze vector representing the time series.

Example

Let’s use the library to play around with some ticker data. The data comes in a tab-separated values file with the columns year, month, day, ticker symbol, volume, and price.

  2015    8       14      ADP     194911  82.99
  2015    9       14      NKE     224435  111.78
  2015    9       18      DO      678664  20.18
  2015    8       7       TGT     147406  78.96

In the layouts introduced above, this is a table of observations. I omitted it here for the sake of brevity, but the loadTickerObservations function in this example loads this into a Spark DataFrame with three columns: “timestamp”, “symbol”, and “price”. The omitted code is vanilla Spark DataFrames code, nothing specific to spark-ts.

  val tickerObs = loadTickerObservations(sqlContext, "../data/ticker.tsv")

With a DataFrame of observations in hand, you can use the timeSeriesRDDFromObservations function to convert it into a TimeSeriesRDD.

  // Create a daily DateTimeIndex over August and September 2015
  val dtIndex = DateTimeIndex.uniform(
    new DateTime("2015-08-03"), new DateTime("2015-09-22"), new BusinessDayFrequency(1))

  // Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
  val tickerTsrdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, tickerObs,
    "timestamp", "symbol", "price")

Because a TimeSeriesRDD is an RDD, it supports all the standard RDD functionality. First, let’s cache it in memory so that it won’t need to be loaded from disk each time it’s referenced:

  tickerTsrdd.cache()

Use Spark’s count action to see how many series it contains. In this case, there is one series per symbol.

  println(tickerTsrdd.count())
  104

 

Given a TimeSeriesRDD, it’s easy to perform operations on all the time series inside it in parallel. For example, you might want to impute missing values using linear interpolation:

  val filled = tickerTsrdd.fill("linear")

filled is another TimeSeriesRDD containing transformed values from the original TimeSeriesRDD. Then you can convert your series of prices into a series of rates of return:

  val returnRates = filled.returnRates()

returnRates is, again, a TimeSeriesRDD.
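For intuition about what a rate of return is, here is a plain-Scala sketch that computes simple returns between consecutive prices. It assumes the common definition p(t) / p(t-1) - 1; the exact definition used by the library's returnRates, including how it handles the first position, is not stated in this post, so treat this as an illustration rather than a description of the library.

```scala
// Simple returns over one price series; an assumed definition, not the spark-ts code.
object ReturnsSketch {
  // Rate of return between consecutive prices: p(t) / p(t-1) - 1.
  // The result is one element shorter than the input.
  def simpleReturns(prices: Vector[Double]): Vector[Double] =
    prices.zip(prices.drop(1)).map { case (prev, cur) => cur / prev - 1.0 }
}
```

For prices of 100.0, 150.0, and 75.0, the returns are 0.5 and -0.5.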

Finally, perhaps you’re curious about whether the rates of return exhibit serial correlation, which is when the values tend to be related to the values at directly previous time points. One way to find out is to compute the Durbin-Watson statistic for each series.

  val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest(_))

dwStats is an RDD of (String, Double) tuples, where the String is the symbol and the Double is the statistic. The Durbin-Watson statistic ranges from 0 to 4, with values near 2 indicating no serial correlation, so you can look at the min (strongest evidence of positive serial correlation) and the max (strongest evidence of negative serial correlation). Because tuples are ordered lexicographically, you need to swap the values in the tuples so that the statistic comes first:

  println(dwStats.map(_.swap).min)
  (1.3607754934239373,FISV)
  println(dwStats.map(_.swap).max)
  (2.9699508244045685,MO)
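To see what the statistic measures and why the swap is needed, the sketch below computes the textbook Durbin-Watson statistic over a plain vector and then orders some made-up (symbol, statistic) pairs. The formula is the standard one; whether the library's dwtest is implemented exactly this way is an assumption, and the symbols and values in stats are hypothetical stand-ins for dwStats.

```scala
// Textbook Durbin-Watson statistic plus the tuple-swap trick; data is made up.
object DurbinWatsonSketch {
  // d = sum((e(t) - e(t-1))^2) / sum(e(t)^2) over a series e.
  // Values near 0 suggest positive serial correlation, near 4 negative, near 2 none.
  def durbinWatson(e: Vector[Double]): Double = {
    val diffSq = e.zip(e.drop(1)).map { case (a, b) => (b - a) * (b - a) }.sum
    val sumSq = e.map(x => x * x).sum
    diffSq / sumSq
  }

  // Hypothetical (symbol, statistic) pairs standing in for dwStats.
  val stats: Seq[(String, Double)] = Seq(("AAA", 2.1), ("BBB", 1.4), ("CCC", 2.9))

  // Swapping puts the statistic first, so tuple ordering compares it first.
  val lowest: (Double, String) = stats.map(_.swap).min
  val highest: (Double, String) = stats.map(_.swap).max
}
```

A perfectly alternating series such as Vector(1.0, -1.0, 1.0, -1.0) gives a statistic of 3.0, above 2, which is the negative-correlation side of the scale. Without the swap, min and max would compare the symbols alphabetically instead of the statistics.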

 

Conclusion

You should now have good insight into what it feels like to use the spark-ts library. If you’re interested in learning more, check out the full documentation.

Many thanks to Ahmed Mahran, Chris Dalzell, Jose Cambronero, Sean Owen, and Simon Ouellette for their contributions to the project!

Sandy Ryza is a Data Scientist at Cloudera and an Apache Spark committer. He is co-author of the O’Reilly Media book Advanced Analytics with Spark.