Recently I've been learning Akka Streams for fun. A stream is basically a collection of data/events with no known end, so stream programming is useful when I want to process a huge data set that keeps producing events over time. Whenever a new event appears on the stream, I do further processing on that event.
For example, take the stream of online orders at a retail store. Customers keep ordering, so it's a stream: whenever the store person sees a new order, he looks up the items ordered and picks them. Once he has gathered all the items for one order, he hands them to another guy who packs them into a box, who then tells yet another guy to put a label on it. That means there is a stream of orders ready to pack, and then a stream of orders ready to be labelled and shipped. Finally, all the labelled orders are put on the truck, so the truck or trailer is the final Sink here.
To visualize it,
online orders -> pick items -> pack order -> put label -> trailer -> | Done
Let's build this with Akka Streams.
First, I will define the order states:
import java.util.Date

case class Order(itemName: String, quantity: Int, at: Date)
case class Picked(itemName: String, quantity: Int, at: Date)
case class Packed(itemName: String, quantity: Int, at: Date)
case class Labelled(itemName: String, quantity: Int, at: Date)
Stream Source
A Source is the collection of elements that the stream keeps publishing/emitting.
scala> import akka.stream.scaladsl.{Source, Sink}
scala> val customerOrdersStream = Source(List(Order("DN black shirt - Triangle", 1, new Date()),
| Order("DN white shirt - classic", 2, new Date()),
| Order("DN red shirt - memento mori", 3, new Date())))
customerOrdersStream: akka.stream.scaladsl.Source[Order,akka.NotUsed] =
Source(SourceShape(StatefulMapConcat.out), CompositeModule [6611a9b6]
Name: iterableSource
Modules:
(singleSource) GraphStage(SingleSource(List(Order(DN black shirt - Triangle,1,Sun Nov 27 13:05:38 PST 2016), Order(DN white shirt - classic,2,Sun Nov 27 13:05:38 PST 2016), Order(DN red shirt - memento mori,3,Sun Nov 27 13:05:38 PST 2016)))) [415e4363]
(unnamed) [2cc00ee7] copy of GraphStage(StatefulMapConcat) [3b270282]
Downstreams:
single.out -> StatefulMapConcat.in
Upstreams:
StatefulMapConcat.in -> single.out
MatValue: Atomic(singleSource[415e4363]))
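The List above is finite, but a Source does not have to be. To match the "unknown ending" idea from the beginning, a source can keep emitting elements forever; as a hypothetical sketch (not part of this example), Source.tick emits the same element at a fixed interval:

import scala.concurrent.duration._

// Hypothetical unbounded source: emits the same order once every second, forever.
// (The Order value is built once, so every tick carries the same timestamp.)
val endlessOrders: Source[Order, akka.actor.Cancellable] =
  Source.tick(1.second, 1.second, Order("DN black shirt - Triangle", 1, new Date()))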
Once the Source is defined, I need to define the processing steps it has to go through, which in this example are picking, packing and labelling. These are known as Flows.
Stream Flows
import akka.NotUsed
import akka.stream.scaladsl.Flow

val picking: Flow[Order, Picked, NotUsed] = Flow[Order].map(order => {
  println(s"\npicking ${order.itemName}")
  Picked(order.itemName, order.quantity, new Date())
})

val packing: Flow[Picked, Packed, NotUsed] = Flow[Picked].map(picked => {
  println(s"packing ${picked.itemName}")
  Packed(picked.itemName, picked.quantity, new Date())
})

val labelling: Flow[Packed, Labelled, NotUsed] = Flow[Packed].map(packed => {
  println(s"labelling ${packed.itemName}")
  Labelled(packed.itemName, packed.quantity, new Date())
})
Then, once all the steps are completed, I will send those customer orders to the trailer, which will deliver them, so the trailer is the Sink here.
Stream Sink
A Sink is the final destination of the items or events flowing through the stream.
import akka.Done
import scala.concurrent.Future

val trailer: Sink[Labelled, Future[Done]] =
  Sink.foreach(order => println(order.itemName + " is in the trailer, on the way to Customer."))
Finally, once all the steps are defined individually, I need to assemble them into one pipeline, which is called a RunnableGraph.
val ordersFlow = customerOrdersStream
  .via(picking)
  .via(packing)
  .via(labelling)
  .to(trailer)
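Note that .to(trailer) keeps the materialized value of the source, which is just NotUsed, so running the graph gives me no handle to know when all the orders have been delivered. If I wanted the sink's Future[Done] instead, a minimal sketch (using the same stages as above) would keep the right-hand materialized value:

import akka.stream.scaladsl.{Keep, RunnableGraph}

// Keep the sink's Future[Done] instead of the source's NotUsed, so that running
// this graph returns a future that completes once the trailer has seen every order.
val ordersGraphWithDone: RunnableGraph[Future[Done]] = customerOrdersStream
  .via(picking)
  .via(packing)
  .via(labelling)
  .toMat(trailer)(Keep.right)

Calling ordersGraphWithDone.run() (once the materializer below is in place) returns that Future[Done], which could, for example, be used to terminate the ActorSystem after the stream finishes.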
Materializer config
I need to define the ActorSystem and ActorMaterializer, which materializes the Flows defined above into reactive Processors.
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem
scala> implicit val actorSystem = ActorSystem("orders-streaming")
actorSystem: akka.actor.ActorSystem = akka://orders-streaming
scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer
scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://orders-streaming,ActorMaterializerSettings(4,16,,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true),akka.dispatch.Dispatchers@77febdaf,Actor[akka://orders-streaming/user/StreamSupervisor-0#-1147523586],false,akka.stream.impl.SeqActorNameImpl@14deb7de)
Execute the defined stream flow
scala> ordersFlow.run()(materializer)
res1: akka.NotUsed = NotUsed
scala>
picking DN black shirt - Triangle
packing DN black shirt - Triangle
labelling DN black shirt - Triangle
DN black shirt - Triangle is in the trailer, on the way to Customer.
picking DN white shirt - classic
packing DN white shirt - classic
labelling DN white shirt - classic
DN white shirt - classic is in the trailer, on the way to Customer.
picking DN red shirt - memento mori
packing DN red shirt - memento mori
labelling DN red shirt - memento mori
DN red shirt - memento mori is in the trailer, on the way to Customer.
As seen above, the orders are processed one at a time; in this case, say the retailer has only 1 picker, 1 packer and 1 labeller.
What if the retailer has, say, 4 pickers, 4 packers and 4 labellers? Can I parallelize the stream to increase the throughput?
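This is not what the example above does, but as a rough sketch of one possible approach: each stage could do its work inside a Future with mapAsync. Here the parallelism of 4 stands in for the 4 pickers, so up to 4 orders are picked concurrently while the results are still emitted downstream in order:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// A hypothetical "4 pickers" stage: mapAsync(4) runs up to 4 picks concurrently
// in Futures, but still emits the Picked elements in their original order.
val parallelPicking: Flow[Order, Picked, NotUsed] =
  Flow[Order].mapAsync(4)(order => Future {
    println(s"picking ${order.itemName}")
    Picked(order.itemName, order.quantity, new Date())
  })

If the order in which items reach the trailer does not matter, mapAsyncUnordered would squeeze out a bit more throughput.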