Akka Streams

Recent advances in hardware and software have enabled the capture and generation of many different measurements in a wide range of fields. These measurements are generated continuously and at very high, fluctuating data rates. Examples include sensor networks, web logs, financial markets, and computer network traffic. Storing, querying and processing such data sets are computationally challenging tasks. Thus, there is increasing interest in streaming architectures, with some interesting outcomes like Storm, Spark Streaming and Flink.

However, here we introduce two distinctive requirements that cannot be met by the solutions cited above:

  • first, we require latency to be as low as feasible;

  • second, we need some mechanism to allow business people to add their own processing logic without having to deal with the architecture itself.

The two requirements are in some sense incompatible, as many useful abstractions come with a runtime cost. Our desideratum is a solution that has two parts: a high-level one, more suitable for prototyping and interfacing, and a low-level one to use when things are better defined.

Let us consider an example from the stock exchange market. In this case the second requirement is a little more stringent than it seems at first. It is easy to provide a setup where end users are able to provide a library that works with domain objects - for instance a library of transformations on order books. The trouble arises when one wants to express more real-world concerns - for example, writing a strategy to handle scenarios where the market data feed is not arriving at the expected speed. That is, one should be able to abstract not only over the collection of data, but also over its timing characteristics.

After evaluating other solutions, we are building a prototype using Akka streams, while at the same time studying in depth other low-latency messaging solutions outside the JVM.

Other streaming solutions

We were already familiar with streaming applications. The approaches that we have tried so far involved Storm, Spark Streaming or a combination of loosely coupled applications that communicate with each other using Kafka.

None of these approaches seemed feasible for this use case. Storm seems to have lost a lot of momentum, and does not seem to allow for stateful computations or custom error-recovery strategies. On the plus side, we like very much how it cleanly separates the definition of the operations from the topology on which these operations are deployed.

Spark Streaming is an entirely different beast. It is great for processing lots of data while keeping state at the same time, but it is based on a micro-batch approach. Events are collected and then emitted periodically in little batches every few seconds. While this is fine for near real-time systems, the latency introduced is nowhere near our target, which makes it unusable in this context.

Finally, there is always the possibility of writing separate components linked by a queue such as Kafka. Again, this is fine for near real-time, but having to go back and forth from Kafka introduces a visible (albeit small) latency that is not acceptable for the time-critical part of our system. All in all, Kafka is more suitable for loose communication among different applications than for messaging between components of the same system.

Akka itself supports working with streams, as well as other patterns of interaction. The actor model is extremely flexible for many kinds of multi-threaded applications, but the streaming abstraction is not exposed directly. Akka itself is great as a building block for the kind of application we want to design, but the streams API makes concrete a lot of patterns and abstractions that we would otherwise need to reinvent.

We should also mention Reactive Extensions and all the solutions derived from their model, such as Reactor. These are great for achieving really low latency and provide a lot of operators to write transformations over streams declaratively. After trying them for a while, though, we have found them a little cumbersome when it comes to dividing workload across threads. Most Rx operators perform their work on the same thread to which the stream they are working on belongs. This has the effect that a single pipeline of operations is usually single-threaded, thereby avoiding context switches and reducing latency. On the other hand, it is all too easy to branch a pipeline only to discover that both branches run on the same thread, blocking each other. Fixing this requires carefully organizing schedulers, and things become quite complex pretty quickly.

Abstracting the stream

For our context, a stream will be an ordered collection of items, each one arriving at a different point in time. This distinction makes working with streams very different from working with the usual collections. From one point of view, one wants a collection-like abstraction that allows one to act on it with operators such as map and filter - many fast streaming solutions stop at a lower level by providing only the messaging layer. On the other hand, one wants to take into account the time characteristics of the stream.

For instance, one operation that would be very difficult to write against a naive API would be a rate limiter. From the point of view of the collection, the items themselves are not changed, but the rate of arrival is altered somehow.

The difficulty lies in separating the Stream abstraction from the runtime that supports it.

A similar situation arises with the Future in the Scala standard library. A Future[A] represents a value of type A that will become available at a later moment. By making such an interface available in the standard library, Scala allows many different libraries to generate such values and still interoperate.

Futures can be manipulated with various operators, but in order to actually start their execution one has to provide an ExecutionContext. Different libraries, such as Akka, Dispatch or Play! all provide an execution context, so that futures generated by any of them can be used transparently on each other’s runtime.
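As a minimal sketch, using the standard library's global context (any of the contexts mentioned above would work the same way):

import scala.concurrent.Future
// One runtime among many: Akka, Dispatch and Play! each provide their
// own ExecutionContext playing exactly this role.
import scala.concurrent.ExecutionContext.Implicits.global

val price: Future[Double] = Future { 42.0 }    // started on the context
val rounded: Future[Long] = price.map(_.round) // composed on it as well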

For streams, the situation is similar, but instead of a single value at a point in time, one has many. In order to write composable libraries, one would want the abstract API to be available without it being tied to the runtime. This is useful even in the case where there is a single runtime supporting the abstraction (as happens for Akka streams) because it allows one to separate the logic of acting on the stream from the concerns of messaging.

For comparison, there is another abstraction of a stream of incoming data in the Scala world, and that is Play! enumerators and iteratees. The Akka streams abstraction is a little more imperative in nature, but we found it much simpler to reason with and easier to learn.

Working with streams

Akka streams offers some standard operations to work on streams. The simplest to understand are similar to those acting on collections. An actual example taken from our prototype reads

// Unwrap the raw events, then keep only the books for our symbol and ECN.
val bookStream = Source.fromPublisher(publisher).map(_.message)
val averages = bookStream
  .filter(_.symbol == symbol)
  .filter(_.ecn == ecn)
  .map(_.averages)

Here publisher implements the Publisher interface from Reactive Streams and is in fact our source of events (it is actually connected to Kafka using Reactive Kafka).

The rest of the operations are not much different from working with collections. Other than the usual suspects map and filter, there are also fold, take, drop, groupBy and so on.
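As an illustrative sketch, reusing the bookStream from the previous snippet, these operators compose just like their collection counterparts:

// Skip an initial warm-up and keep a bounded sample of books.
val sample = bookStream.drop(10).take(100)

// Count the books seen; fold emits its result when the stream completes.
val bookCount = bookStream.fold(0)((n, _) => n + 1)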

Much more interesting are operations that need to take into account the rate of various producers. Most flow operations in Akka streams are back-pressured. This means that when connecting a producer to a consumer, the producer will only yield new events in response to a consumer request. Of course, the producer may not have new events available right now, so the pushing of the event may happen at a later stage. Still, it is performed as soon as possible, provided the consumer is ready to accept it.

When combining multiple producers with different rates, the default behaviour is to back-pressure the fast one. For instance, assume that we want to slow down the averages stream, making it emit just one event per second. This can be done by zipping it with a slow stream that only acts as a metronome. The ZipWith connector takes care of slowing down the fast averages producer:

val ticks = Source.tick(initialDelay = 1 second, interval = 1 second, ())
val flow = Source.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  // Zip each average with a tick; the zip cannot emit faster than its
  // slowest input, so averages is slowed to one element per second.
  val zip = builder.add(ZipWith((av: Averages, tick: Unit) => av))

  averages ~> zip.in0
  ticks    ~> zip.in1

  SourceShape(zip.out)
})

Here we use the Graph DSL to combine the sources in a flexible way. The result of this is a source stream that has the same elements as averages but a slower production rate.

For cases where back-pressuring is not a viable strategy, one may want to drop events from the fast producer, or accumulate them while waiting for the slow producer, or vice versa interpolate the output of the slow producer to cope with the fast one. This can be done with the conflate and expand operations.

The conflate operator allows us to fold elements of a fast producer attached to a slow consumer. For instance, dropping every event except for the last one would be just

// Keep only the most recent element while the consumer is busy.
val skipped = fastProducer
  .conflate(identity)((oldMsg, newMsg) => newMsg)

while accumulating an exponential average of incoming numbers could be

val expAverage = fastProducer
  .conflate(identity)((a, b) => (a + b) / 2)

On the other side, one can use expand to cope with requests from a consumer that is faster than we are producing. For instance, plain repetition is just:

val repeatedStream = slowProducer.expand(identity)(s => (s, s))

More complex cases can be handled by defining custom processing stages. This can be done by extending an abstract class called GraphStage.

It requires defining two things:

  • the abstract shape of the topology that we are defining, that is, the input and output ports;

  • handlers that define what to do when we get an item pushed from an input port, or a request for an item coming from an output port.

These handlers are defined inside a GraphStageLogic block, which contains the logic that will be used by the materializer. In it, we implement the onPush and onPull methods. The former reacts to an event from a producer, while the latter reacts to a request from the consumer.

As an example, we used it to implement a rate adaptor: this takes messages as input and emits them according to a time schedule derived from the messages themselves. This comes in quite handy when replaying events that have a date and time attached, while keeping their rate approximately correct.

import org.joda.time.DateTime

import akka.stream._
import akka.stream.stage._

class RateAdaptor[A](time: A => DateTime) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("RateAdaptor.in")
  val out = Outlet[A]("RateAdaptor.out")

  override def shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // State lives inside the logic, so every materialization of the
      // stage gets its own copy.
      var firstEventTime = 0L
      var firstActualTime = 0L

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          val actualTime = System.currentTimeMillis
          val eventTime = time(elem).getMillis

          if (firstActualTime != 0L) {
            // Wait until the elapsed wall-clock time catches up with the
            // gap recorded in the event timestamps.
            val actualDelay = actualTime - firstActualTime
            val eventDelay = eventTime - firstEventTime
            if (actualDelay < eventDelay) Thread.sleep(eventDelay - actualDelay)
          } else {
            firstActualTime = actualTime
            firstEventTime = eventTime
          }
          push(out, elem)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}

Here we use Thread.sleep(), potentially blocking the running thread pool, but it turns out that Akka streams allows one to put particular stages on their own dedicated thread pool, so it is not much of an issue (a sketch of this follows the snippet below). It would also be easy to schedule the push of the element instead of pausing the thread. Using this component is just

val orders = bookStream.via(new RateAdaptor[Book](_.ecnTime))
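To isolate the blocking stage, one can assign it a dedicated dispatcher through stream attributes. A minimal sketch, assuming a dispatcher named my-blocking-dispatcher is defined in the application configuration:

import akka.stream.ActorAttributes

// "my-blocking-dispatcher" is an assumed name, to be defined in
// application.conf; the sleeping stage then runs on that pool
// instead of the default dispatcher.
val orders = bookStream
  .via(new RateAdaptor[Book](_.ecnTime))
  .withAttributes(ActorAttributes.dispatcher("my-blocking-dispatcher"))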

Of course, this is a naive example of how to deal with timing. As an alternative, we can use TimerGraphStageLogic (a sketch follows the list), which offers the methods

  • scheduleOnce(key, delay),

  • schedulePeriodically(key, period), or

  • schedulePeriodicallyWithInitialDelay(key, delay, period).
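To give the idea, here is a minimal sketch of our own (not taken from the prototype) of a stage that delays each element using a timer instead of sleeping:

import scala.concurrent.duration._

import akka.stream._
import akka.stream.stage._

// Delays every element by a fixed interval without blocking the thread.
class FixedDelay[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("FixedDelay.in")
  val out = Outlet[A]("FixedDelay.out")

  override def shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      var pending: Option[A] = None

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          // Hold the element and schedule its emission instead of sleeping.
          pending = Some(grab(in))
          scheduleOnce("emit", delay)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })

      override protected def onTimer(timerKey: Any): Unit = {
        // Completion handling is omitted for brevity.
        pending.foreach(push(out, _))
        pending = None
      }
    }
}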

Pieces of a puzzle

The real beauty of Akka streams arises when trying to combine multiple pieces together. The main building block is the Graph. Flow elements are first-class entities that can have multiple inputs and outputs, like this:

Flow:

        +----------+
  A --> |          | --> C
        |          |
  B --> |          | --> D
        +----------+

The building block depicted above represents a Graph having two inputs, of types A and B, and two outputs, of types C and D.

Partial flow graphs can be combined together using the Graph DSL. For instance, the above partial flow could be attached to another partial flow that has an output of type A, resulting in a bigger partial flow. Eventually, one can build a runnable graph - that is, one without dangling ends - and run it on a Materializer. The Materializer is the streaming equivalent of an ExecutionContext, and allows one to take a flow graph - which in itself is just an abstract description - and actually perform the computations.
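As a minimal sketch of this last step (the stream contents here are our own):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem("streams-demo")
// The Materializer turns the abstract graph description into running stages.
implicit val materializer = ActorMaterializer()

// A runnable graph: both ends are attached, so it can actually be run.
Source(1 to 5).map(_ * 2).runWith(Sink.foreach(println))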

Notable types of partial flow are Source[A], which is a graph with a single output of type A, and Sink[A], which is a graph with a single input of type A.

What sets Akka streams apart from other streaming libraries is the following: most approaches start from a source and build a pipeline of operations on it, while in Akka streams it is just as easy to describe a pipeline of operations with no attached ends. An example could be:

import scala.concurrent.duration._
import scala.math.{ pow, sqrt }

case class Stats(σ: Double, μ: Double, size: Int)

// Collect up to 1000 elements, or whatever arrives within a second,
// and compute mean and standard deviation over the window.
val stats = Flow[Double]
  .groupedWithin(1000, 1 second)
  .map { s =>
    val μ = s.sum / s.size
    val se = s.map(x => pow(x - μ, 2))
    val σ = sqrt(se.sum / se.size)
    Stats(σ, μ, s.size)
  }

This snippet defines a flow graph with an input of type Double and an output of type Stats that collects statistics in a 1-second time window. It can be attached to a sink and a source by doing something like

source.via(stats).to(sink)

or, of course, using the Graph DSL.
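For completeness, a sketch of the same attachment expressed in the Graph DSL, reusing source, stats and sink from above:

import akka.stream.ClosedShape
import akka.stream.scaladsl.{ GraphDSL, RunnableGraph }

// A closed graph has no dangling ports, so it materializes directly.
val runnable = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  source ~> stats ~> sink
  ClosedShape
})
runnable.run() // requires an implicit Materializer in scope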

The benefit of this approach is that one can actually create a library of building blocks that are not limited to point-wise transformations, but can encompass the business logic of dealing with incoming flows at variable rates.

Not only that, but the flow itself is an abstract pipeline that can be analyzed, and this allows Akka Streams to perform loop fusion in order to run it on a single thread:

import akka.stream.Fusing

val fastStats = Fusing.aggressive(stats)

Here fastStats will perform the same computations as stats, but whenever possible, things will run on the same thread, so that latency will be considerably lower.

Where do we go from here

If you are interested in knowing more, we have collected a few resources that can be useful:

Until next time!