Time-series analysis is becoming mainstream across multiple data-rich industries. The new spark-ts library helps analysts and data scientists focus on business questions, not on building their own algorithms.
Have you ever wanted to build models over measurements coming in every second from sensors across the world? Dig into intra-day trading prices of millions of financial instruments? Compare hourly view statistics across every page on Wikipedia? To do any of these things, you’d need to work with large sequences of measurements taken over time.
Large-scale time-series data shows up across a variety of domains. In this post, I’ll introduce Time Series for Spark (distributed as the spark-ts package), a library developed by Cloudera’s Data Science team (and in use by customers) that enables analysis of data sets comprising millions of time series, each with millions of measurements. The package runs atop Apache Spark, and exposes Scala and Python APIs.
First, I’ll cover what time-series data is, why you should care about it, and how to think about distributing it across machines in a cluster. If you’re impatient to actually use the spark-ts library, jump straight to the end, where we demo the Scala API.
What is Time-Series Data?
As foreshadowed in the introduction, time-series data consists of sequences of measurements, each occurring at a point in time.
A variety of terms are used to describe time-series data, and many of them apply to conflicting or overlapping concepts. In the interest of clarity, in spark-ts, we stick to a particular vocabulary:
- A time series is a sequence of floating-point values, each linked to a timestamp. In particular, we try as hard as possible to stick with “time series” as meaning a univariate time series, although in other contexts it sometimes refers to a series with multiple values at the same timestamp. In Scala, a time series is usually represented by a Breeze vector, and in Python a 1-D NumPy array, and has a DateTimeIndex somewhere nearby to link its values to points in time.
- An instant is the vector of values in a collection of time series corresponding to a single point in time. In the spark-ts library, each time series is typically labeled with a key that enables identifying it among a collection of time series.
- An observation is a tuple of (timestamp, key, value), i.e., a single value in a time series or instant.
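As a toy illustration only (the case classes below are hypothetical and not part of the spark-ts API), the three concepts map onto Scala types roughly as follows:

```scala
import breeze.linalg.DenseVector

// A (univariate) time series: floating-point values whose positions are tied to
// timestamps by a date-time index kept alongside the vector.
val series: DenseVector[Double] = DenseVector(82.99, 83.41, 82.75)

// An instant: the values of several keyed series at a single point in time.
case class Instant(timestamp: Long, values: Map[String, Double])

// An observation: a single (timestamp, key, value) triple.
case class Observation(timestamp: Long, key: String, value: Double)
```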
Not all data with timestamps is time-series data. For example, logs don’t fit directly into the time-series mold because they consist of discrete events, not scalar measurements taken at intervals. However, measurements of log-messages-per-hour would constitute a time series.
Use Cases
In many cases, we do the same things with time-series data that we do with other types of data: We group and filter it, we compute basic summary statistics, we run regressions using each instant as a sample. Time is, of course, ubiquitous, and many data sets that we don’t specifically consider “time-series data” still qualify.
However, certain operations and patterns of analysis are exclusively for time-series data. For example, with a typical data set, we might impute missing values by looking at the mean across an entire column, but in time-series data, it often makes sense to use the data points that are adjacent in time. Alternatively, we might want to down-sample a data set with daily measurements to compute weekly sums.
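As a toy example of the down-sampling case, here is what rolling daily values up into weekly sums looks like on a single in-memory series, in plain Scala rather than through the spark-ts API:

```scala
// Four weeks of daily measurements, rolled up into four weekly sums.
val daily: Seq[Double] = Seq.tabulate(28)(day => 10.0 + day)
val weeklySums: Seq[Double] = daily.grouped(7).map(_.sum).toSeq
```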
From a statistical modeling perspective, the most interesting thing about time-series data is its dependency structure. As long as we’re not stuck inside the plot of Primer, only simultaneous or prior values can affect the value of a series at a given point in time. Many time-series models rely on even more specific dependency assumptions. For example, autoregressive (AR) models express the value of a series at each time point as a linear function of the values at immediately previous time points. In seasonal models, the value is affected by a periodic function spanning the series.
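For concreteness, the textbook AR(p) formulation (a standard definition, not anything specific to spark-ts) is:

$$y_t = c + \phi_1 y_{t-1} + \phi_2 y_{t-2} + \dots + \phi_p y_{t-p} + \varepsilon_t$$

where the $\phi_i$ are the autoregressive coefficients and $\varepsilon_t$ is a white-noise error term.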
The spark-ts library supports these time series-specific munging patterns and offers a variety of statistical functionality for modeling time-series data.
Laying Out Time-Series Data in Memory
The layout of a time-series data set determines what questions are easy to ask about it, and how quickly the answers to those questions can be computed. In Spark, we work with RDDs and DataFrames, which are both collections of objects partitioned across a set of machines in the cluster. Thus when choosing the layout of a data set, the most important question is “What data do we store in each of these objects?” With time-series data, there are a few natural options for answering that question, which we’ll refer to as observations, instants, and time series.
Observations DataFrame
One layout option is to think of your data as a table with three columns: timestamp, key, and value. In the Spark world, you can represent this concept with a DataFrame. With this representation, if you have X timestamps and Y variables (keys), your table will have X * Y records. In Spark-TS parlance, this is an “observations” layout.
An advantage of this layout is that one can append data without needing the full vector of values at a particular timestamp—each row only includes a single scalar value. You also don’t need to worry about changing your schema when you add more keys.
However, the observations layout is not ideal for performing analysis. To ask most questions, you need to perform a group by on your data set, either by key or by timestamp, which is cumbersome as well as computationally expensive.
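A brief sketch of what that looks like, assuming an observations DataFrame named obs with the timestamp, symbol, and price columns used later in this post:

```scala
import org.apache.spark.sql.functions.avg

// obs is a hypothetical observations DataFrame: one row per (timestamp, symbol, price).
// Per-key analysis forces a shuffle to group the scalar observations by symbol...
val meanPricePerSymbol = obs.groupBy("symbol").agg(avg("price"))

// ...and per-time-point analysis forces another shuffle, this time by timestamp.
val observationsPerDay = obs.groupBy("timestamp").count()
```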
Instants DataFrame
In the second layout, each record contains a timestamp and the vector of values observed at that timestamp, with one column per key. This “instants” layout is ideal for much of traditional machine learning—for example, if you want to build a supervised learning model that predicts one variable based on contemporaneous values of the others.
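A rough sketch of an instants schema for the ticker data used later in this post (the set of symbol columns here is just illustrative):

```scala
import org.apache.spark.sql.types._

// One row per timestamp; each key in the data set becomes its own column.
val instantsSchema = StructType(Seq(
  StructField("timestamp", TimestampType),
  StructField("ADP", DoubleType),
  StructField("NKE", DoubleType),
  StructField("TGT", DoubleType)))
```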
TimeSeriesRDD
In the third layout, which is most central to spark-ts, each object in the RDD stores a full univariate series.
With this layout, the operations that tend to apply exclusively to time series are much more efficient. For example, if you want to generate a set of lagged time series from your original collection of time series, each lagged series can be computed by looking at only a single record in the input RDD. Similarly, when imputing missing values based on surrounding values, or fitting time-series models to individual series, all the data needed is present in a single array.
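To see why lagging is a per-record operation, here is a toy lag function over a single Breeze vector (an illustration of the idea, not the spark-ts API):

```scala
import breeze.linalg.DenseVector

// Shift a single series back by k steps, padding the start with NaN.
// In the time-series layout, this can run independently on each (key, vector) record.
def lag(series: DenseVector[Double], k: Int): DenseVector[Double] = {
  val lagged = DenseVector.fill(series.length)(Double.NaN)
  for (i <- k until series.length) {
    lagged(i) = series(i - k)
  }
  lagged
}
```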
The library also contains utilities for converting data sets between these different representations.
Coding Against the Spark-TS API
The code, data, and surrounding Maven project for the following example are included in the spark-ts-examples repo.
The most straightforward way to access Spark-TS from Scala is to depend on it in a Maven project, by adding the library's repository and its dependency to your pom.xml.
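A rough sketch of those entries follows; the artifactId and version are inferred from the JAR name used below, while the repository URL and groupId are assumptions to check against the spark-ts README:

```xml
<!-- Repository: assumed to be Cloudera's public repository -->
<repository>
  <id>cloudera-repos</id>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

<!-- Dependency: artifactId and version inferred from sparkts-0.1.0-jar-with-dependencies.jar;
     the groupId is an assumption -->
<dependency>
  <groupId>com.cloudera.sparkts</groupId>
  <artifactId>sparkts</artifactId>
  <version>0.1.0</version>
</dependency>
```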
Alternatively, to access it in a spark-shell, download the JAR, and then launch the shell with:
```
spark-shell \
  --jars sparkts-0.1.0-jar-with-dependencies.jar \
  --driver-class-path sparkts-0.1.0-jar-with-dependencies.jar
```
The central abstraction of the spark-ts library is the TimeSeriesRDD, which is simply a collection of time series on which you can operate in a distributed fashion. The time series in the collection are aligned, meaning that position X in the array representing one of the time series refers to the same time point as position X in all the other series. This approach allows you to avoid storing timestamps for each series and instead store a single DateTimeIndex to which all the series vectors conform.
TimeSeriesRDD[K] extends RDD[(K, Vector[Double])], where K is the key type (usually a String), and the second element in the tuple is a Breeze vector representing the time series.
Example
Let’s use the library to play around with some ticker data. The data comes in a tab-separated values file with the columns year, month, day, ticker symbol, volume, and price:

```
2015 8 14 ADP 194911 82.99
2015 9 14 NKE 224435 111.78
2015 9 18 DO 678664 20.18
2015 8 7 TGT 147406 78.96
```
In the layouts introduced above, this is a table of observations. I omitted it here for the sake of brevity, but the loadTickerObservations function in this example loads this data into a Spark DataFrame with three columns: “timestamp”, “symbol”, and “price”. The omitted code is vanilla Spark DataFrames code, nothing specific to spark-ts.
```scala
val tickerObs = loadTickerObservations(sqlContext, "../data/ticker.tsv")
```
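For reference, a minimal sketch of what such a loader might look like (an assumption about its shape, not the actual code from spark-ts-examples):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types._

// Parse each TSV line into a (timestamp, symbol, price) row; the volume column is dropped.
def loadObservationsSketch(sqlContext: SQLContext, path: String): DataFrame = {
  val rows = sqlContext.sparkContext.textFile(path).map { line =>
    val tokens = line.split('\t')
    val cal = new java.util.GregorianCalendar(tokens(0).toInt, tokens(1).toInt - 1, tokens(2).toInt)
    Row(new Timestamp(cal.getTimeInMillis), tokens(3), tokens(5).toDouble)
  }
  val schema = StructType(Seq(
    StructField("timestamp", TimestampType),
    StructField("symbol", StringType),
    StructField("price", DoubleType)))
  sqlContext.createDataFrame(rows, schema)
}
```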
With a DataFrame of observations in hand, you can use the timeSeriesRDDFromObservations function to convert it into a TimeSeriesRDD.
```scala
// Create a daily DateTimeIndex over August and September 2015
val dtIndex = DateTimeIndex.uniform(
  new DateTime("2015-08-03"), new DateTime("2015-09-22"), new BusinessDayFrequency(1))

// Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
val tickerTsrdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, tickerObs,
  "timestamp", "symbol", "price")
```
Because a TimeSeriesRDD is an RDD, it supports all the standard RDD functionality. First, let’s cache it in memory so that it won’t need to be loaded from disk each time it’s referenced:
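```scala
// Cache the TimeSeriesRDD in memory
tickerTsrdd.cache()
```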
Use Spark’s count action to see how many series it contains. In this case, there is one series per symbol.
```scala
println(tickerTsrdd.count())
104
```
Given a TimeSeriesRDD, it’s easy to perform operations on all the time series inside it in parallel. For example, you might want to impute missing values using linear interpolation:
```scala
val filled = tickerTsrdd.fill("linear")
```
filled is another TimeSeriesRDD containing transformed values from the original TimeSeriesRDD. Then you can convert your series of prices into a series of rates of return:
```scala
val returnRates = filled.returnRates()
```
returnRates is, again, a TimeSeriesRDD.
Finally, perhaps you’re curious about whether the rates of return exhibit serial correlation, meaning that values tend to be related to the values at immediately previous time points. One way to find out is to compute the Durbin-Watson statistic for each series:
```scala
val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest(_))
```
dwStats is an RDD of (String, Double) tuples, where the String is the symbol and the Double is the statistic. Then you can look at the min (the strongest evidence of positive serial correlation) and the max (the strongest evidence of negative serial correlation). Because tuples are ordered lexicographically, you need to swap the elements in each tuple so that the statistic comes first:
```scala
println(dwStats.map(_.swap).min)
(1.3607754934239373,FISV)
println(dwStats.map(_.swap).max)
(2.9699508244045685,MO)
```
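As a reminder of how to read these numbers (a standard property of the Durbin-Watson statistic, not anything spark-ts-specific), the statistic is roughly related to the lag-1 autocorrelation $\hat{\rho}_1$ of the series by

$$d \approx 2(1 - \hat{\rho}_1),$$

so values near 2 indicate little serial correlation, values well below 2 (like FISV's 1.36) point toward positive serial correlation, and values above 2 (like MO's 2.97) point toward negative serial correlation.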
Conclusion
You should now have good insight into what it feels like to use the spark-ts library. If you’re interested in learning more, check out the full documentation.
Many thanks to Ahmed Mahran, Chris Dalzell, Jose Cambronero, Sean Owen, and Simon Ouellette for their contributions to the project!
Sandy Ryza is a Data Scientist at Cloudera, and an Apache Spark committer. He is co-author of the O’Reilly Media book *Advanced Analytics with Spark*.