Sunday, 27 December 2015

Scala Future with play framework


Async controller
----------------------------
package beard.controllers

import play.api.mvc._
import play.api.libs.json._
import play.api.libs.functional.syntax._
import play.libs.Json._
import reactivemongo.bson._
import service.BeardService
import play.api.libs.concurrent.Execution.Implicits.defaultContext

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.util.{Failure, Success, Random}

class AsyncBeardController extends Controller {

 def sayAsyncBeard = Action.async { request =>
    val futureResult = Future {
      intensiveComputation()
    }
    futureResult.map(result =>
      Ok(result)
    )
  }


def intensiveComputation(): JsObject = {
      Thread.sleep(Random.nextInt(5000))
        Json.obj("value" -> "beard")
  }
}

Test
-------
$ curl -XGET http://localhost:9000/sayAsyncBeard
{"value":"beard"}

You will get the response after ~5seconds.

Wednesday, 25 November 2015

CPU cores in OSX

Seems OSX sucks in command line support as Linux does. However, found few commands to get hw information in OSX.

sysctl -n hw.ncpu
8

sysctl -n hw.physicalcpu
4

sysctl -n hw.logicalcpu
8


Ref : https://coolaj86.com/articles/get-a-count-of-cpu-cores-on-linux-and-os-x/

Sunday, 20 September 2015

Kafka Events Streaming - Spark Consumer Example

* Apache Kafka supports a wide range of use cases as a general-purpose messaging system for scenarios where high throughput, reliable delivery, and horizontal scalability are important.
Use cases include:
* Stream Processing
* Website Activity Tracking
* Metrics Collection and Monitoring
* Log Aggs

* Apache Storm and Apache Spark both work very well in combination with Kafka.




Here's an example how Kafka can be used with apache spark for consuming events stream,

DEVELOPMENT
STEP 1 : ADD lib dependency to decoupled-invocation/build.sbt 


name := "decoupled-invocation"                                                                                                                        

version := "1.0"                                                                                    

scalaVersion := "2.10.4"                                                                            

val sparkVersion = "1.2.0"                                                                          

libraryDependencies ++= Seq(                                                                        
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",                              
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion                                      
)   


STEP 2 : Create kafka stream in scala file decoupled-invocationsrc/main/scala/EventsConsumerApp.scala


object EventsConsumerApp {                                                                          

  def persist() = {                                                                                 

  }                                                                                                 

  def main(args : Array[String]): Unit = {                                                          
   val sparkConf = new SparkConf().setMaster("spark://prayagupd:7077").setAppName("EventsConsumerApp")
   val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(10))                         

   val kafkaConf = Map("metadata.broker.list" -> "localhost:9092",                                  
                       "zookeeper.connect" -> "localhost:2181",                                     
                       "group.id" -> "events-topic-consumer-group",                                 
                       "zookeeper.connection.timeout.ms" -> "1000")                                 

    //http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams
   val kafkaDiscretizedStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](sparkStreamingContext,
                                             kafkaConf,                                             
                                             Map("events-topic" -> 1), StorageLevel.MEMORY_ONLY_SER)
   //persist()                                                                                      
   kafkaDiscretizedStream.print()                                                                   
   sparkStreamingContext.start()                                                                    
   sparkStreamingContext.awaitTermination()                                                         
 }                                                                                                  
}   


STEP 3 : build application
sbt assembly ## doing sbt package wont find kafka jar while submitting job to spark


DEPLOYMENT
STEP 4 : start kafka broker with default config
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

#terminal 2
bin/kafka-server-start.sh config/server.properties

STEP 5 : Create kafka topic events-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic events-topic
Created topic "events-topic".

verify topic is created
bin/kafka-topics.sh --list --zookeeper localhost:2181
events-topic
STEP 6 : Produce stream using kafka to topic events-topic


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic events-topic
{'message' : 'user is logging in'}
{'message' : 'User logged in'}


STEP 7 : submit spark job that will consume events produced by above kafka producer


/usr/local/spark-1.2.0/bin/spark-submit --class EventsConsumerApp --master spark://prayagupd:7077 target/scala-2.10/decoupled-invocation-assembly-1.0.jar

(null,{'message' : 'user is logging in'})
(null,{'message' : 'User logged in'})

15/09/20 17:04:12 INFO scheduler.JobScheduler: Finished job streaming job 1442786650000 ms.0 from job set of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.JobScheduler: Total delay: 2.294 s for time 1442786650000 ms (execution: 2.289 s)
15/09/20 17:04:12 INFO rdd.BlockRDD: Removing RDD 5 from persistence list
15/09/20 17:04:12 INFO storage.BlockManager: Removing RDD 5
15/09/20 17:04:12 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[5] at createStream at EventsConsumerApp.scala:39 of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1442786630000 ms)
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/09/20 17:04:20 INFO scheduler.JobScheduler: Added jobs for time 1442786660000 ms


As seen in console events produced using kafka-producer can be consumed in apache spark + kafka application, and can be persited to NoSQL databases or wherever thereafter.

Source code
scalability-patterns/decoupled-invocation

Resources
Spark Streaming + Kafka Integration Guide

Monday, 12 January 2015

git branching model

Git Branching most of the time seemed like a maze to me. Specially while sending a Pull Request for sprint development.
When there are three features in a sprint,
1) One idea could be  - maintain three local branches for each feature and send three parallel PRs separately.
After feature gets merged, delete local and remote feature branches.

2) The other could be - maintain three local branches for each feature, but when finished merge them to one single branch and send a PR one by one or ?.


I think the second one restricts from having number of unwanted branches to be deleted later, which looks OK to me while working in a team. 

Here's how I implement this approach; assuming sprint/develop is an upto-date branch created with HEAD of master,

STEP 1 - create feature branch
git checkout -b module-1/feature-1 sprint/develop


STEP 2 - after feature completion
Merge it to single sprint/develop branch
git checkout sprint/develop
git merge --no-ff module-1/feature-1 # create a new commit object.

## delete the feature branch
git branch -d module-1/feature-1


STEP 3 - rebase sprint/develop branch with master
git rebase master ## if any conflicts occur, resolve them manually, git add conflicted files, and
                            ## git rebase --continue


STEP 4 - push develop changes
git push origin sprint/develop  ## or git push --force origin sprint/develop

STEP 5 - Send PR and get reviewed
Send PR for merge on master. what?? few implementation not aligned with architecture of codebase?? FIX them on same branch and push.