Sunday, 20 September 2015

Kafka Events Streaming - Spark Consumer Example

* Apache Kafka supports a wide range of use cases as a general-purpose messaging system for scenarios where high throughput, reliable delivery, and horizontal scalability are important.
Use cases include:
* Stream Processing
* Website Activity Tracking
* Metrics Collection and Monitoring
* Log Aggregation

* Apache Storm and Apache Spark both work very well in combination with Kafka.




Here's an example of how Kafka can be used with Apache Spark Streaming to consume an event stream,

DEVELOPMENT
STEP 1 : Add library dependencies to decoupled-invocation/build.sbt


name := "decoupled-invocation"                                                                                                                        

version := "1.0"                                                                                    

scalaVersion := "2.10.4"                                                                            

val sparkVersion = "1.2.0"                                                                          

libraryDependencies ++= Seq(
  // provided at runtime by the Spark installation, so excluded from the assembly jar
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // bundled into the assembly jar so executors can load the kafka integration classes
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion
)


STEP 2 : Create a kafka stream in the scala file decoupled-invocation/src/main/scala/EventsConsumerApp.scala


import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EventsConsumerApp {

  // stub: persist the consumed events to a NoSQL database or another store
  def persist() = {

  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("spark://prayagupd:7077").setAppName("EventsConsumerApp")
    val sparkStreamingContext = new StreamingContext(sparkConf, Seconds(10))

    val kafkaConf = Map("metadata.broker.list" -> "localhost:9092",
                        "zookeeper.connect" -> "localhost:2181",
                        "group.id" -> "events-topic-consumer-group",
                        "zookeeper.connection.timeout.ms" -> "1000")

    //http://spark.apache.org/docs/latest/streaming-programming-guide.html#discretized-streams-dstreams
    val kafkaDiscretizedStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](sparkStreamingContext,
                                              kafkaConf,
                                              Map("events-topic" -> 1), StorageLevel.MEMORY_ONLY_SER)
    //persist()
    kafkaDiscretizedStream.print()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}
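
The persist() stub above is left empty in this post. Below is a minimal sketch of one way to fill it in with DStream.foreachRDD; the commented-out EventStore client and its save() call are hypothetical placeholders for whichever NoSQL client the application ends up using.

import org.apache.spark.streaming.dstream.DStream

def persist(stream: DStream[(Array[Byte], String)]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { events =>
      // open one connection per partition, write every event value, then close it again
      // val store = EventStore.connect("localhost")   // hypothetical NoSQL client
      events.foreach { case (_, eventJson) => println(eventJson) /* store.save(eventJson) */ }
      // store.close()
    }
  }
}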


STEP 3 : build the application as a fat jar
sbt assembly   ## running sbt package instead won't bundle the kafka integration jar, so the job fails when submitted to spark
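
sbt assembly comes from the sbt-assembly plugin, which has to be declared on the build classpath. A minimal project/assembly.sbt sketch, assuming the sbt 0.13.x line and plugin version 0.11.2 (use whichever version the project actually pins):

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

Since spark-streaming is marked "provided" in build.sbt, it stays out of the fat jar; spark-streaming-kafka and its kafka dependencies are bundled in, which is what the executors need at runtime.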


DEPLOYMENT
STEP 4 : start zookeeper and the kafka broker with default config
cd /usr/local/kafka

#terminal 1
bin/zookeeper-server-start.sh config/zookeeper.properties

#terminal 2
bin/kafka-server-start.sh config/server.properties

STEP 5 : Create kafka topic events-topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic events-topic
Created topic "events-topic".

Verify the topic is created
bin/kafka-topics.sh --list --zookeeper localhost:2181
events-topic

STEP 6 : Produce a stream of events to topic events-topic using the kafka console producer


bin/kafka-console-producer.sh --broker-list localhost:9092 --topic events-topic
{'message' : 'user is logging in'}
{'message' : 'User logged in'}
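
The console producer is enough for this walkthrough, but the same events can also be produced programmatically. A minimal Scala sketch using the KafkaProducer client; it assumes a kafka-clients dependency (0.8.2 or newer), which the build.sbt above does not include:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object EventsProducerApp {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // send the same two events that were typed into the console producer above
    producer.send(new ProducerRecord[String, String]("events-topic", "{'message' : 'user is logging in'}"))
    producer.send(new ProducerRecord[String, String]("events-topic", "{'message' : 'User logged in'}"))
    producer.close()
  }
}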


STEP 7 : Submit the spark job that consumes the events produced by the kafka console producer above


/usr/local/spark-1.2.0/bin/spark-submit --class EventsConsumerApp --master spark://prayagupd:7077 target/scala-2.10/decoupled-invocation-assembly-1.0.jar

(null,{'message' : 'user is logging in'})
(null,{'message' : 'User logged in'})

15/09/20 17:04:12 INFO scheduler.JobScheduler: Finished job streaming job 1442786650000 ms.0 from job set of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.JobScheduler: Total delay: 2.294 s for time 1442786650000 ms (execution: 2.289 s)
15/09/20 17:04:12 INFO rdd.BlockRDD: Removing RDD 5 from persistence list
15/09/20 17:04:12 INFO storage.BlockManager: Removing RDD 5
15/09/20 17:04:12 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[5] at createStream at EventsConsumerApp.scala:39 of time 1442786650000 ms
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1442786630000 ms)
15/09/20 17:04:12 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/09/20 17:04:20 INFO scheduler.JobScheduler: Added jobs for time 1442786660000 ms


As seen in the console output, events produced with the kafka console producer are consumed by the Spark Streaming + Kafka application, and can then be persisted to a NoSQL database or any other store.
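
The null in each printed tuple is the kafka message key, which the console producer does not set. A small sketch of dropping it so that only the JSON payload flows downstream; this reuses kafkaDiscretizedStream from STEP 2:

// keep only the message payload of each (key, value) record before printing or persisting
val eventJsonStream = kafkaDiscretizedStream.map { case (_, json) => json }
eventJsonStream.print()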

Source code
scalability-patterns/decoupled-invocation

Resources
Spark Streaming + Kafka Integration Guide
