Reactive Stream vs Akka Stream
The first time I approached Akka Stream I found it hard to handle compared to Reactive Extensions and other Reactive Streams implementations. On one side I had a simple, ready-to-use DSL with familiar operators like map, filter, and so on; on the other side I had several concepts like Flow, GraphDSL, and GraphStage.
So it seemed natural to stick with Reactive Extensions, but I was wrong. As discussed before in A first look at Akka streams, Reactive Extensions are really good at defining flows, but they become hard to manage when you have to deal with streams arriving at different rates or when you have to broadcast or merge streams. Fixing this requires carefully organizing schedulers, and things become quite complex pretty soon.
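A rough sketch of how this can look with RxScala (the exact operators may vary slightly depending on the Rx version):

```scala
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.NewThreadScheduler

object RxBroadcast extends App {
  // Emissions are moved onto an explicitly chosen scheduler (a dedicated thread)
  val source = Observable.from(1 to 10)
    .subscribeOn(NewThreadScheduler())
    .publish // ConnectableObservable: both subscribers share the same emissions

  source.map(_ * 2).subscribe(x => println(s"doubled: $x"))
  source.filter(_ % 2 == 0).subscribe(x => println(s"even: $x"))

  source.connect     // start the shared stream
  Thread.sleep(1000) // keep the JVM alive while the Rx thread does the work
}
```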
Above you can see an example of broadcasting an incoming stream to multiple outputs: you need to explicitly set up a thread (through a scheduler) to run the computation.
On the contrary, Akka Stream is built with the decision to offer APIs that are minimal and consistent, as opposed to easy or intuitive. There is no magic: all the API features are explicit. You have operators to define Sources, Sinks, Flows and Graphs, and operators to handle back-pressure, buffering, transformations, failure recovery, and so on. But in my opinion the features that make the Akka Stream implementation interesting are:
- the idea of using Akka Actors as the backend for stream execution. Each transformation in Akka Stream is materialized into an Actor, which is scheduled on a thread pool;
- the separation between stream definition and materialization. What you define in Akka Stream is a blueprint, that is a set of flows, graphs and topologies. These partial graphs can be reused and composed to create complex graphs (see the sketch below).
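As a minimal sketch of this idea, the same blueprint can be materialized several times, and each run produces an independent stream:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}

object BlueprintReuse extends App {
  implicit val system = ActorSystem("blueprint-reuse")
  implicit val materializer = ActorMaterializer()

  // The blueprint: nothing runs until run() is called
  val blueprint: RunnableGraph[NotUsed] =
    Source(1 to 3).to(Sink.foreach(println))

  blueprint.run() // first materialization
  blueprint.run() // second, completely independent materialization
}
```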
Note about latency: materializing an actor for each transformation can result in high latency, since Akka actors are not necessarily scheduled on the same thread of the thread pool every time. However, with operator fusion it is possible to execute multiple processing steps of the stream graph within the same Actor. By default the Akka Stream configuration exposes an auto-fusing property that reduces actor creation during the materialization step.
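As an illustration (a sketch, not code from the tutorial repository), an explicit `.async` boundary splits an otherwise fused pipeline into separate actors:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object FusionBoundary extends App {
  implicit val system = ActorSystem("fusion")
  implicit val materializer = ActorMaterializer()

  Source(1 to 10)
    .map(_ + 1)  // with fusing these two maps run
    .map(_ * 2)  // inside the same actor...
    .async       // ...while this explicit boundary pushes the next stage
    .map(_ - 1)  // into a separate actor
    .runForeach(println)
}
```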
Using Akka Stream we can focus on building libraries of reusable pieces, allowing full compositionality without adding constraints on the actual stream execution. Constraints on the execution can optionally be imposed when composing the graphs or by extending the materializer. For example, Gearpump.io supports Akka Stream by providing the GearpumpMaterializer.
Akka Stream concepts
The elements in Akka Stream that allow you to define a stream are:
- Source: something with exactly one output stream
- Sink: something with exactly one input stream
- Flow: something with exactly one input and one output stream
- BidiFlow: something with exactly two input streams and two output streams that conceptually behave like two Flows of opposite direction
- Graph: a packaged stream processing topology that exposes a certain set of input and output ports, characterized by an object of type Shape.
Source
A Source is a set of stream processing steps that has one open output. It can comprise any number of internal sources and transformations that are wired together, or it can be an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into a Reactive Streams Publisher.
All the code used below can be found at akka_stream_tutorial.
As a starting point, a source can be created from an Iterable.
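A minimal sketch of such a source (the variable name `firstSource` is reused in the next examples):

```scala
import akka.stream.scaladsl.Source

// A Source built from an Iterable (here a Range): this is only a blueprint, nothing runs yet
val firstSource = Source(1 to 10)
```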
Now that we have our first Source, let us add a Sink and run the computation.
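A sketch of wiring the source to a Sink and running it (assuming an implicit ActorSystem and materializer are in scope):

```scala
import akka.stream.scaladsl.Sink

// Connect the source to a Sink that prints each element; this is still only a blueprint
val computation = firstSource.to(Sink.foreach(println))

// Materialize and run the blueprint
computation.run()
```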
You can see here that the stream definition is separated from its execution. If we extend the same example with explicit types we get:
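For example (a minimal sketch):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}

val firstSource: Source[Int, NotUsed] = Source(1 to 10)
val computation: RunnableGraph[NotUsed] = firstSource.to(Sink.foreach(println))

computation.run()
```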
So we can see that the source is of type `Source[Int, NotUsed]` and the computation is of type `RunnableGraph[NotUsed]`.
- The `NotUsed` can be replaced with other types such as `Int` or `Double`; this is useful for finite streams where we want to get a result back from the stream execution (see the sketch after this list).
- `RunnableGraph` is the base class for every stream computation.
- `firstSource.to(Sink.foreach(println))` defines that the source is sent `to` a Sink; `Sink.foreach(println)` is a Sink that prints each element of the stream.
- The second (implicit) parameter of the method `computation.run()` is the materializer, which takes care of transforming the computation into Actors.
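For instance, with a finite stream we can keep the Sink's materialized value instead of `NotUsed`. A minimal sketch (assuming an implicit materializer is in scope):

```scala
import scala.concurrent.Future
import akka.stream.scaladsl.{Sink, Source}

// runWith keeps the materialized value of the Sink: here a Future completed with the sum
val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold[Int, Int](0)(_ + _))
```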
Let us postpone how to get results from stream execution and concentrate on Sources. Apart from an Iterable, a source can be created from: an Iterator, a Future, a Graph, a Publisher, a Promise, a single element, a Tick, and a Resource.
First, let's define a simple trait to mix into our example objects.
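A sketch of what such a trait can look like (the actual version lives in the linked repository); it brings an `ActorSystem`, an `ActorMaterializer` and an execution context into scope for every example object:

```scala
import scala.concurrent.ExecutionContext
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

trait AkkaStreamApp extends App {
  implicit val system: ActorSystem = ActorSystem("akka-stream-tutorial")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher
}
```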
Iterator
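A sketch of a Source backed by an `Iterator` (the object name is just illustrative):

```scala
import akka.stream.scaladsl.{Sink, Source}

object SourceFromIterator extends AkkaStreamApp {
  // The iterator is consumed lazily; take(10) keeps the stream finite
  Source.fromIterator(() => Iterator.from(1))
    .take(10)
    .runWith(Sink.foreach(println))
}
```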
Future
```scala
import scala.concurrent.Future
import akka.stream.scaladsl.{Sink, Source}

object SourceFromFuture2 extends AkkaStreamApp {
  // The Future completes with a whole Range; mapConcat flattens it into single elements
  Source.fromFuture(Future(1 to 10))
    .mapConcat(identity)
    .runWith(Sink.foreach(println))
}
```
Single element

```scala
object SourceFromSingle extends AkkaStreamApp {
  Source.single(1)
    .runWith(Sink.foreach(println))
}
```
Tick

```scala
import scala.concurrent.duration._

object SourceFromTick extends AkkaStreamApp {
  // Emits the element 1 every second, starting immediately
  Source.tick(0.seconds, 1.second, 1)
    .runForeach(println)
}
```
Resource

```scala
import java.io.{BufferedReader, InputStreamReader}

object FromCloseableResource extends AkkaStreamApp {
  val in = this.getClass.getResourceAsStream("example_resource.txt")

  // unfoldResource takes three functions: create the resource, read the next element
  // (None terminates the stream) and close the resource
  Source.unfoldResource[String, BufferedReader](
    () => new BufferedReader(new InputStreamReader(in)),
    reader => Option(reader.readLine()),
    reader => reader.close())
    .runForeach(println)
}
```
Flow

So far we have always connected a Source directly to a Sink:

```scala
Source.single(1)
  .runWith(Sink.foreach(println))
```
We can add transformation steps on the Source itself and keep the materialized value of the Sink with `Keep.right`:

```scala
val flow = Source(1 to 1000)
  .map(_ + 1)
  .filter(_ % 2 == 0)
  .toMat(Sink.fold(0)(_ + _))(Keep.right) // materializes a Future[Int] with the sum
```
```scala
def myFlow(): Flow[Int, Int, NotUsed] =
  Flow[Int]
    .map(_ + 1)
    .filter(_ % 2 == 0)

val flow = Source(1 to 1000)
  .via(myFlow)
  .toMat(Sink.fold(0)(_ + _))(Keep.right)

val result = flow.run()
```
In the above example we extracted part of the stream processing into the method `myFlow`, which we can combine with a source using the `via` method.
If you pay attention to the signature of `myFlow()`, you can see that the Flow transforms an `Int` into an `Int` and does not make use of the materialization parameter: in `Flow[Int, Int, Mat]` the `Mat` slot is just `NotUsed`.
The Source and Flow classes and companion objects expose a rich API for working with streams. It allows you to process your stream one element at a time, applying transformations and filters. It also offers operators to deal with timers and schedulers, merge streams with different rates, apply asynchronous operations without blocking, monitor a stream, batch your stream into windows (and sliding windows), and throttle the stream to limit its speed. Finally, it also gives you basic operators to join and merge streams, but in the following we will see that using the GraphDSL API we can deal with multiple streams in a more elegant way.
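As a small taste of these operators, here is a sketch (assuming the `AkkaStreamApp` trait from above) that throttles the rate, applies an asynchronous step with `mapAsync` and batches elements with `groupedWithin`:

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Source

object RichOperators extends AkkaStreamApp {
  Source(1 to 100)
    .throttle(10, 1.second, 10, ThrottleMode.Shaping) // at most 10 elements per second
    .mapAsync(4)(n => Future(n * 2))                  // run a Future-based step without blocking
    .groupedWithin(5, 1.second)                       // batch into windows of up to 5 elements
    .runForeach(println)
}
```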
Conclusion
By analysing how to create a Source we have only scratched the surface of Akka Stream. In the next post we will dig deeper into the complex part: dealing with graphs, and with streaming graphs composed of slower and faster nodes.