Sunday, 27 November 2016

Playing with reactive streams with Akka

Recently I've been learning Akka Streams for fun. A stream is basically a collection of data/events with no known end. Stream programming is useful when I want to process a huge data set that keeps producing events over time: whenever a new event appears on the stream, I do further processing on that event.
For example, consider the stream of online orders at a retail store. Customers keep ordering, so it's a stream; whenever the store person sees a new order, he looks up the items ordered and picks them up.
Once he gathers all the items for an order, he sends them to another guy who packs them into a box and tells yet another guy to put a label on it. Which means there is a stream of (orders ready to pack) and then a stream of (orders ready to be labelled and shipped).
Finally, all the labelled orders are put on the truck. So the truck or trailer is the final Sink here.
To visualize it,
online orders -> pick items -> pack order -> put label -> trailer -> | Done
Let's achieve it using akka streaming,
First, I will define the Order states,
import java.util.Date

case class Order(itemName: String, quantity: Int, at: Date)
case class Picked(itemName: String, quantity: Int, at: Date)
case class Packed(itemName: String, quantity: Int, at: Date)
case class Labelled(itemName: String, quantity: Int, at: Date)

Stream Source

A Source is the origin of the stream: a collection of elements which it keeps publishing/emitting.
scala> import akka.stream.scaladsl.{Source, Sink}

scala> val customerOrdersStream = Source(List(Order("DN black shirt - Triangle", 1, new Date()),
     |       Order("DN white shirt - classic", 2, new Date()),
     |       Order("DN red shirt - memento mori", 3, new Date())))
customerOrdersStream: akka.stream.scaladsl.Source[Order,akka.NotUsed] =
Source(SourceShape(StatefulMapConcat.out), CompositeModule [6611a9b6]
  Name: iterableSource
  Modules:
    (singleSource) GraphStage(SingleSource(List(Order(DN black shirt - Triangle,1,Sun Nov 27 13:05:38 PST 2016), Order(DN white shirt - classic,2,Sun Nov 27 13:05:38 PST 2016), Order(DN red shirt - memento mori,3,Sun Nov 27 13:05:38 PST 2016)))) [415e4363]
    (unnamed) [2cc00ee7] copy of GraphStage(StatefulMapConcat) [3b270282]
  Downstreams:
    single.out -> StatefulMapConcat.in
  Upstreams:
    StatefulMapConcat.in -> single.out
  MatValue: Atomic(singleSource[415e4363]))
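Note that a Source doesn't have to be a finite collection; a stream's end may be unknown. As a small sketch (the one-second tick and the repeated sample order are made up for illustration), Source.tick emits elements indefinitely:
    import scala.concurrent.duration._
    import akka.stream.scaladsl.Source

    // emits the same sample order every second, forever, until the
    // materialized Cancellable is cancelled
    val endlessOrders = Source.tick(
      initialDelay = 1.second,
      interval = 1.second,
      tick = Order("DN black shirt - Triangle", 1, new Date()))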
Once the Source is defined, I need to define the processing steps it has to go through, which in this example are picking, packing and labelling. These are known as Flows.

Stream Flows

    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    val picking : Flow[Order, Picked, NotUsed] = Flow[Order].map(order => {
      println(s"\npicking ${order.itemName}")
      Picked(order.itemName, order.quantity, new Date())
    })

    val packing : Flow[Picked, Packed, NotUsed] = Flow[Picked].map(picked => {
      println(s"packing ${picked.itemName}")
      Packed(picked.itemName, picked.quantity, new Date())
    })

    val labelling : Flow[Packed, Labelled, NotUsed] = Flow[Packed].map(packed => {
      println(s"labelling ${packed.itemName}")
      Labelled(packed.itemName, packed.quantity, new Date())
    })
Then, once all the steps are complete, I send the customer orders to the trailer, which will deliver them. So the trailer is the Sink here.

Stream Sink

A Sink is the final destination of the items or events flowing through the stream.
    import akka.Done
    import scala.concurrent.Future

    val trailer: Sink[Labelled, Future[Done]] = Sink.foreach[Labelled](order => println(order.itemName + " is in the trailer, on the way to Customer."))
Finally, once all the steps are defined individually, I need to assemble them into one pipeline, which is called a RunnableGraph.
    val ordersFlow = customerOrdersStream
      .via(picking)
      .via(packing)
      .via(labelling)
      .to(trailer)

Materializer config

I need to define an ActorSystem and an ActorMaterializer, which materializes the Flows defined above into Reactive Streams Processors.
scala> import akka.actor.ActorSystem
import akka.actor.ActorSystem

scala> implicit val actorSystem = ActorSystem("orders-streaming")
actorSystem: akka.actor.ActorSystem = akka://orders-streaming

scala> import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializer

scala> implicit val materializer = ActorMaterializer()
materializer: akka.stream.ActorMaterializer = ActorMaterializerImpl(akka://orders-streaming,ActorMaterializerSettings(4,16,,<function1>,StreamSubscriptionTimeoutSettings(CancelTermination,5000 milliseconds),false,1000,1000,false,true),akka.dispatch.Dispatchers@77febdaf,Actor[akka://orders-streaming/user/StreamSupervisor-0#-1147523586],false,akka.stream.impl.SeqActorNameImpl@14deb7de)

Execute the defined stream flow

scala> ordersFlow.run()(materializer)
res1: akka.NotUsed = NotUsed

scala> 
picking DN black shirt - Triangle
packing DN black shirt - Triangle
labelling DN black shirt - Triangle
DN black shirt - Triangle is in the trailer, on the way to Customer.

picking DN white shirt - classic
packing DN white shirt - classic
labelling DN white shirt - classic
DN white shirt - classic is in the trailer, on the way to Customer.

picking DN red shirt - memento mori
packing DN red shirt - memento mori
labelling DN red shirt - memento mori
DN red shirt - memento mori is in the trailer, on the way to Customer.
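One thing to note: run() above returned NotUsed, because to(trailer) keeps the materialized value of the source side. If I instead want the trailer's Future[Done] (say, to know when all orders have been delivered), a small sketch is to keep the sink's materialized value; runWith(trailer) is the shorthand for the same thing:
    import akka.Done
    import scala.concurrent.Future
    import akka.stream.scaladsl.Keep

    // Keep.right keeps the sink's materialized value instead of the source's,
    // so run() yields the trailer's Future[Done]
    val done: Future[Done] = customerOrdersStream
      .via(picking)
      .via(packing)
      .via(labelling)
      .toMat(trailer)(Keep.right)
      .run()

    done.onComplete(_ => println("all orders delivered"))(actorSystem.dispatcher)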
Looking at the run output above, the orders are processed one at a time; in this case, the retailer has only 1 picker, 1 packer and 1 labeller.
What if the retailer has, say, 4 pickers, 4 packers and 4 labellers? Can I parallelize the stream to increase the throughput? One way is sketched below.
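As a sketch: wrap each step in a Future and use mapAsync, which runs up to parallelism invocations concurrently while still emitting results in the original order (the parallelPicking name and the choice of the system dispatcher are mine, just for illustration):
    import scala.concurrent.{ExecutionContext, Future}

    implicit val ec: ExecutionContext = actorSystem.dispatcher

    // up to 4 orders are picked concurrently; downstream still receives
    // the Picked elements in the original order
    val parallelPicking: Flow[Order, Picked, NotUsed] =
      Flow[Order].mapAsync(parallelism = 4) { order =>
        Future {
          println(s"picking ${order.itemName}")
          Picked(order.itemName, order.quantity, new Date())
        }
      }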


Saturday, 12 November 2016

Playing with regex with the Java regex API


Part 1 - Pattern match


A greedy quantifier consumes as much of the input as possible; a lazy one consumes as little as possible.
//lazy
scala> """[A-Za-z]+?""".r.findAllIn("ordernumbers are orderone and ordertwo").toList
res7: List[String] = List(o, r, d, e, r, n, u, m, b, e, r, s, a, r, e, o, r, d, e, r, o, n, e, a, n, d, o, r, d, e, r, t, w, o)

//greedy
scala> """[A-Za-z]+""".r.findAllIn("ordernumbers are orderone and ordertwo").toList
res8: List[String] = List(ordernumbers, are, orderone, and, ordertwo)

note

+-------------------+-----------------+------------------------------+
| Greedy quantifier | Lazy quantifier |        Description           |
+-------------------+-----------------+------------------------------+
| *                 | *?              | Star Quantifier: 0 or more   |
| +                 | +?              | Plus Quantifier: 1 or more   |
| ?                 | ??              | Optional Quantifier: 0 or 1  |
+-------------------+-----------------+------------------------------+


First, match a simple event against a couple of patterns:
// simple event

scala> val event = "{\"timeMillis\": 123456}"
event: String = {"timeMillis": 123456}

scala> import java.util.regex.Pattern
import java.util.regex.Pattern

// has pattern {"timeMillis" : number}
scala> Pattern.matches("\\{\"timeMillis\": [0-9]*\\}", event)
res3: Boolean = true

// has pattern anything followed by timeMillis followed by anything
scala> Pattern.matches(".*timeMillis.*", event)
res4: Boolean = true
The same works for a more complex event: anything followed by "timeMillis" followed by anything.
//complex event
scala> val event = "{\"timeMillis\": 123456, \"body\":\"some body\"}"
event: String = {"timeMillis": 123456, "body":"some body"}

// has pattern anything followed by "timeMillis" followed by anything
scala> Pattern.matches(".*\"timeMillis\".*", event)
res5: Boolean = true
Next, { followed by whitespace followed by "timeMillis" followed by anything. Note that Pattern.matches must match the entire string, so without the trailing .* it fails:
scala> val event = "{ \"timeMillis\": 123456, \"body\":\"some body\"}"
event: String = { "timeMillis": 123456, "body":"some body"}

scala> Pattern.matches("\\{\\s*\"timeMillis\"", event)
res8: Boolean = false

scala> Pattern.matches("\\{\\s*\"timeMillis\".*", event)
res9: Boolean = true
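A related sketch: the prefix-only pattern that failed with matches above does succeed with find(), which looks for a matching substring rather than matching the whole input:
    // matches() must match the entire string; find() only needs a substring
    Pattern.compile("\\{\\s*\"timeMillis\"").matcher(event).find() // true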


Part 2 - alphanumeric regex

scala> val regex = "^[a-zA-Z0-9]*$"
regex: String = ^[a-zA-Z0-9]*$

scala> import java.util.regex.Pattern
import java.util.regex.Pattern

scala> val alphanumeric = Pattern.compile(regex)
alphanumeric: java.util.regex.Pattern = ^[a-zA-Z0-9]*$

scala> alphanumeric.matcher("some order number").find()
res7: Boolean = false

scala> alphanumeric.matcher("someordernumber").find()
res8: Boolean = true

scala> alphanumeric.matcher("some funcky mother").find()
res9: Boolean = false
explanation
^ - asserts position at the start of the string (like in Vim)
$ - asserts position at the end of the string (like in Vim)
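Since the pattern is anchored at both ends, find() here effectively behaves like a full-string match; a quick sketch of the equivalent checks:
    // with ^...$ anchoring both ends, find() and matches() agree
    alphanumeric.matcher("someordernumber").matches()      // true
    Pattern.matches("^[a-zA-Z0-9]*$", "some order number") // false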


Part 3

  • extract the 12-digit number between colons, using a capture group:
scala> val matcher = Pattern.compile(".*:([0-9]{12}):.*").matcher("arn:aws:kinesis:us-west-2:033814027302:stream/my_event_stream")
matcher: java.util.regex.Matcher = java.util.regex.Matcher[pattern=.*:([0-9]{12}):.* region=0,76 lastmatch=]

scala> matcher.find()
res16: Boolean = true

scala> matcher.group(1)
res17: String = 033814027302

explanation

.* - starts with anything
() - capture group for the part in between
[0-9]{12} - exactly 12 digits, equivalent to \d{12}
.* - ends with anything
Also, matching the whole ARN against a pattern (again, the pattern must cover the entire string):
Pattern.matches("arn:aws:kinesis:us-west-2:[0-9]{12}:stream/.*", "arn:aws:kinesis:us-west-2:033814027302:stream/some_stream")
res18: Boolean = true
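As an alternative sketch, Java 7+ also supports named capture groups, which read better than numeric group indices (the accountId name is my own):
    // named capture group; "accountId" is a name I made up
    val m = Pattern.compile(".*:(?<accountId>[0-9]{12}):.*")
      .matcher("arn:aws:kinesis:us-west-2:033814027302:stream/my_event_stream")
    if (m.find()) println(m.group("accountId")) // prints 033814027302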

Monday, 7 November 2016

create compact log events each on a separate line with log4j2 JSONLayout


I have recently been working on shipping application logs to an Elasticsearch database, mainly so that an HTTP request's traversal through multiple services can be tracked down based on its requestId.

I am using a Flume agent to read the application log line by line, modify a few JSON elements and publish each event to the Elasticsearch database.

I first updated my application log to be in JSON format, since Flume easily recognizes JSON events. I am using log4j2 for that.

BUT, I found out Flume cannot read a multi-line JSON object like this one:

{
  "timeMillis" : 1474611652491,
  "thread" : "main",
  "level" : "DEBUG",
  "loggerName" : "suppliesLogger",
  "message" : "I'm Hunter Thomson and I'm alive.",
  "endOfBatch" : false,
  "loggerFqcn" : "org.apache.logging.log4j.spi.AbstractLogger",
  "threadId" : 1,
  "threadPriority" : 5
}

So, instead of writing stupid logic to read a multi-line JSON object on the Flume side, I updated my log4j2 config to write each log event on a single line in the application itself.

The config is as below,
{
  "configuration": {
    "name": "logggg",
    "packages" : "org.apache.logging",
    "appenders": {
      "RollingFile": {
        "name":"rollingStone",
        "fileName":"supply_chain_rolled.log",
        "filePattern":"%d{MM-dd-yy-HH-mm-ss}-%i.log.gz",
        "JSONLayout": {
          "complete" : false,
          "compact" : true,
          "eventEol" : true
        },
        "Policies": {
          "SizeBasedTriggeringPolicy": {
            "size":"10 MB"
          }
        },
        "DefaultRolloverStrategy": {
          "max":"10"
        }
      }
    },
    "loggers": {
      "root": {
        "level":"debug",
        "appender-ref": {
          "ref":"rollingStone"
        }
      }
    }
  }
}


I basically needed to make the JSON output compact with compact: true. But that alone writes all the events on one giant line.

So I had to add an EOL after each event with eventEol: true.

The application log after compacting with EOL looks like this:
{"timeMillis":1478588550167,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"I'm Hunter Thomson","endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}
{"timeMillis":1478588550569,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"artist=porcupine tree,address=UK","endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}
{"timeMillis":1478588550571,"thread":"main","level":"DEBUG","loggerName":"org.apache.logging.SupplyChainLogger","message":"Exception occured ","thrown":{"commonElementCount":0,"localizedMessage":"some exception","message":"some exception","name":"java.lang.Exception","extendedStackTrace":[{"class":"org.apache.logging.SupplyChainLogger","method":"main","file":"SupplyChainLogger.java","line":17,"exact":true,"location":"classes/","version":"?"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke0","file":"NativeMethodAccessorImpl.java","line":-2,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"sun.reflect.NativeMethodAccessorImpl","method":"invoke","file":"NativeMethodAccessorImpl.java","line":62,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"sun.reflect.DelegatingMethodAccessorImpl","method":"invoke","file":"DelegatingMethodAccessorImpl.java","line":43,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"java.lang.reflect.Method","method":"invoke","file":"Method.java","line":498,"exact":false,"location":"?","version":"1.8.0_101"},{"class":"com.intellij.rt.execution.application.AppMain","method":"main","file":"AppMain.java","line":147,"exact":true,"location":"idea_rt.jar","version":"?"}]},"endOfBatch":false,"loggerFqcn":"org.apache.logging.log4j.spi.AbstractLogger","threadId":1,"threadPriority":5}


Resource
-----------------------

http://logging.apache.org/log4j/2.0/log4j-core/apidocs/org/apache/logging/log4j/core/layout/JsonLayout.html