Recent advances in hardware and software have enabled the capture and generation of many kinds of measurements in a wide range of fields. These measurements are produced continuously and at very high, fluctuating data rates. Examples include sensor networks, web logs, financial markets, and computer network traffic. Storing, querying and processing such data sets are computationally challenging tasks. Hence the growing interest in streaming architectures, with interesting outcomes such as Storm, Spark Streaming and Flink.
However, our use case comes with two distinctive requirements that the solutions cited above do not address:
- first, we require latency to be as low as feasible;
- second, we need some mechanism to allow business people to add their own processing logic without having to deal with the architecture itself.
The two requirements are in some sense incompatible, as many useful abstractions come with a runtime cost. Our desideratum is a solution with a high-level part, more suitable for prototyping and interfacing, and a low-level part to use when things are more well-defined.
Let us consider an example from the stock exchange market. In this case the second requirement is a little more stringent than it seems at first. It is easy to provide a setup where end users can supply a library that works with domain objects - for instance, a library of transformations on order books. The trouble arises when one wants to express more real-world concerns - for example, writing a strategy to handle scenarios where the market data feed is not arriving at the expected speed. That is, one should be able to abstract not only over the collection of data, but also over its timing characteristics.
After evaluating other solutions, we are building a prototype using Akka streams, while at the same time studying in depth other low-latency messaging solutions outside the JVM.
Other streaming solutions
We were already familiar with streaming applications. The approaches that we have tried so far involved Storm, Spark Streaming or a combination of loosely coupled applications that communicate with each other using Kafka.
None of these approaches seemed feasible for this use case. Storm seems to have lost a lot of momentum, and does not seem to allow for stateful computations or custom error-recovery strategies. On the plus side, we very much like how it cleanly separates the definition of the operations from the topology on which they are deployed.
Spark Streaming is an entirely different beast. It is great for processing lots of data while keeping state at the same time, but it is based on a micro-batch approach: events are collected and then emitted periodically in little batches every few seconds. While this is fine for near real-time systems, the latency introduced is nowhere near our target, which makes it unusable in this context.
Finally, there is always the possibility of writing separate components linked by a queue such as Kafka. Again, this is fine for near real-time, but having to go back and forth from Kafka introduces a visible (albeit small) latency that is not acceptable for the time-critical part of our system. All in all, Kafka is more suitable for loose communication among different applications than for messaging between components of the same system.
Akka itself supports working with streams, as well as other patterns of interaction. The actor model is extremely flexible for many kinds of multi-threaded applications, but the streaming abstraction is not directly visible. Akka is great as a building block for the kind of application we want to design, but the streams API makes concrete a lot of patterns and abstractions that we would otherwise need to reinvent.
We should also mention Reactive Extensions and all solutions derived from their model, such as Reactor. These are great for achieving really low latency and provide a lot of operators to write transformations over streams declaratively. After trying them for a while, though, we have found them a little cumbersome when it comes to dividing workload across threads. Most Rx operators perform the work on the same thread where the stream they are working on belongs. This has the effect that a single pipeline of operations is usually single-threaded, thereby avoiding context switches and reducing latency. On the other hand, it is all too easy to try to branch a pipeline only to discover that both branches run on the same thread, blocking each other. Fixing this requires carefully organizing schedulers, and things become quite complex pretty quickly.
Abstracting the stream
For our context, a stream will be an ordered collection of items, each one arriving at a different point in time. This makes working with streams very different from working with the usual collections. From one point of view, one wants a collection-like abstraction that allows one to act on it with operators such as map and filter - many fast streaming solutions stop at a lower level by providing only the messaging layer. On the other hand, one wants to take into account the time characteristics of the stream.
For instance, one operation that would be very difficult to write against a naive API would be a rate limiter. From the point of view of the collection, the items themselves are not changed, but the rate of arrival is altered somehow.
The difficulty lies in separating the Stream abstraction from the runtime that supports it.
A similar situation arises with the Future in the Scala standard library. A Future[A] represents a value of type A that will be available at a later moment. By making such an interface available in the standard library, Scala allows many different libraries to generate such values and still interoperate. Futures can be manipulated with various operators, but in order to actually start their execution one has to provide an ExecutionContext. Different libraries, such as Akka, Dispatch or Play!, all provide an execution context, so that futures generated by any of them can be used transparently on each other's runtime.
For streams, the situation is similar, but instead of a single value at a point in time, one has many. In order to write composable libraries, one would want the abstract API to be available without it being tied to the runtime. This is useful even in the case where there is a single runtime supporting the abstraction (as happens for Akka streams) because it allows one to separate the logic of acting on the stream from the concerns of messaging.
For comparison, there is another abstraction of a stream of incoming data in the Scala world, and that is the Play! enumerators and iteratees. The Akka streams abstraction is a little more imperative in nature, but we found it much simpler to reason about and easier to learn.
Working with streams
Akka streams offers some standard operations to work on streams. The simplest to understand are similar to those acting on collections. A representative example, modeled on our prototype, reads
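as in the following minimal sketch, where the Quote domain type, the in-memory publisher and the moving-average computation are all assumptions made for illustration:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.Publisher

case class Quote(symbol: String, price: Double) // hypothetical domain type

implicit val system = ActorSystem("prototype")
implicit val materializer = ActorMaterializer()

// In the prototype the publisher comes from Reactive Kafka; here we fake one.
val publisher: Publisher[Quote] =
  Source(List(Quote("AAPL", 101.0), Quote("AAPL", 102.5), Quote("AAPL", 103.1)))
    .runWith(Sink.asPublisher(fanout = false))

val averages =
  Source.fromPublisher(publisher)
    .filter(_.symbol == "AAPL")  // keep a single instrument
    .map(_.price)
    .sliding(2)                  // window over the last two prices
    .map(w => w.sum / w.size)    // moving average
```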
Here publisher implements the Publisher interface from Reactive Streams and is in fact our source of events (it is actually connected to Kafka using Reactive Kafka). The rest of the operations are not much different from working with collections. Other than the usual suspects map and filter, there are also fold, take, drop, groupBy and so on.
Much more interesting are operations that need to take into account the rate of various producers. Most flow operations in Akka streams are back-pressured. This means that when connecting a producer to a consumer, the producer will only yield new events in response to a consumer request. Of course, the producer may not have new events available right now, so the pushing of the event may happen at a later stage. Still, it is performed as soon as possible, provided the consumer is ready to accept it.
When combining multiple producers with different rates, the default behaviour is to backpressure the fast one. For instance, assume that we want to slow down the averages stream, making it emit just one event per second. This can be done by zipping it with a slow stream that only acts as a metronome. The ZipWith connector takes care of slowing down the fast averages producer:
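A sketch of how this might look, where the one-second tick interval is an assumption:

```scala
import scala.concurrent.duration._
import akka.stream.SourceShape
import akka.stream.scaladsl.{GraphDSL, Source, ZipWith}

// A metronome that emits one tick per second.
val metronome = Source.tick(1.second, 1.second, ())

val slowAverages = Source.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._
  // Zip each average with a tick, keeping only the average.
  val zip = b.add(ZipWith[Double, Unit, Double]((avg, _) => avg))
  averages  ~> zip.in0
  metronome ~> zip.in1
  SourceShape(zip.out)
})
```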
Here we use the Graph DSL to combine the sources in a flexible way. The result is a source stream that has the same elements as averages but a slower production rate.
For cases where back-pressuring is not a viable strategy, one may want to drop events from the fast producer, or accumulate them while waiting for the slow producer, or vice versa interpolate the output of the slow producer to cope with the fast one. This can be done with the conflate and expand operations.
The conflate operator allows us to fold elements of a fast producer attached to a slow consumer. For instance, dropping every event except for the last one would be just
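a one-liner along these lines (a sketch, assuming a stream of Double values):

```scala
import akka.stream.scaladsl.Flow

// Keep only the most recent element while the consumer is busy.
val lastOnly = Flow[Double].conflate((_, latest) => latest)
```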
while accumulating an exponential average of incoming numbers could be
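something like the following sketch, where the smoothing factor alpha is an assumption:

```scala
val alpha = 0.5 // hypothetical smoothing factor

// Fold incoming numbers into an exponential moving average while the
// consumer is not ready to receive them.
val expAvg = Flow[Double].conflate((acc, x) => alpha * x + (1 - alpha) * acc)
```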
On the other side, one can use expand to cope with requests from a consumer that is faster than we are producing. For instance, plain repetition is just:
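A sketch, again on a stream of Double values:

```scala
// Repeat the last seen element for every extra downstream request.
val repeatLast = Flow[Double].expand(x => Iterator.continually(x))
```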
More complex cases can be handled by defining custom processing stages. This can be done by extending an abstract class called GraphStage. It requires defining two things:
- the abstract shape of the topology that we are defining, that is, the input and output ports;
- handlers to define what to do when we get an item pushed from an input port, or a request for an item coming from an output port.
These handlers are defined inside a GraphStageLogic block, which contains the logic that will be used by the materializer. In it, we implement the onPush and onPull methods. The former reacts to an event from the producer, while the latter reacts to a request from the consumer.
As an example, we used it to implement a rate adaptor: this takes messages as inputs and emits them according to a time schedule that is derived from the message itself. This comes in quite handy when replaying events that have a date and time attached while keeping their rate approximately correct.
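A sketch of such a stage; the Timed trait, with its emitAtMillis field, is a hypothetical way to attach a schedule to each message:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

trait Timed { def emitAtMillis: Long } // hypothetical message schedule

class RateAdaptor[A <: Timed] extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("RateAdaptor.in")
  val out = Outlet[A]("RateAdaptor.out")
  override val shape = FlowShape(in, out)

  override def createLogic(attributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          val wait = elem.emitAtMillis - System.currentTimeMillis()
          if (wait > 0) Thread.sleep(wait) // naive: blocks the stage's thread
          push(out, elem)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = pull(in)
      })
    }
}
```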
Here we use Thread.sleep(), potentially blocking the running thread pool, but it turns out that Akka streams allows us to put particular stages on their own dedicated thread pool, so it is not much of an issue. It would also be easy to schedule the push of the element instead of pausing the thread.
Using this component is just
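a matter of wiring it in with via. For instance (assuming an events source of Trade messages, where Trade extends the Timed trait above, and a dedicated dispatcher configured as replay-dispatcher):

```scala
import akka.stream.ActorAttributes
import akka.stream.scaladsl.Flow

val replayed = events.via(
  Flow.fromGraph(new RateAdaptor[Trade])
    // run the (potentially blocking) stage on its own thread pool
    .withAttributes(ActorAttributes.dispatcher("replay-dispatcher")))
```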
Of course, this is a naive example of how to deal with timers. As an alternative, we can use the TimerGraphStageLogic, which offers the methods:
- scheduleOnce(key, delay),
- schedulePeriodically(key, period), or
- schedulePeriodicallyWithInitialDelay(key, delay, period).
Pieces of a puzzle
The real beauty of Akka streams arises when trying to combine multiple pieces together. The main building block is the Graph. Flow elements are first class entities that have multiple inputs and outputs, like this:
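A sketch of such a building block; the types A, B, C, D and the two transformations are placeholders:

```scala
import akka.NotUsed
import akka.stream.{BidiShape, Graph}
import akka.stream.scaladsl.{Flow, GraphDSL}

case class A(); case class B(); case class C(); case class D() // placeholders

val block: Graph[BidiShape[A, C, B, D], NotUsed] =
  GraphDSL.create() { implicit b =>
    val top    = b.add(Flow[A].map(_ => C())) // turns each A into a C
    val bottom = b.add(Flow[B].map(_ => D())) // turns each B into a D
    BidiShape(top.in, top.out, bottom.in, bottom.out)
  }
```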
The building block designed above represents a Graph, having two inputs, of types A and B, and two outputs, of types C and D.
Partial flow graphs can be combined using the Graph DSL. For instance, the above partial flow could be attached to another partial flow that has an output of type A, resulting in a bigger partial flow. Eventually, one can build a runnable graph - that is, one without dangling ends - and run it on a Materializer. The Materializer is the streaming equivalent of an ExecutionContext, and allows one to take a flow graph - which in itself is just an abstract description - and actually perform the computations.
Notable types of partial flow are Source[A], which is a graph with a single output of type A, and Sink[A], which is a graph with a single input of type A.
What sets Akka streams apart from other streaming libraries is that most approaches start from a source and build a pipeline of operations on it, while in Akka streams it is just as easy to describe a pipeline of operations with no attached ends. An example could be:
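A sketch, in which the Stats type and the particular statistics computed are assumptions:

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.stream.scaladsl.Flow

case class Stats(count: Int, mean: Double, min: Double, max: Double) // hypothetical

// Group incoming values into 1-second windows and summarize each window.
val stats: Flow[Double, Stats, NotUsed] =
  Flow[Double]
    .groupedWithin(Int.MaxValue, 1.second)
    .map(w => Stats(w.size, w.sum / w.size, w.min, w.max))
```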
This snippet defines a flow graph with an input of type Double and an output of type Stats that collects statistics in a 1-second time window. It can be attached to a sink and a source by doing something like
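the following (assuming a prices source of Double values and the implicit materializer from earlier):

```scala
import akka.stream.scaladsl.Sink

prices.via(stats).to(Sink.foreach(println)).run()
```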
or, of course, using the Graph DSL.
The benefit of this approach is that one can actually create a library of building blocks that are not limited to point-wise transformations, but can actually encompass the business logic of dealing with incoming flows of variable rates.
Not only that, but the flow itself is an abstract pipeline that can be analyzed, and this allows Akka Streams to perform operator fusion in order to run it on a single thread:
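A sketch using the explicit fusing API of Akka Streams 2.4 (later versions fuse stages automatically):

```scala
import akka.stream.Fusing
import akka.stream.scaladsl.Flow

val fastStats = Flow.fromGraph(Fusing.aggressive(stats))
```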
Here fastStats will perform the same computations as stats, but whenever possible things will run on the same thread, so that latency will be considerably lower.
Where do we go from here
If you are interested in knowing more, we have collected a few resources that can be useful:
Until next time!